summaryrefslogtreecommitdiffstats
path: root/test/test_vxlan6.py
blob: 5ae8e7e12687c35faba35c6473a5434504682dcf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python

import socket
import unittest
from framework import VppTestCase, VppTestRunner
from template_bd import BridgeDomain

from scapy.layers.l2 import Ether
from scapy.layers.inet6 import IPv6, UDP
from scapy.layers.vxlan import VXLAN
from scapy.utils import atol


class TestVxlan6(BridgeDomain, VppTestCase):
    """ VXLAN over IPv6 Test Case """

    # Fixture that plugs VXLAN-over-IPv6 encapsulation helpers into the
    # BridgeDomain template (imported from template_bd).  setUpClass() builds
    # three bridge domains:
    #   1 - one unicast tunnel towards pg0's remote address, plus pg1
    #   2 - n_ucast_tunnels unicast tunnels, one multicast tunnel, plus pg2
    #   3 - n_ucast_tunnels unicast tunnels, plus pg3

    def __init__(self, *args):
        # Both bases are initialised explicitly; BridgeDomain takes no
        # arguments, VppTestCase receives everything passed in.
        BridgeDomain.__init__(self)
        VppTestCase.__init__(self, *args)

    def encapsulate(self, pkt, vni):
        """
        Wrap a payload frame in VXLAN, UDP, IPv6 and Ethernet headers.

        The outer headers are addressed from pg0's remote side to pg0's
        local side.

        :param pkt: Frame to carry inside the tunnel.
        :param vni: Value placed in the VXLAN header's vni field.
        :return: The encapsulated packet.
        """
        outer_eth = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        outer_ip = IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
        # The UDP checksum is explicitly forced to 0.
        outer_udp = UDP(sport=self.dport, dport=self.dport, chksum=0)
        vxlan_hdr = VXLAN(vni=vni, flags=self.flags)
        return outer_eth / outer_ip / outer_udp / vxlan_hdr / pkt

    @classmethod
    def ip_range(cls, s, e):
        """
        Lazily produce IPv6 address strings derived from pg0's remote address.

        Everything after the last ':' of pg0.remote_ip6 is replaced by the
        lower-case hex rendering of each integer in range(s, e).

        :param s: First suffix value (inclusive).
        :param e: End suffix value (exclusive).
        :return: Generator of address strings.
        """
        prefix = cls.pg0.remote_ip6.rsplit(':', 1)[0]
        return ("%s:%x" % (prefix, suffix) for suffix in range(s, e))

    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """
        Wrap a payload frame in VXLAN, UDP, IPv6 and Ethernet headers that
        are addressed to the multicast group (mcast_ip6 / mcast_mac).

        :param pkt: Frame to carry inside the tunnel.
        :param src_ip: Outer IPv6 source address.
        :param src_mac: Outer Ethernet source address.
        :param vni: Value placed in the VXLAN header's vni field.
        :return: The encapsulated packet.
        """
        outer_eth = Ether(src=src_mac, dst=self.mcast_mac)
        outer_ip = IPv6(src=src_ip, dst=self.mcast_ip6)
        # The UDP checksum is explicitly forced to 0.
        outer_udp = UDP(sport=self.dport, dport=self.dport, chksum=0)
        vxlan_hdr = VXLAN(vni=vni, flags=self.flags)
        return outer_eth / outer_ip / outer_udp / vxlan_hdr / pkt

    def decapsulate(self, pkt):
        """
        Return the frame carried behind the VXLAN header of pkt.

        Fails the test unless the VXLAN flags field equals 0x8 (the I flag).

        :param pkt: Packet containing a VXLAN layer.
        :return: Payload of the VXLAN layer.
        """
        vxlan_hdr = pkt[VXLAN]
        self.assertEqual(vxlan_hdr.flags, 0x8)
        return vxlan_hdr.payload

    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        """
        Assert that the outer headers of a VXLAN packet look as expected.

        :param pkt: Packet to inspect.
        :param vni: VNI the VXLAN header must carry.
        :param local_only: When true, only the source (pg0-local) MAC and
            IPv6 address are checked; destination checks are skipped.
        :param mcast_pkt: When true, destinations are compared against the
            class multicast MAC / IPv6 address instead of pg0's remote ones.
        """
        # TODO: add error messages
        # Outer Ethernet: the source must be pg0's local MAC.
        self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
        if not local_only:
            self.assertEqual(
                pkt[Ether].dst,
                type(self).mcast_mac if mcast_pkt else self.pg0.remote_mac)
        # Outer IPv6: the tunnel source must be pg0's local address.
        self.assertEqual(pkt[IPv6].src, self.pg0.local_ip6)
        if not local_only:
            self.assertEqual(
                pkt[IPv6].dst,
                type(self).mcast_ip6 if mcast_pkt else self.pg0.remote_ip6)
        # Only the UDP destination port is checked (4789 as configured in
        # setUpClass); the source port is left unconstrained.
        self.assertEqual(pkt[UDP].dport, type(self).dport)
        # TODO: checksum check
        # The VNI must match the expected one.
        self.assertEqual(pkt[VXLAN].vni, vni)

    @classmethod
    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels):
        """
        Create n_ucast_tunnels unicast VXLAN tunnels and bridge them.

        Destination addresses come from ip_range(), starting at suffix 10.
        Each tunnel is placed in the bridge domain whose id equals vni.

        :param vni: VNI of every tunnel; also used as the bridge domain id.
        :param n_ucast_tunnels: Number of tunnels to create.
        """
        first_suffix = 10
        last_suffix = first_suffix + n_ucast_tunnels
        via = cls.pg0.remote_ip6n
        for remote in cls.ip_range(first_suffix, last_suffix):
            remote_n = socket.inet_pton(socket.AF_INET6, remote)
            # Install a /128 host route through pg0's remote address so the
            # destination itself does not have to be resolved.
            cls.vapi.ip_add_del_route(remote_n, 128, via, is_ipv6=1)
            tunnel = cls.vapi.vxlan_add_del_tunnel(
                is_ipv6=1,
                src_addr=cls.pg0.local_ip6n,
                dst_addr=remote_n,
                vni=vni)
            cls.vapi.sw_interface_set_l2_bridge(tunnel.sw_if_index, bd_id=vni)

    @classmethod
    def add_mcast_tunnels_load(cls):
        """ Call add_del_mcast_tunnels_load with is_add=1. """
        # NOTE(review): add_del_mcast_tunnels_load is not defined in this
        # class; presumably a base class provides it - verify.
        cls.add_del_mcast_tunnels_load(is_add=1)

    @classmethod
    def del_mcast_tunnels_load(cls):
        """ Call add_del_mcast_tunnels_load with is_add=0. """
        # NOTE(review): add_del_mcast_tunnels_load is not defined in this
        # class; presumably a base class provides it - verify.
        cls.add_del_mcast_tunnels_load(is_add=0)

    @classmethod
    def setUpClass(cls):
        """
        Build the class-wide VXLAN-over-IPv6 test topology.

        Overrides setUpClass of VppTestCase.  If any configuration step
        raises, the parent tearDownClass is invoked before the exception is
        re-raised, so class-level cleanup still runs.
        """
        super(TestVxlan6, cls).setUpClass()

        try:
            # UDP port and VXLAN flags used by the encapsulation helpers.
            cls.dport = 4789
            cls.flags = 0x8

            # Four pg interfaces (pg0..pg3), all brought admin-up.
            cls.create_pg_interfaces(range(4))
            for pg_if in cls.pg_interfaces:
                pg_if.admin_up()

            # IPv6 addressing on pg0, followed by NDP resolution.
            cls.pg0.config_ip6()
            cls.pg0.resolve_ndp()

            # Multicast group in text and packed form, plus the destination
            # MAC used for it.
            cls.mcast_ip6 = 'ff0e::1'
            cls.mcast_ip6n = socket.inet_pton(socket.AF_INET6, cls.mcast_ip6)
            cls.mcast_mac = "33:33:00:00:00:%02x" % (1)

            # BD 1: one tunnel towards pg0's remote address, bridged
            # together with pg1.
            cls.single_tunnel_bd = 1
            tunnel = cls.vapi.vxlan_add_del_tunnel(
                is_ipv6=1,
                src_addr=cls.pg0.local_ip6n,
                dst_addr=cls.pg0.remote_ip6n,
                vni=cls.single_tunnel_bd)
            for sw_if_index in (tunnel.sw_if_index, cls.pg1.sw_if_index):
                cls.vapi.sw_interface_set_l2_bridge(
                    sw_if_index, bd_id=cls.single_tunnel_bd)

            # BD 2 (multicast flooding): unicast tunnels, then one tunnel
            # whose destination is the multicast group, bridged with pg2.
            cls.n_ucast_tunnels = 10
            cls.mcast_flood_bd = 2
            cls.create_vxlan_flood_test_bd(cls.mcast_flood_bd,
                                           cls.n_ucast_tunnels)
            # NOTE(review): mcast_sw_if_index is hard-coded to 1 -
            # presumably pg0's sw_if_index; confirm.
            tunnel = cls.vapi.vxlan_add_del_tunnel(
                mcast_sw_if_index=1,
                src_addr=cls.pg0.local_ip6n,
                dst_addr=cls.mcast_ip6n,
                vni=cls.mcast_flood_bd, is_ipv6=1)
            for sw_if_index in (tunnel.sw_if_index, cls.pg2.sw_if_index):
                cls.vapi.sw_interface_set_l2_bridge(
                    sw_if_index, bd_id=cls.mcast_flood_bd)

            # BD 3 (unicast flooding): unicast tunnels bridged with pg3.
            cls.ucast_flood_bd = 3
            cls.create_vxlan_flood_test_bd(cls.ucast_flood_bd,
                                           cls.n_ucast_tunnels)
            cls.vapi.sw_interface_set_l2_bridge(cls.pg3.sw_if_index,
                                                bd_id=cls.ucast_flood_bd)
        except Exception:
            super(TestVxlan6, cls).tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """ Delegate class-level cleanup to the parent implementation. """
        super(TestVxlan6, cls).tearDownClass()

    def tearDown(self):
        """
        Run the parent tearDown, then log bridge-domain and tunnel state.

        Overrides tearDown of VppTestCase.  Nothing is logged when
        self.vpp_dead is set.
        """
        super(TestVxlan6, self).tearDown()
        if self.vpp_dead:
            return
        for command in ("show bridge-domain 1 detail",
                        "show bridge-domain 2 detail",
                        "show bridge-domain 3 detail",
                        "show vxlan tunnel"):
            self.logger.info(self.vapi.cli(command))


# Script entry point: hand the tests in this module to unittest using the
# project's VppTestRunner.
if "__main__" == __name__:
    unittest.main(testRunner=VppTestRunner)
n class="p">() pg_if.configure_ipv4_neighbors() self.logger.debug(self.vapi.ppcli("show ip fib")) self.logger.debug(self.vapi.ppcli("show ip arp")) def reset_vrf_and_remove_from_vrf_list(self, vrf_id): """ Reset required FIB table / VRF and remove it from VRF list. :param int vrf_id: The FIB table / VRF ID to be reset. """ # self.vapi.reset_vrf(vrf_id, is_ipv6=0) self.vapi.reset_fib(vrf_id, is_ipv6=0) if vrf_id in self.vrf_list: self.vrf_list.remove(vrf_id) if vrf_id not in self.vrf_reset_list: self.vrf_reset_list.append(vrf_id) for j in range(self.pg_ifs_per_vrf): pg_if = self.pg_if_by_vrf_id[vrf_id][j] pg_if.unconfig_ip4() if pg_if in self.pg_in_vrf: self.pg_in_vrf.remove(pg_if) if pg_if not in self.pg_not_in_vrf: self.pg_not_in_vrf.append(pg_if) self.logger.info("IPv4 VRF ID %d reset finished" % vrf_id) self.logger.debug(self.vapi.ppcli("show ip fib")) self.logger.debug(self.vapi.ppcli("show ip arp")) self.vapi.ip_table_add_del(is_add=0, table_id=vrf_id) def create_stream(self, src_if, packet_sizes): """ Create input packet stream for defined interface using hosts list. :param object src_if: Interface to create packet stream for. :param list packet_sizes: List of required packet sizes. :return: Stream of packets. """ pkts = [] src_hosts = src_if.remote_hosts for dst_if in self.flows[src_if]: for dst_host in dst_if.remote_hosts: src_host = random.choice(src_hosts) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) p = (Ether(dst=src_if.local_mac, src=src_host.mac) / IP(src=src_host.ip4, dst=dst_host.ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) pkt_info.data = p.copy() size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) self.logger.debug("Input stream created for port %s. 
Length: %u pkt(s)" % (src_if.name, len(pkts))) return pkts def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes): """ Create input packet stream for negative test for leaking across different VRFs for defined interface using hosts list. :param object src_if: Interface to create packet stream for. :param int vrf_id: The FIB table / VRF ID where src_if is assigned. :param list packet_sizes: List of required packet sizes. :return: Stream of packets. """ pkts = [] src_hosts = src_if.remote_hosts vrf_lst = list(self.vrf_list) vrf_lst.remove(vrf_id) for vrf in vrf_lst: for dst_if in self.pg_if_by_vrf_id[vrf]: for dst_host in dst_if.remote_hosts: src_host = random.choice(src_hosts) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) p = (Ether(dst=src_if.local_mac, src=src_host.mac) / IP(src=src_host.ip4, dst=dst_host.ip4) / UDP(sport=1234, dport=1234) / Raw(payload)) pkt_info.data = p.copy() size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) self.logger.debug("Input stream created for port %s. Length: %u pkt(s)" % (src_if.name, len(pkts))) return pkts def verify_capture(self, pg_if, capture): """ Verify captured input packet stream for defined interface. :param object pg_if: Interface to verify captured packet stream for. :param list capture: Captured packet stream. 
""" last_info = dict() for i in self.pg_interfaces: last_info[i.sw_if_index] = None dst_sw_if_index = pg_if.sw_if_index for packet in capture: try: ip = packet[IP] udp = packet[UDP] payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug("Got packet on port %s: src=%u (id=%u)" % (pg_if.name, payload_info.src, packet_index)) next_info = self.get_next_packet_info_for_interface2( payload_info.src, dst_sw_if_index, last_info[payload_info.src]) last_info[payload_info.src] = next_info self.assertIsNotNone(next_info) self.assertEqual(packet_index, next_info.index) saved_packet = next_info.data # Check standard fields self.assertEqual(ip.src, saved_packet[IP].src) self.assertEqual(ip.dst, saved_packet[IP].dst) self.assertEqual(udp.sport, saved_packet[UDP].sport) self.assertEqual(udp.dport, saved_packet[UDP].dport) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise for i in self.pg_interfaces: remaining_packet = self.get_next_packet_info_for_interface2( i, dst_sw_if_index, last_info[i.sw_if_index]) self.assertIsNone( remaining_packet, "Port %u: Packet expected from source %u didn't arrive" % (dst_sw_if_index, i.sw_if_index)) def verify_vrf(self, vrf_id): """ Check if the FIB table / VRF ID is configured. :param int vrf_id: The FIB table / VRF ID to be verified. :return: 1 if the FIB table / VRF ID is configured, otherwise return 0. 
""" ip_fib_dump = self.vapi.ip_fib_dump() vrf_exist = False vrf_count = 0 for ip_fib_details in ip_fib_dump: if ip_fib_details.table_id == vrf_id: if not vrf_exist: vrf_exist = True addr = socket.inet_ntoa(ip_fib_details.address) found = False for pg_if in self.pg_if_by_vrf_id[vrf_id]: if found: break for host in pg_if.remote_hosts: if scapy.compat.raw(addr) == \ scapy.compat.raw(host.ip4): vrf_count += 1 found = True break if not vrf_exist and vrf_count == 0: self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id) return VRFState.not_configured elif vrf_exist and vrf_count == 0: self.logger.info("IPv4 VRF ID %d has been reset" % vrf_id) return VRFState.reset else: self.logger.info("IPv4 VRF ID %d is configured" % vrf_id) return VRFState.configured def run_verify_test(self): """ Create packet streams for all configured pg interfaces, send all \ prepared packet streams and verify that: - all packets received correctly on all pg-ip4 interfaces assigned to VRFs - no packet received on all pg-ip4 interfaces not assigned to VRFs :raise RuntimeError: If no packet captured on pg-ip4 interface assigned to VRF or if any packet is captured on pg-ip4 interface not assigned to VRF. 
""" # Test # Create incoming packet streams for packet-generator interfaces for pg_if in self.pg_interfaces: pkts = self.create_stream(pg_if, self.pg_if_packet_sizes) pg_if.add_stream(pkts) # Enable packet capture and start packet sending self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Verify # Verify outgoing packet streams per packet-generator interface for pg_if in self.pg_interfaces: if pg_if in self.pg_in_vrf: capture = pg_if.get_capture(remark="interface is in VRF") self.verify_capture(pg_if, capture) elif pg_if in self.pg_not_in_vrf: pg_if.assert_nothing_captured(remark="interface is not in VRF", filter_out_fn=is_ipv4_misc) self.logger.debug("No capture for interface %s" % pg_if.name) else: raise Exception("Unknown interface: %s" % pg_if.name) def run_crosswise_vrf_test(self): """ Create packet streams for every pg-ip4 interface in VRF towards all pg-ip4 interfaces in other VRFs, send all prepared packet streams and \ verify that: - no packet received on all configured pg-ip4 interfaces :raise RuntimeError: If any packet is captured on any pg-ip4 interface. 
""" # Test # Create incoming packet streams for packet-generator interfaces for vrf_id in self.vrf_list: for pg_if in self.pg_if_by_vrf_id[vrf_id]: pkts = self.create_stream_crosswise_vrf( pg_if, vrf_id, self.pg_if_packet_sizes) pg_if.add_stream(pkts) # Enable packet capture and start packet sending self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Verify # Verify outgoing packet streams per packet-generator interface for pg_if in self.pg_interfaces: pg_if.assert_nothing_captured(remark="interface is in other VRF", filter_out_fn=is_ipv4_misc) self.logger.debug("No capture for interface %s" % pg_if.name) def test_ip4_vrf_01(self): """ IP4 VRF Multi-instance test 1 - create 4 VRFs """ # Config 1 # Create 4 VRFs self.create_vrf_and_assign_interfaces(4) # Verify 1 for vrf_id in self.vrf_list: self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 1 self.run_verify_test() self.run_crosswise_vrf_test() def test_ip4_vrf_02(self): """ IP4 VRF Multi-instance test 2 - reset 2 VRFs """ # Config 2 # Reset 2 VRFs self.reset_vrf_and_remove_from_vrf_list(1) self.reset_vrf_and_remove_from_vrf_list(2) # Verify 2 for vrf_id in self.vrf_reset_list: self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) for vrf_id in self.vrf_list: self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 2 self.run_verify_test() self.run_crosswise_vrf_test() def test_ip4_vrf_03(self): """ IP4 VRF Multi-instance 3 - add 2 VRFs """ # Config 3 # Add 1 of reset VRFs and 1 new VRF self.create_vrf_and_assign_interfaces(1) self.create_vrf_and_assign_interfaces(1, start=5) # Verify 3 for vrf_id in self.vrf_reset_list: self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) for vrf_id in self.vrf_list: self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 3 self.run_verify_test() self.run_crosswise_vrf_test() def test_ip4_vrf_04(self): """ IP4 VRF Multi-instance test 4 - reset 4 VRFs """ # Config 
4 # Reset all VRFs (i.e. no VRF except VRF=0 configured) for i in range(len(self.vrf_list)): self.reset_vrf_and_remove_from_vrf_list(self.vrf_list[0]) # Verify 4 for vrf_id in self.vrf_reset_list: self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) vrf_list_length = len(self.vrf_list) self.assertEqual( vrf_list_length, 0, "List of configured VRFs is not empty: %s != 0" % vrf_list_length) # Test 4 self.run_verify_test() self.run_crosswise_vrf_test() if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)