summaryrefslogtreecommitdiffstats
path: root/src/plugins/acl/test/vpp_acl.py
diff options
context:
space:
mode:
authorJakub Grajciar <jgrajcia@cisco.com>2020-03-11 12:47:32 +0100
committerOle Trøan <otroan@employees.org>2020-03-26 17:45:58 +0000
commitaad1ee149403994194cf37cef4530b042ba7df3a (patch)
treea3a3cfabc2bbbc94ecaed842d15fc94c9136be9e /src/plugins/acl/test/vpp_acl.py
parent4897d77c6d4d5d04eb7e02bda57dc6c7005a609f (diff)
acl: API cleanup
Use consistent API types. Type: fix Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com> Change-Id: If90d753f129312400c4c3669bb86289d0c3e0d99 Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com>
Diffstat (limited to 'src/plugins/acl/test/vpp_acl.py')
-rw-r--r--src/plugins/acl/test/vpp_acl.py460
1 files changed, 460 insertions, 0 deletions
diff --git a/src/plugins/acl/test/vpp_acl.py b/src/plugins/acl/test/vpp_acl.py
new file mode 100644
index 00000000000..d4ed1674bf9
--- /dev/null
+++ b/src/plugins/acl/test/vpp_acl.py
@@ -0,0 +1,460 @@
+from ipaddress import IPv4Network
+
+from vpp_object import VppObject
+from vpp_papi import VppEnum
+from vpp_ip import INVALID_INDEX
+from vpp_papi_provider import UnexpectedApiReturnValueError
+
+
+class VppAclPlugin(VppObject):
+
+ def __init__(self, test, enable_intf_counters=False):
+ self._test = test
+ self.enable_intf_counters = enable_intf_counters
+
+ @property
+ def enable_intf_counters(self):
+ return self._enable_intf_counters
+
+ @enable_intf_counters.setter
+ def enable_intf_counters(self, enable):
+ self.vapi.acl_stats_intf_counters_enable(enable=enable)
+
+ def add_vpp_config(self):
+ pass
+
+ def remove_vpp_config(self):
+ pass
+
+ def query_vpp_config(self):
+ pass
+
+ def object_id(self):
+ return ("acl-plugin-%d" % (self._sw_if_index))
+
+
+class AclRule():
+ """ ACL Rule """
+
+ # port ranges
+ PORTS_ALL = -1
+ PORTS_RANGE = 0
+ PORTS_RANGE_2 = 1
+ udp_sport_from = 10
+ udp_sport_to = udp_sport_from + 5
+ udp_dport_from = 20000
+ udp_dport_to = udp_dport_from + 5000
+ tcp_sport_from = 30
+ tcp_sport_to = tcp_sport_from + 5
+ tcp_dport_from = 40000
+ tcp_dport_to = tcp_dport_from + 5000
+
+ udp_sport_from_2 = 90
+ udp_sport_to_2 = udp_sport_from_2 + 5
+ udp_dport_from_2 = 30000
+ udp_dport_to_2 = udp_dport_from_2 + 5000
+ tcp_sport_from_2 = 130
+ tcp_sport_to_2 = tcp_sport_from_2 + 5
+ tcp_dport_from_2 = 20000
+ tcp_dport_to_2 = tcp_dport_from_2 + 5000
+
+ icmp4_type = 8 # echo request
+ icmp4_code = 3
+ icmp6_type = 128 # echo request
+ icmp6_code = 3
+
+ icmp4_type_2 = 8
+ icmp4_code_from_2 = 5
+ icmp4_code_to_2 = 20
+ icmp6_type_2 = 128
+ icmp6_code_from_2 = 8
+ icmp6_code_to_2 = 42
+
+ def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
+ dst_prefix=IPv4Network('0.0.0.0/0'),
+ proto=0, ports=PORTS_ALL):
+ self.is_permit = is_permit
+ self.src_prefix = src_prefix
+ self.dst_prefix = dst_prefix
+ self._proto = proto
+ self._ports = ports
+ self.sport_from = 0
+ self.sport_to = 0
+ self.dport_from = 0
+ self.dport_to = 0
+ self.update_ports()
+
+ def __copy__(self):
+ """
+ ports are assigned implicitly based on _proto and _ports values,
+ so we need to set them manually in case they were user defined
+ """
+ new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
+ self._proto, self._ports)
+ new_rule.sport_from = self.sport_from
+ new_rule.sport_to = self.sport_to
+ new_rule.dport_from = self.dport_from
+ new_rule.dport_to = self.dport_to
+ return new_rule
+
+ def update_ports(self):
+ if self._ports == self.PORTS_ALL:
+ self.sport_from = 0
+ self.dport_from = 0
+ self.sport_to = 65535
+ if self._proto == 1 or self._proto == 58:
+ self.sport_to = 255
+ self.dport_to = self.sport_to
+ elif self._ports == self.PORTS_RANGE:
+ if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
+ self.sport_from = self.icmp4_type
+ self.sport_to = self.icmp4_type
+ self.dport_from = self.icmp4_code
+ self.dport_to = self.icmp4_code
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
+ self.sport_from = self.icmp6_type
+ self.sport_to = self.icmp6_type
+ self.dport_from = self.icmp6_code
+ self.dport_to = self.icmp6_code
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
+ self.sport_from = self.tcp_sport_from
+ self.sport_to = self.tcp_sport_to
+ self.dport_from = self.tcp_dport_from
+ self.dport_to = self.tcp_dport_to
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
+ self.sport_from = self.udp_sport_from
+ self.sport_to = self.udp_sport_to
+ self.dport_from = self.udp_dport_from
+ self.dport_to = self.udp_dport_to
+ elif self._ports == self.PORTS_RANGE_2:
+ if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
+ self.sport_from = self.icmp4_type_2
+ self.sport_to = self.icmp4_type_2
+ self.dport_from = self.icmp4_code_from_2
+ self.dport_to = self.icmp4_code_to_2
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
+ self.sport_from = self.icmp6_type_2
+ self.sport_to = self.icmp6_type_2
+ self.dport_from = self.icmp6_code_from_2
+ self.dport_to = self.icmp6_code_to_2
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
+ self.sport_from = self.tcp_sport_from_2
+ self.sport_to = self.tcp_sport_to_2
+ self.dport_from = self.tcp_dport_from_2
+ self.dport_to = self.tcp_dport_to_2
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
+ self.sport_from = self.udp_sport_from_2
+ self.sport_to = self.udp_sport_to_2
+ self.dport_from = self.udp_dport_from_2
+ self.dport_to = self.udp_dport_to_2
+ else:
+ self.sport_from = self._ports
+ self.sport_to = self._ports
+ self.dport_from = self._ports
+ self.dport_to = self._ports
+
+ @property
+ def proto(self):
+ return self._proto
+
+ @proto.setter
+ def proto(self, proto):
+ self._proto = proto
+ self.update_ports()
+
+ @property
+ def ports(self):
+ return self._ports
+
+ @ports.setter
+ def ports(self, ports):
+ self._ports = ports
+ self.update_ports()
+
+ def encode(self):
+ return {'is_permit': self.is_permit, 'proto': self.proto,
+ 'srcport_or_icmptype_first': self.sport_from,
+ 'srcport_or_icmptype_last': self.sport_to,
+ 'src_prefix': self.src_prefix,
+ 'dstport_or_icmpcode_first': self.dport_from,
+ 'dstport_or_icmpcode_last': self.dport_to,
+ 'dst_prefix': self.dst_prefix}
+
+
+class VppAcl(VppObject):
+ """ VPP ACL """
+
+ def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
+ self._test = test
+ self._acl_index = acl_index
+ self.tag = tag
+ self.rules = rules
+
+ @property
+ def acl_index(self):
+ return self._acl_index
+
+ @property
+ def count(self):
+ return len(self.rules)
+
+ def encode_rules(self):
+ rules = []
+ for rule in self.rules:
+ rules.append(rule.encode())
+ return rules
+
+ def add_vpp_config(self, expect_error=False):
+ try:
+ reply = self._test.vapi.acl_add_replace(
+ acl_index=self._acl_index, tag=self.tag, count=self.count,
+ r=self.encode_rules())
+ self._acl_index = reply.acl_index
+ self._test.registry.register(self, self._test.logger)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ return self
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+ return None
+
+ def remove_vpp_config(self, expect_error=False):
+ try:
+ self._test.vapi.acl_del(acl_index=self._acl_index)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+
+ def dump(self):
+ return self._test.vapi.acl_dump(acl_index=self._acl_index)
+
+ def query_vpp_config(self):
+ dump = self.dump()
+ for rule in dump:
+ if rule.acl_index == self._acl_index:
+ return True
+ return False
+
+ def object_id(self):
+ return ("acl-%s-%d" % (self.tag, self._acl_index))
+
+
+class VppEtypeWhitelist(VppObject):
+ """ VPP Etype Whitelist """
+
+ def __init__(self, test, sw_if_index, whitelist, n_input=0):
+ self._test = test
+ self.whitelist = whitelist
+ self.n_input = n_input
+ self._sw_if_index = sw_if_index
+
+ @property
+ def sw_if_index(self):
+ return self._sw_if_index
+
+ @property
+ def count(self):
+ return len(self.whitelist)
+
+ def add_vpp_config(self):
+ self._test.vapi.acl_interface_set_etype_whitelist(
+ sw_if_index=self._sw_if_index, count=self.count,
+ n_input=self.n_input, whitelist=self.whitelist)
+ self._test.registry.register(self, self._test.logger)
+ return self
+
+ def remove_vpp_config(self):
+ self._test.vapi.acl_interface_set_etype_whitelist(
+ sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])
+
+ def query_vpp_config(self):
+ self._test.vapi.acl_interface_etype_whitelist_dump(
+ sw_if_index=self._sw_if_index)
+ return False
+
+ def object_id(self):
+ return ("acl-etype_wl-%d" % (self._sw_if_index))
+
+
+class VppAclInterface(VppObject):
+ """ VPP ACL Interface """
+
+ def __init__(self, test, sw_if_index, acls, n_input=0):
+ self._test = test
+ self._sw_if_index = sw_if_index
+ self.n_input = n_input
+ self.acls = acls
+
+ @property
+ def sw_if_index(self):
+ return self._sw_if_index
+
+ @property
+ def count(self):
+ return len(self.acls)
+
+ def encode_acls(self):
+ acls = []
+ for acl in self.acls:
+ acls.append(acl.acl_index)
+ return acls
+
+ def add_vpp_config(self, expect_error=False):
+ try:
+ reply = self._test.vapi.acl_interface_set_acl_list(
+ sw_if_index=self._sw_if_index, n_input=self.n_input,
+ count=self.count, acls=self.encode_acls())
+ self._test.registry.register(self, self._test.logger)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ return self
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+ return None
+
+ def remove_vpp_config(self, expect_error=False):
+ try:
+ reply = self._test.vapi.acl_interface_set_acl_list(
+ sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+
+ def query_vpp_config(self):
+ dump = self._test.vapi.acl_interface_list_dump(
+ sw_if_index=self._sw_if_index)
+ for acl_list in dump:
+ if acl_list.count > 0:
+ return True
+ return False
+
+ def object_id(self):
+ return ("acl-if-list-%d" % (self._sw_if_index))
+
+
+class MacipRule():
+ """ Mac Ip rule """
+
+ def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
+ src_prefix=IPv4Network('0.0.0.0/0')):
+ self.is_permit = is_permit
+ self.src_mac = src_mac
+ self.src_mac_mask = src_mac_mask
+ self.src_prefix = src_prefix
+
+ def encode(self):
+ return {'is_permit': self.is_permit, 'src_mac': self.src_mac,
+ 'src_mac_mask': self.src_mac_mask,
+ 'src_prefix': self.src_prefix}
+
+
+class VppMacipAcl(VppObject):
+ """ Vpp Mac Ip ACL """
+
+ def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
+ self._test = test
+ self._acl_index = acl_index
+ self.tag = tag
+ self.rules = rules
+
+ @property
+ def acl_index(self):
+ return self._acl_index
+
+ @property
+ def count(self):
+ return len(self.rules)
+
+ def encode_rules(self):
+ rules = []
+ for rule in self.rules:
+ rules.append(rule.encode())
+ return rules
+
+ def add_vpp_config(self, expect_error=False):
+ try:
+ reply = self._test.vapi.macip_acl_add_replace(
+ acl_index=self._acl_index, tag=self.tag, count=self.count,
+ r=self.encode_rules())
+ self._acl_index = reply.acl_index
+ self._test.registry.register(self, self._test.logger)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ return self
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+ return None
+
+ def remove_vpp_config(self, expect_error=False):
+ try:
+ self._test.vapi.macip_acl_del(acl_index=self._acl_index)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+
+ def dump(self):
+ return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
+
+ def query_vpp_config(self):
+ dump = self.dump()
+ for rule in dump:
+ if rule.acl_index == self._acl_index:
+ return True
+ return False
+
+ def object_id(self):
+ return ("macip-acl-%s-%d" % (self.tag, self._acl_index))
+
+
+class VppMacipAclInterface(VppObject):
+ """ VPP Mac Ip ACL Interface """
+
+ def __init__(self, test, sw_if_index, acls):
+ self._test = test
+ self._sw_if_index = sw_if_index
+ self.acls = acls
+
+ @property
+ def sw_if_index(self):
+ return self._sw_if_index
+
+ @property
+ def count(self):
+ return len(self.acls)
+
+ def add_vpp_config(self):
+ for acl in self.acls:
+ self._test.vapi.macip_acl_interface_add_del(
+ is_add=True, sw_if_index=self._sw_if_index,
+ acl_index=acl.acl_index)
+ self._test.registry.register(self, self._test.logger)
+
+ def remove_vpp_config(self):
+ for acl in self.acls:
+ self._test.vapi.macip_acl_interface_add_del(
+ is_add=False, sw_if_index=self._sw_if_index,
+ acl_index=acl.acl_index)
+
+ def dump(self):
+ return self._test.vapi.macip_acl_interface_list_dump(
+ sw_if_index=self._sw_if_index)
+
+ def query_vpp_config(self):
+ dump = self.dump()
+ for acl_list in dump:
+ for acl_index in acl_list.acls:
+ if acl_index != INVALID_INDEX:
+ return True
+ return False
+
+ def object_id(self):
+ return ("macip-acl-if-list-%d" % (self._sw_if_index))