#!/usr/bin/env python import unittest import binascii from socket import AF_INET6 from framework import VppTestCase, VppTestRunner from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \ SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, Dot1Q from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting from scapy.layers.inet import IP, UDP from scapy.utils import inet_pton, inet_ntop from util import ppp class TestSRv6(VppTestCase): """ SRv6 Test Case """ @classmethod def setUpClass(self): super(TestSRv6, self).setUpClass() def setUp(self): """ Perform test setup before each test case. """ super(TestSRv6, self).setUp() # packet sizes, inclusive L2 overhead self.pg_packet_sizes = [64, 512, 1518, 9018] # reset packet_infos self.reset_packet_infos() def tearDown(self): """ Clean up test setup after each test case. """ self.teardown_interfaces() super(TestSRv6, self).tearDown() def configure_interface(self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0): """ Configure interface. :param ipv6: configure IPv6 on interface :param ipv4: configure IPv4 on interface :param ipv6_table_id: FIB table_id for IPv6 :param ipv4_table_id: FIB table_id for IPv4 """ self.logger.debug("Configuring interface %s" % (interface.name)) if ipv6: self.logger.debug("Configuring IPv6") interface.set_table_ip6(ipv6_table_id) interface.config_ip6() interface.resolve_ndp(timeout=5) if ipv4: self.logger.debug("Configuring IPv4") interface.set_table_ip4(ipv4_table_id) interface.config_ip4() interface.resolve_arp() interface.admin_up() def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]): """ Create and configure interfaces. :param ipv6: list of interface IPv6 capabilities :param ipv4: list of interface IPv4 capabilities :param ipv6_table_id: list of intf IPv6 FIB table_ids :param ipv4_table_id: list of intf IPv4 FIB table_ids :returns: List of created interfaces. """ # how many interfaces? if len(ipv6): count = len(ipv6) else: count = len(ipv4) self.logger.debug("Creating and configuring %d interfaces" % (count)) # fill up ipv6 and ipv4 lists if needed # not enabled (False) is the default if len(ipv6) < count: ipv6 += (count - len(ipv6)) * [False] if len(ipv4) < count: ipv4 += (count - len(ipv4)) * [False] # fill up table_id lists if needed # table_id 0 (global) is the default if len(ipv6_table_id) < count: ipv6_table_id += (count - len(ipv6_table_id)) * [0] if len(ipv4_table_id) < count: ipv4_table_id += (count - len(ipv4_table_id)) * [0] # create 'count' pg interfaces self.create_pg_interfaces(range(count)) # setup all interfaces for i in range(count): intf = self.pg_interfaces[i] self.configure_interface(intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]) if any(ipv6): self.logger.debug(self.vapi.cli("show ip6 neighbors")) if any(ipv4): self.logger.debug(self.vapi.cli("show ip arp")) self.logger.debug(self.vapi.cli("show interface")) self.logger.debug(self.vapi.cli("show hardware")) return self.pg_interfaces def teardown_interfaces(self): """ Unconfigure and bring down interface. """ self.logger.debug("Tearing down interfaces") # tear down all interfaces # AFAIK they cannot be deleted for i in self.pg_interfaces: self.logger.debug("Tear down interface %s" % (i.name)) i.admin_down() i.unconfig() i.set_table_ip4(0) i.set_table_ip6(0) @unittest.skipUnless(0, "PC to fix") def test_SRv6_T_Encaps(self): """ Test SRv6 Transit.Encaps behavior for IPv6. """ # send traffic to one destination interface # source and destination are IPv6 only self.setup_interfaces(ipv6=[True, True]) # configure FIB entries route = VppIpRoute(self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1) route.add_vpp_config() # configure encaps IPv6 source address # needs to be done before SR Policy config # TODO: API? self.vapi.cli("set sr encaps source addr a3::") bsid = 'a3::9999:1' # configure SRv6 Policy # Note: segment list order: first -> last sr_policy = VppSRv6Policy( self, bsid=bsid, is_encap=1, sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT, weight=1, fib_table=0, segments=['a4::', 'a5::', 'a6::c7'], source='a3::') sr_policy.add_vpp_config() self.sr_policy = sr_policy # log the sr policies self.logger.info(self.vapi.cli("show sr policies")) # steer IPv6 traffic to a7::/64 into SRv6 Policy # use the bsid of the above self.sr_policy pol_steering = VppSRv6Steering( self, bsid=self.sr_policy.bsid, prefix="a7::", mask_width=64, traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6, sr_policy_index=0, table_id=0, sw_if_index=0) pol_steering.add_vpp_config() # log the sr steering policies self.logger.info(self.vapi.cli("show sr steering policies")) # create packets count = len(self.pg_packet_sizes) dst_inner = 'a7::1234' pkts = [] # create IPv6 packets without SRH packet_header = self.create_packet_header_IPv6(dst_inner) # create traffic stream pg0->pg1 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count)) # create IPv6 packets with SRH # packets with segments-left 1, active segment a7:: packet_header = self.create_packet_header_IPv6_SRH( sidlist=['a8::', 'a7::', 'a6::'], segleft=1) # create traffic stream pg0->pg1 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count)) # create IPv6 packets with SRH and IPv6 # packets with segments-left 1, active segment a7:: packet_header = self.create_packet_header_IPv6_SRH_IPv6( dst_inner, sidlist=['a8::', 'a7::', 'a6::'], segleft=1) # create traffic stream pg0->pg1 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count)) # send packets and verify received packets self.send_and_verify_pkts(self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps) # log the localsid counters self.logger.info(self.vapi.cli("show sr localsid")) # remove SR steering pol_steering.remove_vpp_config() self.logger.info(self.vapi.cli("show sr steering policies")) # remove SR Policies self.sr_policy.remove_vpp_config() self.logger.info(self.vapi.cli("show sr policies")) # remove FIB entries # done by tearDown # cleanup interfaces self.teardown_interfaces() @unittest.skipUnless(0, "PC to fix") def test_SRv6_T_Insert(self): """ Test SRv6 Transit.Insert behavior (IPv6 only). """ # send traffic to one destination interface # source and destination are IPv6 only self.setup_interfaces(ipv6=[True, True]) # configure FIB entries route = VppIpRoute(self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1) route.add_vpp_config() # configure encaps IPv6 source address # needs to be done before SR Policy config # TODO: API? self.vapi.cli("set sr encaps source addr a3::") bsid = 'a3::9999:1' # configure SRv6 Policy # Note: segment list order: first -> last sr_policy = VppSRv6Policy( self, bsid=bsid, is_encap=0, sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT, weight=1, fib_table=0, segments=['a4::', 'a5::', 'a6::c7'], source='a3::') sr_policy.add_vpp_config() self.sr_policy = sr_policy # log the sr policies self.logger.info(self.vapi.cli("show sr policies")) # steer IPv6 traffic to a7::/64 into SRv6 Policy # use the bsid of the above self.sr_policy pol_steering = VppSRv6Steering( self, bsid=self.sr_policy.bsid, prefix="a7::", mask_width=64, traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6, sr_policy_index=0, table_id=0, sw_if_index=0) pol_steering.add_vpp_config() # log the sr steering policies self.logger.info(self.vapi.cli("show sr steering policies")) # create packets count = len(self.pg_packet_sizes) dst_inner = 'a7::1234' pkts = [] # create IPv6 packets without SRH packet_header = self.create_packet_header_IPv6(dst_inner) # create traffic stream pg0->pg1 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count)) # create IPv6 packets with SRH # packets with segments-left 1, active segment a7:: packet_header = self.create_packet_header_IPv6_SRH( sidlist=['a8::', 'a7::', 'a6::'], segleft=1) # create traffic stream pg0->pg1 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count)) # send packets and verify received packets self.send_and_verify_pkts(self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Insert) # log the localsid counters self.logger.info(self.vapi.cli("show sr localsid")) # remove SR steering pol_steering.remove_vpp_config() self.logger.info(self.vapi.cli("show sr steering policies")) # remove SR Policies self.sr_policy.remove_vpp_config() self.logger.info(self.vapi.cli("show sr policies")) # remove FIB entries # done by tearDown # cleanup interfaces self.teardown_interfaces() @unittest.skipUnless(0, "PC to fix") def test_SRv6_T_Encaps_IPv4(self): """ Test SRv6 Transit.Encaps behavior for IPv4. """ # send traffic to one destination interface # source interface is IPv4 only # destination interface is IPv6 only self.setup_interfaces(ipv6=[False, True], ipv4=[True, False]) # configure FIB entries route = VppIpRoute(self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1) route.add_vpp_config() # configure encaps IPv6 source address # needs to be done before SR Policy config # TODO: API? self.vapi.cli("set sr encaps source addr a3::") bsid = 'a3::9999:1' # configure SRv6 Policy # Note: segment list order: first -> last sr_policy = VppSRv6Policy( self, bsid=bsid, is_encap=1, sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT, weight=1, fib_table=0, segments=['a4::', 'a5::', 'a6::c7'], source='a3::') sr_policy.add_vpp_config() self.sr_policy = sr_policy # log the sr policies self.logger.info(self.vapi.cli("show sr policies")) # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy # use the bsid of the above self.sr_policy pol_steering = VppSRv6Steering( self, bsid=self.sr_policy.bsid, prefix="7.1.1.0", mask_width=24, traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4, sr_policy_index=0, table_id=0, sw_if_index=0) pol_steering.add_vpp_config() # log the sr steering policies self.logger.info(self.vapi.cli("show
#!/usr/bin/env python

import socket
from util import ip4n_range, ip4_range
import unittest
from framework import VppTestCase, VppTestRunner
from template_bd import BridgeDomain

from scapy.layers.l2 import Ether, Raw
from scapy.layers.inet import IP, UDP
from scapy.layers.vxlan import VXLAN
from scapy.utils import atol

import StringIO


def reassemble(listoffragments):
    buffer = StringIO.StringIO()
    first = listoffragments[0]
    buffer.seek(20)
    for pkt in listoffragments:
        buffer.seek(pkt[IP].frag*8)
        buffer.write(pkt[IP].payload)
    first.len = len(buffer.getvalue()) + 20
    first.flags = 0
    del(first.chksum)
    header = str(first[IP])[:20]
    return first[IP].__class__(header + buffer.getvalue())


class TestVxlan(BridgeDomain, VppTestCase):
    """ VXLAN Test Case """

    def __init__(self, *args):
        BridgeDomain.__init__(self)
        VppTestCase.__init__(self, *args)

    def encapsulate(self, pkt, vni):
        """
        Encapsulate the original payload frame by adding VXLAN header with its
        UDP, IP and Ethernet fields
        """
        return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
                UDP(sport=self.dport, dport=self.dport, chksum=0) /
                VXLAN(vni=vni, flags=self.flags) /
                pkt)

    def ip_range(self, start, end):
        """ range of remote ip's """
        return ip4_range(self.pg0.remote_ip4, start, end)

    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """
        Encapsulate the original payload frame by adding VXLAN header with its
        UDP, IP and Ethernet fields
        """
        return (Ether(src=src_mac, dst=self.mcast_mac) /
                IP(src=src_ip, dst=self.mcast_ip4) /
                UDP(sport=self.dport, dport=self.dport, chksum=0) /
                VXLAN(vni=vni, flags=self.flags) /
                pkt)

    def decapsulate(self, pkt):
        """
        Decapsulate the original payload frame by removing VXLAN header
        """
        # check if is set I flag
        self.assertEqual(pkt[VXLAN].flags, int('0x8', 16))
        return pkt[VXLAN].payload

    # Method for checking VXLAN encapsulation.
    #
    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        # TODO: add error messages
        # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
        #  by VPP using ARP.
        self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
        if not local_only:
            if not mcast_pkt:
                self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
            else:
                self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
        # Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
        self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
        if not local_only:
            if not mcast_pkt:
                self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
            else:
                self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
        # Verify UDP destination port is VXLAN 4789, source UDP port could be
        #  arbitrary.
        self.assertEqual(pkt[UDP].dport, type(self).dport)
        # TODO: checksum check
        # Verify VNI
        self.assertEqual(pkt[VXLAN].vni, vni)

    @classmethod
    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels):
        # Create 10 ucast vxlan tunnels under bd
        ip_range_start = 10
        ip_range_end = ip_range_start + n_ucast_tunnels
        next_hop_address = cls.pg0.remote_ip4n
        for dest_ip4n in ip4n_range(next_hop_address, ip_range_start,
                                    ip_range_end):
            # add host route so dest_ip4n will not be resolved
            cls.vapi.ip_add_del_route(dest_ip4n, 32, next_hop_address)
            r = cls.vapi.vxlan_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=dest_ip4n,
                vni=vni)
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)

    @classmethod
    def add_del_shared_mcast_dst_load(cls, is_add):
        """
        add or del tunnels sharing the same mcast dst
        to test vxlan ref_count mechanism
        """
        n_shared_dst_tunnels = 20
        vni_start = 10000
        vni_end = vni_start + n_shared_dst_tunnels
        for vni in range(vni_start, vni_end):
            r = cls.vapi.vxlan_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=cls.mcast_ip4n,
                mcast_sw_if_index=1,
                vni=vni,
                is_add=is_add)
            if r.sw_if_index == 0xffffffff:
                raise "bad sw_if_index"

    @classmethod
    def add_shared_mcast_dst_load(cls):
        cls.add_del_shared_mcast_dst_load(is_add=1)

    @classmethod
    def del_shared_mcast_dst_load(cls):
        cls.add_del_shared_mcast_dst_load(is_add=0)

    @classmethod
    def add_del_mcast_tunnels_load(cls, is_add):
        """
        add or del tunnels to test vxlan stability
        """
        n_distinct_dst_tunnels = 200
        ip_range_start = 10
        ip_range_end = ip_range_start + n_distinct_dst_tunnels
        for dest_ip4n in ip4n_range(cls.mcast_ip4n, ip_range_start,
                                    ip_range_end):
            vni = bytearray(dest_ip4n)[3]
            cls.vapi.vxlan_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=dest_ip4n,
                mcast_sw_if_index=1,
                vni=vni,
                is_add=is_add)

    @classmethod
    def add_mcast_tunnels_load(cls):
        cls.add_del_mcast_tunnels_load(is_add=1)

    @classmethod
    def del_mcast_tunnels_load(cls):
        cls.add_del_mcast_tunnels_load(is_add=0)

    # Class method to start the VXLAN test case.
    #  Overrides setUpClass method in VppTestCase class.
    #  Python try..except statement is used to ensure that the tear down of
    #  the class will be executed even if exception is raised.
    #  @param cls The class pointer.
    @classmethod
    def setUpClass(cls):
        super(TestVxlan, cls).setUpClass()

        try:
            cls.dport = 4789
            cls.flags = 0x8

            # Create 2 pg interfaces.
            cls.create_pg_interfaces(range(4))
            for pg in cls.pg_interfaces:
                pg.admin_up()

            # Configure IPv4 addresses on VPP pg0.
            cls.pg0.config_ip4()

            # Resolve MAC address for VPP's IP address on pg0.
            cls.pg0.resolve_arp()

            # Our Multicast address
            cls.mcast_ip4 = '239.1.1.1'
            cls.mcast_ip4n = socket.inet_pton(socket.AF_INET, cls.mcast_ip4)
            iplong = atol(cls.mcast_ip4)
            cls.mcast_mac = "01:00:5e:%02x:%02x:%02x" % (
                (iplong >> 16) & 0x7F, (iplong >> 8) & 0xFF, iplong & 0xFF)

            # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
            #  into BD.
            cls.single_tunnel_bd = 1
            r = cls.vapi.vxlan_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=cls.pg0.remote_ip4n,
                vni=cls.single_tunnel_bd)
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
                                                bd_id=cls.single_tunnel_bd)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg1.sw_if_index,
                                                bd_id=cls.single_tunnel_bd)

            # Setup vni 2 to test multicast flooding
            cls.n_ucast_tunnels = 10
            cls.mcast_flood_bd = 2
            cls.create_vxlan_flood_test_bd(cls.mcast_flood_bd,
                                           cls.n_ucast_tunnels)
            r = cls.vapi.vxlan_add_del_tunnel(
                src_addr=cls.pg0.local_ip4n,
                dst_addr=cls.mcast_ip4n,
                mcast_sw_if_index=1,
                vni=cls.mcast_flood_bd)
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index,
                                                bd_id=cls.mcast_flood_bd)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg2.sw_if_index,
                                                bd_id=cls.mcast_flood_bd)

            # Add and delete mcast tunnels to check stability
            cls.add_shared_mcast_dst_load()
            cls.add_mcast_tunnels_load()
            cls.del_shared_mcast_dst_load()
            cls.del_mcast_tunnels_load()

            # Setup vni 3 to test unicast flooding
            cls.ucast_flood_bd = 3
            cls.create_vxlan_flood_test_bd(cls.ucast_flood_bd,
                                           cls.n_ucast_tunnels)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg3.sw_if_index,
                                                bd_id=cls.ucast_flood_bd)
        except Exception:
            super(TestVxlan, cls).tearDownClass()
            raise

    def test_encap_big_packet(self):
        """ Encapsulation test send big frame from pg1
        Verify receipt of encapsulated frames on pg0
        """

        self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])

        frame = (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
                 IP(src='4.3.2.1', dst='1.2.3.4') /
                 UDP(sport=20000, dport=10000) /
                 Raw('\xa5' * 1450))

        self.pg1.add_stream([frame])

        self.pg0.enable_capture()

        self.pg_start()

        # Pick first received frame and check if it's correctly encapsulated.
        out = self.pg0.get_capture(2)
        ether = out[0]
        pkt = reassemble(out)
        pkt = ether / pkt
        self.check_encapsulation(pkt, self.single_tunnel_bd)

        payload = self.decapsulate(pkt)
        # TODO: Scapy bug?
        # self.assert_eq_pkts(payload, frame)

    # Method to define VPP actions before tear down of the test case.
    #  Overrides tearDown method in VppTestCase class.
    #  @param self The object pointer.
    def tearDown(self):
        super(TestVxlan, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
            self.logger.info(self.vapi.cli("show bridge-domain 2 detail"))
            self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
            self.logger.info(self.vapi.cli("show vxlan tunnel"))


if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)
# in: IPv6(A, S3)IPv4(B, D) # out: IPv4(B, D) # get IPv4 header of rx'ed packet rx_ip = rx_pkt.getlayer(IP) tx_ip = tx_pkt.getlayer(IPv6) tx_ip2 = tx_pkt.getlayer(IP) # verify if rx'ed packet has no SRH self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) # the whole rx_ip pkt should be equal to tx_ip2 # except for the ttl field and ip checksum # -> adjust tx'ed ttl to expected ttl tx_ip2.ttl = tx_ip2.ttl - 1 # -> set tx'ed ip checksum to None and let scapy recompute tx_ip2.chksum = None # read back the pkt (with str()) to force computing these fields # probably other ways to accomplish this are possible tx_ip2 = IP(scapy.compat.raw(tx_ip2)) self.assertEqual(rx_ip, tx_ip2) self.logger.debug("packet verification: SUCCESS") def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt): """ Compare input and output packet after passing End.DX2 :param tx_pkt: transmitted packet :param rx_pkt: received packet """ # End.DX2 updates the headers as follows: # IPv6 + SRH (SL=0): # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2 # out: L2 # IPv6: # in: IPv6(A, S3)L2 # out: L2 # get IPv4 header of rx'ed packet rx_eth = rx_pkt.getlayer(Ether) tx_ip = tx_pkt.getlayer(IPv6) # we can't just get the 2nd Ether layer # get the Raw content and dissect it as Ether tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw])) # verify if rx'ed packet has no SRH self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting)) # the whole rx_eth pkt should be equal to tx_eth1 self.assertEqual(rx_eth, tx_eth1) self.logger.debug("packet verification: SUCCESS") def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count): """Create SRv6 input packet stream for defined interface. :param VppInterface src_if: Interface to create packet stream for :param VppInterface dst_if: destination interface of packet stream :param packet_header: Layer3 scapy packet headers, L2 is added when not provided, Raw(payload) with packet_info is added :param list packet_sizes: packet stream pckt sizes,sequentially applied to packets in stream have :param int count: number of packets in packet stream :return: list of packets """ self.logger.info("Creating packets") pkts = [] for i in range(0, count-1): payload_info = self.create_packet_info(src_if, dst_if) self.logger.debug( "Creating packet with index %d" % (payload_info.index)) payload = self.info_to_payload(payload_info) # add L2 header if not yet provided in packet_header if packet_header.getlayer(0).name == 'Ethernet': p = (packet_header / Raw(payload)) else: p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / packet_header / Raw(payload)) size = packet_sizes[i % len(packet_sizes)] self.logger.debug("Packet size %d" % (size)) self.extend_packet(p, size) # we need to store the packet with the automatic fields computed # read back the dumped packet (with str()) # to force computing these fields # probably other ways are possible p = Ether(scapy.compat.raw(p)) payload_info.data = p.copy() self.logger.debug(ppp("Created packet:", p)) pkts.append(p) self.logger.info("Done creating packets") return pkts def send_and_verify_pkts(self, input, pkts, output, compare_func): """Send packets and verify received packets using compare_func :param input: ingress interface of DUT :param pkts: list of packets to transmit :param output: egress interface of DUT :param compare_func: function to compare in and out packets """ # add traffic stream to input interface input.add_stream(pkts) # enable capture on all interfaces self.pg_enable_capture(self.pg_interfaces) # start traffic self.logger.info("Starting traffic") self.pg_start() # get output capture self.logger.info("Getting packet capture") capture = output.get_capture() # assert nothing was captured on input interface input.assert_nothing_captured() # verify captured packets self.verify_captured_pkts(output, capture, compare_func) def create_packet_header_IPv6(self, dst): """Create packet header: IPv6 header, UDP header :param dst: IPv6 destination address IPv6 source address is 1234::1 UDP source port and destination port are 1234 """ p = (IPv6(src='1234::1', dst=dst) / UDP(sport=1234, dport=1234)) return p def create_packet_header_IPv6_SRH(self, sidlist, segleft): """Create packet header: IPv6 header with SRH, UDP header :param list sidlist: segment list :param int segleft: segments-left field value IPv6 destination address is set to sidlist[segleft] IPv6 source addresses are 1234::1 and 4321::1 UDP source port and destination port are 1234 """ p = (IPv6(src='1234::1', dst=sidlist[segleft]) / IPv6ExtHdrSegmentRouting(addresses=sidlist) / UDP(sport=1234, dport=1234)) return p def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft): """Create packet header: IPv6 encapsulated in SRv6: IPv6 header with SRH, IPv6 header, UDP header :param ipv6address dst: inner IPv6 destination address :param list sidlist: segment list of outer IPv6 SRH :param int segleft: segments-left field of outer IPv6 SRH Outer IPv6 destination address is set to sidlist[segleft] IPv6 source addresses are 1234::1 and 4321::1 UDP source port and destination port are 1234 """ p = (IPv6(src='1234::1', dst=sidlist[segleft]) / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41) / IPv6(src='4321::1', dst=dst) / UDP(sport=1234, dport=1234)) return p def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer): """Create packet header: IPv6 encapsulated in IPv6: IPv6 header, IPv6 header, UDP header :param ipv6address dst_inner: inner IPv6 destination address :param ipv6address dst_outer: outer IPv6 destination address IPv6 source addresses are 1234::1 and 4321::1 UDP source port and destination port are 1234 """ p = (IPv6(src='1234::1', dst=dst_outer) / IPv6(src='4321::1', dst=dst_inner) / UDP(sport=1234, dport=1234)) return p def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1, sidlist2, segleft2): """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH: IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header :param ipv6address dst: inner IPv6 destination address :param list sidlist1: segment list of outer IPv6 SRH :param int segleft1: segments-left field of outer IPv6 SRH :param list sidlist2: segment list of inner IPv6 SRH :param int segleft2: segments-left field of inner IPv6 SRH Outer IPv6 destination address is set to sidlist[segleft] IPv6 source addresses are 1234::1 and 4321::1 UDP source port and destination port are 1234 """ p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) / IPv6ExtHdrSegmentRouting(addresses=sidlist1, segleft=segleft1, nh=43) / IPv6ExtHdrSegmentRouting(addresses=sidlist2, segleft=segleft2, nh=41) / IPv6(src='4321::1', dst=dst) / UDP(sport=1234, dport=1234)) return p def create_packet_header_IPv4(self, dst): """Create packet header: IPv4 header, UDP header :param dst: IPv4 destination address IPv4 source address is 123.1.1.1 UDP source port and destination port are 1234 """ p = (IP(src='123.1.1.1', dst=dst) / UDP(sport=1234, dport=1234)) return p def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer): """Create packet header: IPv4 encapsulated in IPv6: IPv6 header, IPv4 header, UDP header :param ipv4address dst_inner: inner IPv4 destination address :param ipv6address dst_outer: outer IPv6 destination address IPv6 source address is 1234::1 IPv4 source address is 123.1.1.1 UDP source port and destination port are 1234 """ p = (IPv6(src='1234::1', dst=dst_outer) / IP(src='123.1.1.1', dst=dst_inner) / UDP(sport=1234, dport=1234)) return p def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft): """Create packet header: IPv4 encapsulated in SRv6: IPv6 header with SRH, IPv4 header, UDP header :param ipv4address dst: inner IPv4 destination address :param list sidlist: segment list of outer IPv6 SRH :param int segleft: segments-left field of outer IPv6 SRH Outer IPv6 destination address is set to sidlist[segleft] IPv6 source address is 1234::1 IPv4 source address is 123.1.1.1 UDP source port and destination port are 1234 """ p = (IPv6(src='1234::1', dst=sidlist[segleft]) / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4) / IP(src='123.1.1.1', dst=dst) / UDP(sport=1234, dport=1234)) return p def create_packet_header_L2(self, vlan=0): """Create packet header: L2 header :param vlan: if vlan!=0 then add 802.1q header """ # Note: the dst addr ('00:55:44:33:22:11') is used in # the compare function compare_rx_tx_packet_T_Encaps_L2 # to detect presence of L2 in SRH payload p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') etype = 0x8137 # IPX if vlan: # add 802.1q layer p /= Dot1Q(vlan=vlan, type=etype) else: p.type = etype return p def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0): """Create packet header: L2 encapsulated in SRv6: IPv6 header with SRH, L2 :param list sidlist: segment list of outer IPv6 SRH :param int segleft: segments-left field of outer IPv6 SRH :param vlan: L2 vlan; if vlan!=0 then add 802.1q header Outer IPv6 destination address is set to sidlist[segleft] IPv6 source address is 1234::1 """ eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') etype = 0x8137 # IPX if vlan: # add 802.1q layer eth /= Dot1Q(vlan=vlan, type=etype) else: eth.type = etype p = (IPv6(src='1234::1', dst=sidlist[segleft]) / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=59) / eth) return p def create_packet_header_IPv6_L2(self, dst_outer, vlan=0): """Create packet header: L2 encapsulated in IPv6: IPv6 header, L2 :param ipv6address dst_outer: outer IPv6 destination address :param vlan: L2 vlan; if vlan!=0 then add 802.1q header """ eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11') etype = 0x8137 # IPX if vlan: # add 802.1q layer eth /= Dot1Q(vlan=vlan, type=etype) else: eth.type = etype p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth) return p def get_payload_info(self, packet): """ Extract the payload_info from the packet """ # in most cases, payload_info is in packet[Raw] # but packet[Raw] gives the complete payload # (incl L2 header) for the T.Encaps L2 case try: payload_info = self.payload_to_info(packet[Raw]) except: # remote L2 header from packet[Raw]: # take packet[Raw], convert it to an Ether layer # and then extract Raw from it payload_info = self.payload_to_info( Ether(scapy.compat.r(packet[Raw]))[Raw]) return payload_info def verify_captured_pkts(self, dst_if, capture, compare_func): """ Verify captured packet stream for specified interface. Compare ingress with egress packets using the specified compare fn :param dst_if: egress interface of DUT :param capture: captured packets :param compare_func: function to compare in and out packet """ self.logger.info("Verifying capture on interface %s using function %s" % (dst_if.name, compare_func.__name__)) last_info = dict() for i in self.pg_interfaces: last_info[i.sw_if_index] = None dst_sw_if_index = dst_if.sw_if_index for packet in capture: try: # extract payload_info from packet's payload payload_info = self.get_payload_info(packet) packet_index = payload_info.index self.logger.debug("Verifying packet with index %d" % (packet_index)) # packet should have arrived on the expected interface self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug( "Got packet on interface %s: src=%u (idx=%u)" % (dst_if.name, payload_info.src, packet_index)) # search for payload_info with same src and dst if_index # this will give us the transmitted packet next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info # next_info should not be None self.assertTrue(next_info is not None) # index of tx and rx packets should be equal self.assertEqual(packet_index, next_info.index) # data field of next_info contains the tx packet txed_packet = next_info.data self.logger.debug(ppp("Transmitted packet:", txed_packet)) # ppp=Pretty Print Packet self.logger.debug(ppp("Received packet:", packet)) # compare rcvd packet with expected packet using compare_func compare_func(txed_packet, packet) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # have all expected packets arrived? for i in self.pg_interfaces: remaining_packet = self.get_next_packet_info_for_interface2( i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]) self.assertTrue(remaining_packet is None, "Interface %s: Packet expected from interface %s " "didn't arrive" % (dst_if.name, i.name)) if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)