summaryrefslogtreecommitdiffstats
path: root/test/vpp_acl.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/vpp_acl.py')
-rw-r--r--test/vpp_acl.py143
1 files changed, 90 insertions, 53 deletions
diff --git a/test/vpp_acl.py b/test/vpp_acl.py
index 2d2f7ca257b..958d6973f26 100644
--- a/test/vpp_acl.py
+++ b/test/vpp_acl.py
@@ -7,7 +7,6 @@ from vpp_papi_provider import UnexpectedApiReturnValueError
class VppAclPlugin(VppObject):
-
def __init__(self, test, enable_intf_counters=False):
self._test = test
self.enable_intf_counters = enable_intf_counters
@@ -30,11 +29,11 @@ class VppAclPlugin(VppObject):
pass
def object_id(self):
- return ("acl-plugin-%d" % (self._sw_if_index))
+ return "acl-plugin-%d" % (self._sw_if_index)
-class AclRule():
- """ ACL Rule """
+class AclRule:
+ """ACL Rule"""
# port ranges
PORTS_ALL = -1
@@ -70,10 +69,18 @@ class AclRule():
icmp6_code_from_2 = 8
icmp6_code_to_2 = 42
- def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
- dst_prefix=IPv4Network('0.0.0.0/0'),
- proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None,
- dport_from=None, dport_to=None):
+ def __init__(
+ self,
+ is_permit,
+ src_prefix=IPv4Network("0.0.0.0/0"),
+ dst_prefix=IPv4Network("0.0.0.0/0"),
+ proto=0,
+ ports=PORTS_ALL,
+ sport_from=None,
+ sport_to=None,
+ dport_from=None,
+ dport_to=None,
+ ):
self.is_permit = is_permit
self.src_prefix = src_prefix
self.dst_prefix = dst_prefix
@@ -92,9 +99,17 @@ class AclRule():
self.dport_to = dport_to
def __copy__(self):
- new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
- self._proto, self._ports, self.sport_from,
- self.sport_to, self.dport_from, self.dport_to)
+ new_rule = AclRule(
+ self.is_permit,
+ self.src_prefix,
+ self.dst_prefix,
+ self._proto,
+ self._ports,
+ self.sport_from,
+ self.sport_to,
+ self.dport_from,
+ self.dport_to,
+ )
return new_rule
def update_ports(self):
@@ -172,17 +187,20 @@ class AclRule():
self.update_ports()
def encode(self):
- return {'is_permit': self.is_permit, 'proto': self.proto,
- 'srcport_or_icmptype_first': self.sport_from,
- 'srcport_or_icmptype_last': self.sport_to,
- 'src_prefix': self.src_prefix,
- 'dstport_or_icmpcode_first': self.dport_from,
- 'dstport_or_icmpcode_last': self.dport_to,
- 'dst_prefix': self.dst_prefix}
+ return {
+ "is_permit": self.is_permit,
+ "proto": self.proto,
+ "srcport_or_icmptype_first": self.sport_from,
+ "srcport_or_icmptype_last": self.sport_to,
+ "src_prefix": self.src_prefix,
+ "dstport_or_icmpcode_first": self.dport_from,
+ "dstport_or_icmpcode_last": self.dport_to,
+ "dst_prefix": self.dst_prefix,
+ }
class VppAcl(VppObject):
- """ VPP ACL """
+ """VPP ACL"""
def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
self._test = test
@@ -211,8 +229,11 @@ class VppAcl(VppObject):
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_add_replace(
- acl_index=self._acl_index, tag=self.tag, count=self.count,
- r=self.encode_rules())
+ acl_index=self._acl_index,
+ tag=self.tag,
+ count=self.count,
+ r=self.encode_rules(),
+ )
self._acl_index = reply.acl_index
self._test.registry.register(self, self._test.logger)
if expect_error:
@@ -247,11 +268,11 @@ class VppAcl(VppObject):
return False
def object_id(self):
- return ("acl-%s-%d" % (self.tag, self._acl_index))
+ return "acl-%s-%d" % (self.tag, self._acl_index)
class VppEtypeWhitelist(VppObject):
- """ VPP Etype Whitelist """
+ """VPP Etype Whitelist"""
def __init__(self, test, sw_if_index, whitelist, n_input=0):
self._test = test
@@ -269,26 +290,31 @@ class VppEtypeWhitelist(VppObject):
def add_vpp_config(self):
self._test.vapi.acl_interface_set_etype_whitelist(
- sw_if_index=self._sw_if_index, count=self.count,
- n_input=self.n_input, whitelist=self.whitelist)
+ sw_if_index=self._sw_if_index,
+ count=self.count,
+ n_input=self.n_input,
+ whitelist=self.whitelist,
+ )
self._test.registry.register(self, self._test.logger)
return self
def remove_vpp_config(self):
self._test.vapi.acl_interface_set_etype_whitelist(
- sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])
+ sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[]
+ )
def query_vpp_config(self):
self._test.vapi.acl_interface_etype_whitelist_dump(
- sw_if_index=self._sw_if_index)
+ sw_if_index=self._sw_if_index
+ )
return False
def object_id(self):
- return ("acl-etype_wl-%d" % (self._sw_if_index))
+ return "acl-etype_wl-%d" % (self._sw_if_index)
class VppAclInterface(VppObject):
- """ VPP ACL Interface """
+ """VPP ACL Interface"""
def __init__(self, test, sw_if_index, acls, n_input=0):
self._test = test
@@ -313,8 +339,11 @@ class VppAclInterface(VppObject):
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_interface_set_acl_list(
- sw_if_index=self._sw_if_index, n_input=self.n_input,
- count=self.count, acls=self.encode_acls())
+ sw_if_index=self._sw_if_index,
+ n_input=self.n_input,
+ count=self.count,
+ acls=self.encode_acls(),
+ )
self._test.registry.register(self, self._test.logger)
if expect_error:
self._test.fail("Unexpected api reply")
@@ -327,7 +356,8 @@ class VppAclInterface(VppObject):
def remove_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.acl_interface_set_acl_list(
- sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
+ sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]
+ )
if expect_error:
self._test.fail("Unexpected api reply")
except UnexpectedApiReturnValueError:
@@ -335,35 +365,38 @@ class VppAclInterface(VppObject):
self._test.fail("Unexpected api reply")
def query_vpp_config(self):
- dump = self._test.vapi.acl_interface_list_dump(
- sw_if_index=self._sw_if_index)
+ dump = self._test.vapi.acl_interface_list_dump(sw_if_index=self._sw_if_index)
for acl_list in dump:
if acl_list.count > 0:
return True
return False
def object_id(self):
- return ("acl-if-list-%d" % (self._sw_if_index))
+ return "acl-if-list-%d" % (self._sw_if_index)
-class MacipRule():
- """ Mac Ip rule """
+class MacipRule:
+ """Mac Ip rule"""
- def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
- src_prefix=IPv4Network('0.0.0.0/0')):
+ def __init__(
+ self, is_permit, src_mac=0, src_mac_mask=0, src_prefix=IPv4Network("0.0.0.0/0")
+ ):
self.is_permit = is_permit
self.src_mac = src_mac
self.src_mac_mask = src_mac_mask
self.src_prefix = src_prefix
def encode(self):
- return {'is_permit': self.is_permit, 'src_mac': self.src_mac,
- 'src_mac_mask': self.src_mac_mask,
- 'src_prefix': self.src_prefix}
+ return {
+ "is_permit": self.is_permit,
+ "src_mac": self.src_mac,
+ "src_mac_mask": self.src_mac_mask,
+ "src_prefix": self.src_prefix,
+ }
class VppMacipAcl(VppObject):
- """ Vpp Mac Ip ACL """
+ """Vpp Mac Ip ACL"""
def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
self._test = test
@@ -392,8 +425,11 @@ class VppMacipAcl(VppObject):
def add_vpp_config(self, expect_error=False):
try:
reply = self._test.vapi.macip_acl_add_replace(
- acl_index=self._acl_index, tag=self.tag, count=self.count,
- r=self.encode_rules())
+ acl_index=self._acl_index,
+ tag=self.tag,
+ count=self.count,
+ r=self.encode_rules(),
+ )
self._acl_index = reply.acl_index
self._test.registry.register(self, self._test.logger)
if expect_error:
@@ -428,11 +464,11 @@ class VppMacipAcl(VppObject):
return False
def object_id(self):
- return ("macip-acl-%s-%d" % (self.tag, self._acl_index))
+ return "macip-acl-%s-%d" % (self.tag, self._acl_index)
class VppMacipAclInterface(VppObject):
- """ VPP Mac Ip ACL Interface """
+ """VPP Mac Ip ACL Interface"""
def __init__(self, test, sw_if_index, acls):
self._test = test
@@ -450,19 +486,20 @@ class VppMacipAclInterface(VppObject):
def add_vpp_config(self):
for acl in self.acls:
self._test.vapi.macip_acl_interface_add_del(
- is_add=True, sw_if_index=self._sw_if_index,
- acl_index=acl.acl_index)
+ is_add=True, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
+ )
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
for acl in self.acls:
self._test.vapi.macip_acl_interface_add_del(
- is_add=False, sw_if_index=self._sw_if_index,
- acl_index=acl.acl_index)
+ is_add=False, sw_if_index=self._sw_if_index, acl_index=acl.acl_index
+ )
def dump(self):
return self._test.vapi.macip_acl_interface_list_dump(
- sw_if_index=self._sw_if_index)
+ sw_if_index=self._sw_if_index
+ )
def query_vpp_config(self):
dump = self.dump()
@@ -473,4 +510,4 @@ class VppMacipAclInterface(VppObject):
return False
def object_id(self):
- return ("macip-acl-if-list-%d" % (self._sw_if_index))
+ return "macip-acl-if-list-%d" % (self._sw_if_index)