from ipaddress import IPv4Network from vpp_object import VppObject from vpp_papi import VppEnum from vpp_ip import INVALID_INDEX from vpp_papi_provider import UnexpectedApiReturnValueError class VppAclPlugin(VppObject): def __init__(self, test, enable_intf_counters=False): self._test = test self.enable_intf_counters = enable_intf_counters @property def enable_intf_counters(self): return self._enable_intf_counters @enable_intf_counters.setter def enable_intf_counters(self, enable): self.vapi.acl_stats_intf_counters_enable(enable=enable) def add_vpp_config(self): pass def remove_vpp_config(self): pass def query_vpp_config(self): pass def object_id(self): return "acl-plugin-%d" % (self._sw_if_index) class AclRule: """ACL Rule""" # port ranges PORTS_ALL = -1 PORTS_RANGE = 0 PORTS_RANGE_2 = 1 udp_sport_from = 10 udp_sport_to = udp_sport_from + 5 udp_dport_from = 20000 udp_dport_to = udp_dport_from + 5000 tcp_sport_from = 30 tcp_sport_to = tcp_sport_from + 5 tcp_dport_from = 40000 tcp_dport_to = tcp_dport_from + 5000 udp_sport_from_2 = 90 udp_sport_to_2 = udp_sport_from_2 + 5 udp_dport_from_2 = 30000 udp_dport_to_2 = udp_dport_from_2 + 5000 tcp_sport_from_2 = 130 tcp_sport_to_2 = tcp_sport_from_2 + 5 tcp_dport_from_2 = 20000 tcp_dport_to_2 = tcp_dport_from_2 + 5000 icmp4_type = 8 # echo request icmp4_code = 3 icmp6_type = 128 # echo request icmp6_code = 3 icmp4_type_2 = 8 icmp4_code_from_2 = 5 icmp4_code_to_2 = 20 icmp6_type_2 = 128 icmp6_code_from_2 = 8 icmp6_code_to_2 = 42 def __init__( self, is_permit, src_prefix=IPv4Network("0.0.0.0/0"), dst_prefix=IPv4Network("0.0.0.0/0"), proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None, dport_from=None, dport_to=None, ): self.is_permit = is_permit self.src_prefix = src_prefix self.dst_prefix = dst_prefix self._proto = proto self._ports = ports # assign ports by range self.update_ports() # assign specified ports if sport_from: self.sport_from = sport_from if sport_to: self.sport_to = sport_to if dport_from: self.dport_from = dport_from if dport_to: self.dport_to = dport_to def __copy__(self): new_rule = AclRule( self.is_permit, self.src_prefix, self.dst_prefix, self._proto, self._ports, self.sport_from, self.sport_to, self.dport_from, self.dport_to, ) return new_rule def update_ports(self): if self._ports == self.PORTS_ALL: self.sport_from = 0 self.dport_from = 0 self.sport_to = 65535 if self._proto == 1 or self._proto == 58: self.sport_to = 255 self.dport_to = self.sport_to elif self._ports == self.PORTS_RANGE: if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP: self.sport_from = self.icmp4_type self.sport_to = self.icmp4_type self.dport_from = self.icmp4_code self.dport_to = self.icmp4_code elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6: self.sport_from = self.icmp6_type self.sport_to = self.icmp6_type self.dport_from = self.icmp6_code self.dport_to = self.icmp6_code elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP: self.sport_from = self.tcp_sport_from self.sport_to = self.tcp_sport_to self.dport_from = self.tcp_dport_from self.dport_to = self.tcp_dport_to elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP: self.sport_from = self.udp_sport_from self.sport_to = self.udp_sport_to self.dport_from = self.udp_dport_from self.dport_to = self.udp_dport_to elif self._ports == self.PORTS_RANGE_2: if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP: self.sport_from = self.icmp4_type_2 self.sport_to = self.icmp4_type_2 self.dport_from = self.icmp4_code_from_2 self.dport_to = self.icmp4_code_to_2 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6: self.sport_from = self.icmp6_type_2 self.sport_to = self.icmp6_type_2 self.dport_from = self.icmp6_code_from_2 self.dport_to = self.icmp6_code_to_2 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP: self.sport_from = self.tcp_sport_from_2 self.sport_to = self.tcp_sport_to_2 self.dport_from = self.tcp_dport_from_2 self.dport_to = self.tcp_dport_to_2 elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP: self.sport_from = self.udp_sport_from_2 self.sport_to = self.udp_sport_to_2 self.dport_from = self.udp_dport_from_2 self.dport_to = self.udp_dport_to_2 else: self.sport_from = self._ports self.sport_to = self._ports self.dport_from = self._ports self.dport_to = self._ports @property def proto(self): return self._proto @proto.setter def proto(self, proto): self._proto = proto self.update_ports() @property def ports(self): return self._ports @ports.setter def ports(self, ports): self._ports = ports self.update_ports() def encode(self): return { "is_permit": self.is_permit, "proto": self.proto, "srcport_or_icmptype_first": self.sport_from, "srcport_or_icmptype_last": self.sport_to, "src_prefix": self.src_prefix, "dstport_or_icmpcode_first": self.dport_from, "dstport_or_icmpcode_last": self.dport_to, "dst_prefix": self.dst_prefix, } class VppAcl(VppObject): """VPP ACL""" def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None): self._test = test self._acl_index = acl_index self.tag = tag self._rules = rules @property def rules(self): return self._rules @property def acl_index(self): return self._acl_index @property def count(self): return len(self._rules) def encode_rules(self): rules = [] for rule in self._rules: rules.append(rule.encode()) return rules def add_vpp_config(self, expect_error=False): try: reply = self._test.vapi.acl_add_replace( acl_index=self._acl_index, tag=self.tag, count=self.count, r=self.encode_rules(), ) self._acl_index = reply.acl_index self._test.registry.register(self, self._test.logger) if expect_error: self._test.fail("Unexpected api reply") return self except UnexpectedApiReturnValueError: if not expect_error: self._test.fail("Unexpected api reply") return None def modify_vpp_config(self, rules): self._rules = rules self.add_vpp_config() def remove_vpp_config(self, expect_error=False): try: self._test.vapi.acl_del(acl_index=self._acl_index) if expect_error: self._test.fail("Unexpected api reply") except UnexpectedApiReturnValueError: if not expect_error: self._test.fail("Unexpected api reply") def dump(self): return self._test.vapi.acl_dump(acl_index=self._acl_index) def query_vpp_config(self): dump = self.dump() for rule in dump: if rule.acl_index == self._acl_index: return True return False def object_id(self): return "acl-%s-%d" % (self.tag, self._acl_index) class VppEtypeWhitelist(VppObject): """VPP Etype Whitelist""" def __init__(self, test, sw_if_index, whitelist, n_input=0): self._test = test self.whitelist = whitelist self.n_input = n_input self._sw_if_index = sw_if_index @property def sw_if_index(self): return self._sw_if_index @property def count(self): return len(self.whitelist) def add_vpp_config(self): self._test.vapi.acl_interface_set_etype_whitelist( sw_if_index=self._sw_if_index, count=self.count, n_input=self.n_input, whitelist=self.whitelist, ) self._test.registry.register(self, self._test.logger) return self def remove_vpp_config(self): self._test.vapi.acl_interface_set_etype_whitelist( sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[] ) def query_vpp_config(self): self._test.vapi.acl_interface_etype_whitelist_dump( sw_if_index=self._sw_if_index ) return False def object_id(self): return "acl-etype_wl-%d" % (self._sw_if_index) class VppAclInterface(VppObject): """VPP ACL Interface""" def __init__(self, test, sw_if_index, acls, n_input=0): self._test = test self._sw_if_index = sw_if_index self.n_input = n_input self.acls = acls @property def sw_if_index(self): return self._sw_if_index @property def count(self): return len(self.acls) def encode_acls(self): acls = [] for acl in self.acls: acls.append(acl.acl_index) return acls def add_vpp_config(self, expect_error=False): try: reply = self._test.vapi.acl_interface_set_acl_list( sw_if_index=self._sw_if_index, n_input=self.n_input, count=self.count, acls=self.encode_acls(), ) self._test.registry.register(self, self._test.logger) if expect_error: self._test.fail("Unexpected api reply") return self except UnexpectedApiReturnValueError: if not expect_error: self._test.fail("Unexpected api reply") return None def remove_vpp_config(self, expect_error=False): try: reply = self._test.vapi.acl_interface_set_acl_list( sw_if_index=self._sw_if_index,<style>.highlight .hll { background-color: #ffffcc } .highlight .c { color: #888888 } /* Comment */ .highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */ .highlight .k { color: #008800; font-weight: bold } /* Keyword */ .highlight .ch { color: #888888 } /* Comment.Hashbang */ .highlight .cm { color: #888888 } /* Comment.Multiline */ .highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */ .highlight .cpf { color: #888888 } /* Comment.PreprocFile */ .highlight .c1 { color: #888888 } /* Comment.Single */ .highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */ .highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */ .highlight .ge { font-style: italic } /* Generic.Emph */ .highlight .gr { color: #aa0000 } /* Generic.Error */ .highlight .gh { color: #333333 } /* Generic.Heading */ .highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */ .highlight .go { color: #888888 } /* Generic.Output */ .highlight .gp { color: #555555 } /* Generic.Prompt */ .highlight .gs { font-weight: bold } /* Generic.Strong */ .highlight .gu { color: #666666 } /* Generic.Subheading */ .highlight .gt { color: #aa0000 } /* Generic.Traceback */ .highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */ .highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */ .highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */ .highlight .kp { color: #008800 } /* Keyword.Pseudo */ .highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */ .highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */ .highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */ .highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */ .highlight .na { color: #336699 } /* Name.Attribute */ .highlight .nb { color: #003388 } /* Name.Builtin */ .highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */ .highlight .no { color: #003366; font-weight: bold } /* Name.Constant */ .highlight .nd { color: #555555 } /* Name.Decorator */ .highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */ .highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */ .highlight .nl { color: #336699; font-style: italic } /* Name.Label */ .highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */ .highlight .py { color: #336699; font-weight: bold } /* Name.Property */ .highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */ .highlight .nv { color: #336699 } /* Name.Variable */ .highlight .ow { color: #008800 } /* Operator.Word */ .highlight .w { color: #bbbbbb } /* Text.Whitespace */ .highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */ .highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */ .highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */ .highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */ .highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */ .highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */ .highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */ .highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */ .highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */ .highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */ .highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */ .highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */ .highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */ .highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */ .highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */ .highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */ .highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */ .highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */ .highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */ .highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */ .highlight .vc { color: