""" IP Types """ import logging from ipaddress import ip_address from vpp_object import VppObject try: text_type = unicode except NameError: text_type = str _log = logging.getLogger(__name__) class DpoProto: DPO_PROTO_IP4 = 0 DPO_PROTO_IP6 = 1 DPO_PROTO_MPLS = 2 DPO_PROTO_ETHERNET = 3 DPO_PROTO_BIER = 4 DPO_PROTO_NSH = 5 INVALID_INDEX = 0xFFFFFFFF def get_dpo_proto(addr): if ip_address(addr).version == 6: return DpoProto.DPO_PROTO_IP6 else: return DpoProto.DPO_PROTO_IP4 class VppIpAddressUnion: def __init__(self, addr): self.addr = addr self.ip_addr = ip_address(text_type(self.addr)) def encode(self): if self.version == 6: return {"ip6": self.ip_addr} else: return {"ip4": self.ip_addr} @property def version(self): return self.ip_addr.version @property def address(self): return self.addr @property def length(self): return self.ip_addr.max_prefixlen @property def bytes(self): return self.ip_addr.packed def __eq__(self, other): if isinstance(other, self.__class__): return self.ip_addr == other.ip_addr elif hasattr(other, "ip4") and hasattr(other, "ip6"): # vl_api_address_union_t if 4 == self.version: return self.ip_addr == other.ip4 else: return self.ip_addr == other.ip6 else: raise Exception( "Comparing VppIpAddressUnions:%s with incomparable type: %s", self, other, ) def __ne__(self, other): return not (self == other) def __str__(self): return str(self.ip_addr) class VppIpMPrefix: def __init__(self, saddr, gaddr, glen): self.saddr = saddr self.gaddr = gaddr self.glen = glen if ip_address(self.saddr).version != ip_address(self.gaddr).version: raise ValueError( "Source and group addresses must be of the same address family." ) def encode(self): return { "af": ip_address(self.gaddr).vapi_af, "grp_address": {ip_address(self.gaddr).vapi_af_name: self.gaddr}, "src_address": {ip_address(self.saddr).vapi_af_name: self.saddr}, "grp_address_length": self.glen, } @property def length(self): return self.glen @property def version(self): return ip_address(self.gaddr).version def __str__(self): return "(%s,%s)/%d" % (self.saddr, self.gaddr, self.glen) def __eq__(self, other): if isinstance(other, self.__class__): return ( self.glen == other.glen and self.saddr == other.gaddr and self.saddr == other.saddr ) elif ( hasattr(other, "grp_address_length") and hasattr(other, "grp_address") and hasattr(other, "src_address") ): # vl_api_mprefix_t if 4 == self.version: return ( self.glen == other.grp_address_length and self.gaddr == str(other.grp_address.ip4) and self.saddr == str(other.src_address.ip4) ) else: return ( self.glen == other.grp_address_length and self.gaddr == str(other.grp_address.ip6) and self.saddr == str(other.src_address.ip6) ) return NotImplemented class VppIpPuntPolicer(VppObject): def __init__(self, test, policer_index, is_ip6=False): self._test = test self._policer_index = policer_index self._is_ip6 = is_ip6 def add_vpp_config(self): self._test.vapi.ip_punt_police( policer_index=self._policer_index, is_ip6=self._is_ip6, is_add=True ) def remove_vpp_config(self): self._test.vapi.ip_punt_police( policer_index=self._policer_index, is_ip6=self._is_ip6, is_add=False ) def query_vpp_config(self): NotImplemented class VppIpPuntRedirect(VppObject): def __init__(self, test, rx_index, tx_index, nh_addr): self._test = test self._rx_index = rx_index self._tx_index = tx_index self._nh_addr = ip_address(nh_addr) def encode(self): return { "rx_sw_if_index": self._rx_index, "tx_sw_if_index": self._tx_index, "nh": self._nh_addr, } def add_vpp_config(self): self._test.vapi.ip_punt_redirect(punt=self.encode(), is_add=True) self._test.registry.register(self, self._test.logger) return self def remove_vpp_config(self): self._test.vapi.ip_punt_redirect(punt=self.encode(), is_add=False) def get_vpp_config(self): is_ipv6 = True if self._nh_addr.version == 6 else False return self._test.vapi.ip_punt_redirect_dump( sw_if_index=self._rx_index, is_ipv6=is_ipv6 ) def query_vpp_config(self): if self.get_vpp_config(): return True return False class VppIpPathMtu(VppObject): def __init__(self, test, nh, pmtu, table_id=0): self._test = test self.nh = nh self.pmtu = pmtu self.table_id = table_id def add_vpp_config(self): self._test.vapi.ip_path_mtu_update( pmtu={"nh": self.nh, "table_id": self.table_id, "path_mtu": self.pmtu} ) self._test.registry.register(self, self._test.logger) return self def modify(self, pmtu): self.pmtu = pmtu self._test.vapi.ip_path_mtu_update( pmtu={"nh": self.nh, "table_id": self.table_id, "path_mtu": self.pmtu} ) return self def remove_vpp_config(self): self._test.vapi.ip_path_mtu_update( pmtu={"nh": self.nh, "table_id": self.table_id, "path_mtu": 0} ) def query_vpp_config(self): ds = list(self._test.vapi.vpp.details_iter(self._test.vapi.ip_path_mtu_get)) for d in ds: if ( self.nh == str(d.pmtu.nh) and self.table_id == d.pmtu.table_id and self.pmtu == d.pmtu.path_mtu ): return True return False def object_id(self): return "ip-path-mtu-%d-%s-%d" % (self.table_id, self.nh, self.pmtu) def __str__(self): return self.object_id()