#!/usr/bin/env python from socket import AF_INET, AF_INET6 import unittest from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP, Dot1Q from scapy.layers.inet import IP, UDP, ICMP from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6NDOptSrcLLAddr, \ ICMPv6ND_NA, ICMPv6EchoRequest from scapy.utils6 import in6_getnsma, in6_getnsmac from scapy.layers.vxlan import VXLAN from scapy.data import ETH_P_IP, ETH_P_IPV6, ETH_P_ARP from scapy.utils import inet_pton, inet_ntop from framework import VppTestCase, VppTestRunner from vpp_object import VppObject from vpp_interface import VppInterface from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, \ VppIpInterfaceAddress, VppIpInterfaceBind, find_route, FibPathProto, \ FibPathType from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort, \ VppBridgeDomainArpEntry, VppL2FibEntry, find_bridge_domain_port, VppL2Vtr from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint from vpp_ip import VppIpAddress, VppIpPrefix, DpoProto from vpp_papi import VppEnum, MACAddress from vpp_vxlan_gbp_tunnel import find_vxlan_gbp_tunnel, INDEX_INVALID, \ VppVxlanGbpTunnel from vpp_neighbor import VppNeighbor try: text_type = unicode except NameError: text_type = str NUM_PKTS = 67 def find_gbp_endpoint(test, sw_if_index=None, ip=None, mac=None, tep=None, sclass=None): if ip: vip = VppIpAddress(ip) if mac: vmac = MACAddress(mac) eps = test.vapi.gbp_endpoint_dump() for ep in eps: if tep: src = VppIpAddress(tep[0]) dst = VppIpAddress(tep[1]) if src != ep.endpoint.tun.src or dst != ep.endpoint.tun.dst: continue if sw_if_index: if ep.endpoint.sw_if_index != sw_if_index: continue if sclass: if ep.endpoint.sclass != sclass: continue if ip: for eip in ep.endpoint.ips: if vip == eip: return True if mac: if vmac.packed == ep.endpoint.mac: return True return False def find_gbp_vxlan(test, vni): ts = test.vapi.gbp_vxlan_tunnel_dump() for t in ts: if t.tunnel.vni == vni: return True return False class VppGbpEndpoint(VppObject): """ GBP Endpoint """ @property def mac(self): return str(self.vmac) @property def ip4(self): return self._ip4 @property def fip4(self): return self._fip4 @property def ip6(self): return self._ip6 @property def fip6(self): return self._fip6 @property def ips(self): return [self.ip4, self.ip6] @property def fips(self): return [self.fip4, self.fip6] def __init__(self, test, itf, epg, recirc, ip4, fip4, ip6, fip6, flags=0, tun_src="0.0.0.0", tun_dst="0.0.0.0", mac=True): self._test = test self.itf = itf self.epg = epg self.recirc = recirc self._ip4 = VppIpAddress(ip4) self._fip4 = VppIpAddress(fip4) self._ip6 = VppIpAddress(ip6) self._fip6 = VppIpAddress(fip6) if mac: self.vmac = MACAddress(self.itf.remote_mac) else: self.vmac = MACAddress("00:00:00:00:00:00") self.flags = flags self.tun_src = VppIpAddress(tun_src) self.tun_dst = VppIpAddress(tun_dst) def add_vpp_config(self): res = self._test.vapi.gbp_endpoint_add( self.itf.sw_if_index, [self.ip4.encode(), self.ip6.encode()], self.vmac.packed, self.epg.sclass, self.flags, self.tun_src.encode(), self.tun_dst.encode()) self.handle = res.handle self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.gbp_endpoint_del(self.handle) def object_id(self): return "gbp-endpoint:[%d==%d:%s:%d]" % (self.handle, self.itf.sw_if_index, self.ip4.address, self.epg.sclass) def query_vpp_config(self): return find_gbp_endpoint(self._test, self.itf.sw_if_index, self.ip4.address) class VppGbpRecirc(VppObject): """ GBP Recirculation Interface """ def __init__(self, test, epg, recirc, is_ext=False): self._test = test self.recirc = recirc self.epg = epg self.is_ext = is_ext def add_vpp_config(self): self._test.vapi.gbp_recirc_add_del( 1, self.recirc.sw_if_index, self.epg.sclass, self.is_ext) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.gbp_recirc_add_del( 0, self.recirc.sw_if_index, self.epg.sclass, self.is_ext) def object_id(self): return "gbp-recirc:[%d]" % (self.recirc.sw_if_index) def query_vpp_config(self): rs = self._test.vapi.gbp_recirc_dump() for r in rs: if r.recirc.sw_if_index == self.recirc.sw_if_index: return True return False class VppGbpExtItf(VppObject): """ GBP ExtItfulation Interface """ def __init__(self, test, itf, bd, rd, anon=False): self._test = test self.itf = itf self.bd = bd self.rd = rd self.flags = 1 if anon else 0 def add_vpp_config(self): self._test.vapi.gbp_ext_itf_add_del( 1, self.itf.sw_if_index, self.bd.bd_id, self.rd.rd_id, self.flags) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.gbp_ext_itf_add_del( 0, self.itf.sw_if_index, self.bd.bd_id, self.rd.rd_id, self.flags) def object_id(self): return "gbp-ext-itf:[%d]%s" % (self.itf.sw_if_index, " [anon]" if self.flags else "") def query_vpp_config(self): rs = self._test.vapi.gbp_ext_itf_dump() for r in rs: if r.ext_itf.sw_if_index == self.itf.sw_if_index: return True return False class VppGbpSubnet(VppObject): """ GBP Subnet """ def __init__(self, test, rd, address, address_len, type, sw_if_index=None, sclass=None): self._test = test self.rd_id = rd.rd_id self.prefix = VppIpPrefix(address, address_len) self.type = type self.sw_if_index = sw_if_index self.sclass = sclass def add_vpp_config(self): self._test.vapi.gbp_subnet_add_del( 1, self.rd_id, self.prefix.encode(), self.type, sw_if_index=self.sw_if_index if self.sw_if_index else 0xffffffff, sclass=self.sclass if self.sclass else 0xffff) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.gbp_subnet_add_del( 0, self.rd_id, self.prefix.encode(), self.type) def object_id(self): return "gbp-subnet:[%d-%s]" % (self.rd_id, self.prefix) def query_vpp_config(self): ss = self._test.vapi.gbp_subnet_dump() for s in ss: if s.subnet.rd_id == self.rd_id and \ s.subnet.type == self.type and \ s.subnet.prefix == self.prefix: return True return False class VppGbpEndpointRetention(object): def __init__(self, remote_ep_timeout=0xffffffff): self.remote_ep_timeout = remote_ep_timeout def encode(self): return {'remote_ep_timeout': self.remote_ep_timeout} class VppGbpEndpointGroup(VppObject): """ GBP Endpoint Group """ def __init__(self, test, vnid, sclass, rd, bd, uplink, bvi, bvi_ip4, bvi_ip6=None, retention=VppGbpEndpointRetention()): self._test = test self.uplink = uplink self.bvi = bvi self.bvi_ip4 = VppIpAddress(bvi_ip4) self.bvi_ip6 = VppIpAddress(bvi_ip6) self.vnid = vnid self.bd = bd self.rd = rd self.sclass = sclass if 0 == self.sclass: self.sclass = 0xffff self.retention = retention def add_vpp_config(self): self._test.vapi.gbp_endpoint_group_add( self.vnid, self.sclass, self.bd.bd.bd_id, self.rd.rd_id, self.uplink.sw_if_index if self.uplink else INDEX_INVALID, self.retention.encode()) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.gbp_endpoint_group_del(self.sclass) def object_id(self): return "gbp-endpoint-group:[%d]" % (self.vnid) def query_vpp_config(self): epgs = self._test.vapi.gbp_endpoint_group_dump() for epg in epgs: if epg.epg.vnid == self.vnid: return True return False class VppGbpBridgeDomain(VppObject): """ GBP Bridge Domain """ def __init__(self, test, bd, rd, bvi, uu_fwd=None, bm_flood=None, learn=True, uu_drop=False, bm_drop=False, ucast_arp=False): self._test = test self.bvi = bvi self.uu_fwd = uu_fwd self.bm_flood = bm_flood self.bd = bd self.rd = rd e = VppEnum.vl_api_gbp_bridge_domain_flags_t self.flags = e.GBP_BD_API_FLAG_NONE if not learn: self.flags |= e.GBP_BD_API_FLAG_DO_NOT_LEARN if uu_drop: self.flags |= e.GBP_BD_API_FLAG_UU_FWD_DROP if bm_drop: self.flags |= e.GBP_BD_API_FLAG_MCAST_DROP if ucast_arp: self.flags |= e.GBP_BD_API_FLAG_UCAST_ARP def add_vpp_config(self): self._test.vapi.gbp_bridge_domain_add( self.bd.bd_id, self.rd.rd_id, self.flags, self.bvi.sw_if_index, self.uu_fwd.sw_if_index if self.uu_fwd else INDEX_INVALID, self.bm_flood.sw_if_index if self.bm_flood else INDEX_INVALID) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.gbp_bridge_domain_del(self.bd.bd_id) def object_id(self): return "gbp-bridge-domain:[%d]" % (self.bd.bd_id) def query_vpp_config(self): bds = self._test.vapi.gbp_bridge_domain_dump() for bd in bds: if bd.bd.bd_id == self.bd.bd_id: return True return False class VppGbpRouteDomain(VppObject): """ GBP Route Domain """ def __init__(self, test, rd_id, scope, t4, t6, ip4_uu=None, ip6_uu=None): self._test = test self.rd_id = rd_id self.scope = scope self.t4 = t4 self.t6 = t6 self.ip4_uu = ip4_uu self.ip6_uu = ip6_uu def add_vpp_config(self): self._test.vapi.gbp_route_domain_add( self.rd_id, self.scope, self.t4.table_id, self.t6.table_id, self.ip4_uu.sw_if_index if self.ip4_uu else INDEX_INVALID, self.ip6_uu.sw_if_index if self.ip6_uu else INDEX_INVALID) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.gbp_route_domain_del(self.rd_id) def object_id(self): return "gbp-ro
SRv6 Routing
============
.. toctree::
srv6-3n-skx-xxv710
srv6-3n-hsw-xl710
srv6-3n-tsh-x520
srv6-3n-dnv-x553