#!/usr/bin/env python

import unittest
import binascii
from socket import AF_INET6

from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
    SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes

from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
from scapy.layers.inet import IP, UDP

from scapy.utils import inet_pton, inet_ntop

from util import ppp


class TestSRv6(VppTestCase):
    """ SRv6 Test Case """

    @classmethod
    def setUpClass(cls):
        """ Perform class-level setup by delegating to the parent class.
        """
        super(TestSRv6, cls).setUpClass()

    def setUp(self):
        """ Perform test setup before each test case.
        """
        super(TestSRv6, self).setUp()

        # packet sizes, inclusive L2 overhead
        self.pg_packet_sizes = [64, 512, 1518, 9018]

        # reset packet_infos
        self.reset_packet_infos()

    def tearDown(self):
        """ Clean up test setup after each test case.
        """
        # interfaces are unconfigured before the parent class tears down
        self.teardown_interfaces()

        super(TestSRv6, self).tearDown()

    def configure_interface(self,
                            interface,
                            ipv6=False, ipv4=False,
                            ipv6_table_id=0, ipv4_table_id=0):
        """ Configure interface.

        :param interface: interface object to configure
        :param ipv6: configure IPv6 on interface
        :param ipv4: configure IPv4 on interface
        :param ipv6_table_id: FIB table_id for IPv6
        :param ipv4_table_id: FIB table_id for IPv4
        """
        self.logger.debug("Configuring interface %s" % (interface.name))
        if ipv6:
            self.logger.debug("Configuring IPv6")
            interface.set_table_ip6(ipv6_table_id)
            interface.config_ip6()
            interface.resolve_ndp(timeout=5)
        if ipv4:
            self.logger.debug("Configuring IPv4")
            interface.set_table_ip4(ipv4_table_id)
            interface.config_ip4()
            interface.resolve_arp()
        # the interface is brought up regardless of the address families
        interface.admin_up()

    def setup_interfaces(self, ipv6=None, ipv4=None,
                         ipv6_table_id=None, ipv4_table_id=None):
        """ Create and configure interfaces.

        Any list argument that is omitted (or None) is treated as empty.
        The number of interfaces is len(ipv6) when ipv6 is non-empty,
        otherwise len(ipv4); shorter lists are padded with their default
        (False for capabilities, 0 for table ids).

        :param ipv6: list of interface IPv6 capabilities
        :param ipv4: list of interface IPv4 capabilities
        :param ipv6_table_id: list of intf IPv6 FIB table_ids
        :param ipv4_table_id: list of intf IPv4 FIB table_ids
        :returns: List of created interfaces.
        """
        # Work on private copies. The lists are padded in place below, so
        # neither the caller's lists nor a shared default object may be
        # touched: a padded default would leak into the next call and
        # corrupt its interface count.
        ipv6 = list(ipv6) if ipv6 else []
        ipv4 = list(ipv4) if ipv4 else []
        ipv6_table_id = list(ipv6_table_id) if ipv6_table_id else []
        ipv4_table_id = list(ipv4_table_id) if ipv4_table_id else []

        # how many interfaces?
        count = len(ipv6) or len(ipv4)

        self.logger.debug("Creating and configuring %d interfaces" % (count))

        # fill up ipv6 and ipv4 lists if needed
        # not enabled (False) is the default
        # (a non-positive repeat count yields an empty list, i.e. no padding)
        ipv6 += (count - len(ipv6)) * [False]
        ipv4 += (count - len(ipv4)) * [False]

        # fill up table_id lists if needed
        # table_id 0 (global) is the default
        ipv6_table_id += (count - len(ipv6_table_id)) * [0]
        ipv4_table_id += (count - len(ipv4_table_id)) * [0]

        # create 'count' pg interfaces
        self.create_pg_interfaces(range(count))

        # setup all interfaces
        for i in range(count):
            intf = self.pg_interfaces[i]
            self.configure_interface(intf,
                                     ipv6[i], ipv4[i],
                                     ipv6_table_id[i], ipv4_table_id[i])

        if any(ipv6):
            self.logger.debug(self.vapi.cli("show ip6 neighbors"))
        if any(ipv4):
            self.logger.debug(self.vapi.cli("show ip arp"))
        self.logger.debug(self.vapi.cli("show interface"))
        self.logger.debug(self.vapi.cli("show hardware"))

        return self.pg_interfaces

    def teardown_interfaces(self):
        """ Unconfigure and bring down interface.
        """
        self.logger.debug("Tearing down interfaces")
        # tear down all interfaces
        # AFAIK they cannot be deleted
        for i in self.pg_interfaces:
            self.logger.debug("Tear down interface %s" % (i.name))
            i.admin_down()
            i.unconfig()
            # move the interface back to table 0 for both address families
            i.set_table_ip4(0)
            i.set_table_ip6(0)

    @unittest.skipUnless(0, "PC to fix")
    def test_SRv6_T_Encaps(self):
        """ Test SRv6 Transit.Encaps behavior for IPv6.
        """
        # send traffic to one destination interface
        # source and destination are IPv6 only
        self.setup_interfaces(ipv6=[True, True])

        # configure FIB entries
        route = VppIpRoute(self, "a4::", 64,
                           [VppRoutePath(self.pg1.remote_ip6,
                                         self.pg1.sw_if_index,
                                         proto=DpoProto.DPO_PROTO_IP6)],
                           is_ip6=1)
        route.add_vpp_config()

        # configure encaps IPv6 source address
        # needs to be done before SR Policy config
        # TODO: API?
        self.vapi.cli("set sr encaps source addr a3::")

        bsid = 'a3::9999:1'
        # configure SRv6 Policy
        # Note: segment list order: first -> last
        sr_policy = VppSRv6Policy(
            self, bsid=bsid,
            is_encap=1,
            sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
            weight=1, fib_table=0,
            segments=['a4::', 'a5::', 'a6::c7'],
            source='a3::')
        sr_policy.add_vpp_config()
        self.sr_policy = sr_policy

        # log the sr policies
        self.logger.info(self.vapi.cli("show sr policies"))

        # steer IPv6 traffic to a7::/64 into SRv6 Policy
        # use the bsid of the above self.sr_policy
        pol_steering = VppSRv6Steering(
            self,
            bsid=self.sr_policy.bsid,
            prefix="a7::", mask_width=64,
            traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
            sr_policy_index=0, table_id=0,
            sw_if_index=0)
        pol_steering.add_vpp_config()

        # log the sr steering policies
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # create pack
/*
* Copyright (c) 2020 Cisco and/or its affiliates.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <vnet/ip/vtep.h>
/*
 * Take a reference on the (fib_index, ip) entry of table t.
 *
 * IPv4 addresses are looked up in t->vtep4 by the key's 64-bit value,
 * all other addresses in t->vtep6 by pointer to the key.  If the entry
 * already exists its counter is incremented, otherwise the entry is
 * inserted with a counter of 1.
 *
 * Returns the counter value after the reference has been taken.
 */
uword
vtep_addr_ref (vtep_table_t * t, u32 fib_index, ip46_address_t * ip)
{
  uword *refcnt;

  if (ip46_address_is_ip4 (ip))
    {
      vtep4_key_t k4 = {.addr = ip->ip4,.fib_index = fib_index };

      refcnt = hash_get (t->vtep4, k4.as_u64);
      if (!refcnt)
        {
          /* first reference: create the entry */
          hash_set (t->vtep4, k4.as_u64, 1);
          return 1;
        }
    }
  else
    {
      vtep6_key_t k6 = {.addr = ip->ip6,.fib_index = fib_index };

      refcnt = hash_get_mem (t->vtep6, &k6);
      if (!refcnt)
        {
          /* first reference: create the entry */
          hash_set_mem_alloc (&t->vtep6, &k6, 1);
          return 1;
        }
    }

  /* entry already present: bump the stored counter in place */
  *refcnt += 1;
  return *refcnt;
}
/*
 * Drop a reference on the (fib_index, ip) entry of table t.
 *
 * The entry must exist (enforced with ALWAYS_ASSERT).  Its counter is
 * decremented; when it reaches zero the entry is removed from t->vtep4
 * (IPv4) or t->vtep6 (otherwise).
 *
 * Returns the counter value after the decrement, 0 if the entry was
 * removed.
 */
uword
vtep_addr_unref (vtep_table_t * t, u32 fib_index, ip46_address_t * ip)
{
  const int is_ip4 = ip46_address_is_ip4 (ip);
  vtep4_key_t k4 = {.addr = ip->ip4,.fib_index = fib_index };
  vtep6_key_t k6 = {.addr = ip->ip6,.fib_index = fib_index };
  uword *refcnt;

  if (is_ip4)
    refcnt = hash_get (t->vtep4, k4.as_u64);
  else
    refcnt = hash_get_mem (t->vtep6, &k6);

  /* releasing an address that was never referenced is a caller bug */
  ALWAYS_ASSERT (refcnt);

  *refcnt -= 1;
  if (*refcnt != 0)
    return *refcnt;

  /* last reference gone: remove the entry from the matching table */
  if (is_ip4)
    hash_unset (t->vtep4, k4.as_u64);
  else
    hash_unset_mem_free (&t->vtep6, &k6);

  return 0;
}
/*
* fd.io coding-style-patch-verification: ON
*
* Local Variables:
* eval: (c-set-style "gnu")
* End:
*/