summaryrefslogtreecommitdiffstats
path: root/src/plugins/acl/test
diff options
context:
space:
mode:
authorJakub Grajciar <jgrajcia@cisco.com>2020-03-27 06:55:06 +0100
committerOle Trøan <otroan@employees.org>2020-03-27 12:46:42 +0000
commit2f8cd914514fe54f91974c6d465d4769dfac8de8 (patch)
tree0800301d4e2ba8d1cf9bf695798b88684bee39f7 /src/plugins/acl/test
parent64d9da3ba3b07d23782ef1a947fb5a71b9f4de56 (diff)
acl: API cleanup
Use consistent API types. Type: fix Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com> Change-Id: I09fa6c1b6917936351bd376b56c414ce24488095 Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com>
Diffstat (limited to 'src/plugins/acl/test')
-rw-r--r--src/plugins/acl/test/test_acl_plugin.py363
-rw-r--r--src/plugins/acl/test/test_acl_plugin_conns.py86
-rw-r--r--src/plugins/acl/test/test_acl_plugin_l2l3.py213
-rw-r--r--src/plugins/acl/test/test_acl_plugin_macip.py191
-rw-r--r--src/plugins/acl/test/test_classify_l2_acl.py1
-rw-r--r--src/plugins/acl/test/vpp_acl.py476
6 files changed, 847 insertions, 483 deletions
diff --git a/src/plugins/acl/test/test_acl_plugin.py b/src/plugins/acl/test/test_acl_plugin.py
index f07d37548fe..d5e195fe44b 100644
--- a/src/plugins/acl/test/test_acl_plugin.py
+++ b/src/plugins/acl/test/test_acl_plugin.py
@@ -12,8 +12,11 @@ from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6ExtHdrFragment
from framework import VppTestCase, VppTestRunner
from util import Host, ppp
+from ipaddress import IPv4Network, IPv6Network
from vpp_lo_interface import VppLoInterface
+from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
+from vpp_ip import INVALID_INDEX
class TestACLplugin(VppTestCase):
@@ -175,105 +178,49 @@ class TestACLplugin(VppTestCase):
% self.bd_id))
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
- s_prefix=0, s_ip=b'\x00\x00\x00\x00',
- d_prefix=0, d_ip=b'\x00\x00\x00\x00'):
- if proto == -1:
- return
- if ports == self.PORTS_ALL:
- sport_from = 0
- dport_from = 0
- sport_to = 65535 if proto != 1 and proto != 58 else 255
- dport_to = sport_to
- elif ports == self.PORTS_RANGE:
- if proto == 1:
- sport_from = self.icmp4_type
- sport_to = self.icmp4_type
- dport_from = self.icmp4_code
- dport_to = self.icmp4_code
- elif proto == 58:
- sport_from = self.icmp6_type
- sport_to = self.icmp6_type
- dport_from = self.icmp6_code
- dport_to = self.icmp6_code
- elif proto == self.proto[self.IP][self.TCP]:
- sport_from = self.tcp_sport_from
- sport_to = self.tcp_sport_to
- dport_from = self.tcp_dport_from
- dport_to = self.tcp_dport_to
- elif proto == self.proto[self.IP][self.UDP]:
- sport_from = self.udp_sport_from
- sport_to = self.udp_sport_to
- dport_from = self.udp_dport_from
- dport_to = self.udp_dport_to
- elif ports == self.PORTS_RANGE_2:
- if proto == 1:
- sport_from = self.icmp4_type_2
- sport_to = self.icmp4_type_2
- dport_from = self.icmp4_code_from_2
- dport_to = self.icmp4_code_to_2
- elif proto == 58:
- sport_from = self.icmp6_type_2
- sport_to = self.icmp6_type_2
- dport_from = self.icmp6_code_from_2
- dport_to = self.icmp6_code_to_2
- elif proto == self.proto[self.IP][self.TCP]:
- sport_from = self.tcp_sport_from_2
- sport_to = self.tcp_sport_to_2
- dport_from = self.tcp_dport_from_2
- dport_to = self.tcp_dport_to_2
- elif proto == self.proto[self.IP][self.UDP]:
- sport_from = self.udp_sport_from_2
- sport_to = self.udp_sport_to_2
- dport_from = self.udp_dport_from_2
- dport_to = self.udp_dport_to_2
+ s_prefix=0, s_ip=0,
+ d_prefix=0, d_ip=0):
+ if ip:
+ src_prefix = IPv6Network((s_ip, s_prefix))
+ dst_prefix = IPv6Network((d_ip, d_prefix))
else:
- sport_from = ports
- sport_to = ports
- dport_from = ports
- dport_to = ports
-
- rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
- 'srcport_or_icmptype_first': sport_from,
- 'srcport_or_icmptype_last': sport_to,
- 'src_ip_prefix_len': s_prefix,
- 'src_ip_addr': s_ip,
- 'dstport_or_icmpcode_first': dport_from,
- 'dstport_or_icmpcode_last': dport_to,
- 'dst_ip_prefix_len': d_prefix,
- 'dst_ip_addr': d_ip})
- return rule
-
- def apply_rules(self, rules, tag=b''):
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
- tag=tag)
- self.logger.info("Dumped ACL: " + str(
- self.vapi.acl_dump(reply.acl_index)))
+ src_prefix = IPv4Network((s_ip, s_prefix))
+ dst_prefix = IPv4Network((d_ip, d_prefix))
+ return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
+ src_prefix=src_prefix, dst_prefix=dst_prefix)
+
+ def apply_rules(self, rules, tag=None):
+ acl = VppAcl(self, rules, tag=tag)
+ acl.add_vpp_config()
+ self.logger.info("Dumped ACL: " + str(acl.dump()))
# Apply a ACL on the interface as inbound
for i in self.pg_interfaces:
- self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
- n_input=1,
- acls=[reply.acl_index])
- return reply.acl_index
-
- def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
- tag=tag)
- self.logger.info("Dumped ACL: " + str(
- self.vapi.acl_dump(reply.acl_index)))
+ acl_if = VppAclInterface(
+ self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl])
+ acl_if.add_vpp_config()
+ return acl.acl_index
+
+ def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
+ acl = VppAcl(self, rules, tag=tag)
+ acl.add_vpp_config()
+ self.logger.info("Dumped ACL: " + str(acl.dump()))
# Apply a ACL on the interface as inbound
- self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
- n_input=1,
- acls=[reply.acl_index])
- return reply.acl_index
+ acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
+ acls=[acl])
+ return acl.acl_index
- def etype_whitelist(self, whitelist, n_input):
+ def etype_whitelist(self, whitelist, n_input, add=True):
# Apply whitelists on all the interfaces
- for i in self.pg_interfaces:
- # checkstyle can't read long names. Help them.
- fun = self.vapi.acl_interface_set_etype_whitelist
- fun(sw_if_index=i.sw_if_index, n_input=n_input,
- whitelist=whitelist)
- return
+ if add:
+ self._wl = []
+ for i in self.pg_interfaces:
+ self._wl.append(VppEtypeWhitelist(
+ self, sw_if_index=i.sw_if_index, whitelist=whitelist,
+ n_input=n_input).add_vpp_config())
+ else:
+ if hasattr(self, "_wl"):
+ for wl in self._wl:
+ wl.remove_vpp_config()
def create_upper_layer(self, packet_index, proto, ports=0):
p = self.proto_map[proto]
@@ -542,24 +489,15 @@ class TestACLplugin(VppTestCase):
"""
self.logger.info("ACLP_TEST_START_0001")
- # Add an ACL
- r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 1234,
- 'srcport_or_icmptype_last': 1235,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 1234,
- 'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0}]
+ # Create a permit-1234 ACL
+ r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
# Test 1: add a new ACL
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
- tag=b"permit 1234")
- self.assertEqual(reply.retval, 0)
+ first_acl = VppAcl(self, rules=r, tag="permit 1234")
+ first_acl.add_vpp_config()
+ self.assertTrue(first_acl.query_vpp_config())
# The very first ACL gets #0
- self.assertEqual(reply.acl_index, 0)
- first_acl = reply.acl_index
- rr = self.vapi.acl_dump(reply.acl_index)
+ self.assertEqual(first_acl.acl_index, 0)
+ rr = first_acl.dump()
self.logger.info("Dumped ACL: " + str(rr))
self.assertEqual(len(rr), 1)
# We should have the same number of ACL entries as we had asked
@@ -568,70 +506,49 @@ class TestACLplugin(VppTestCase):
# are different types, we need to iterate over rules and keys to get
# to basic values.
for i_rule in range(0, len(r) - 1):
- for rule_key in r[i_rule]:
+ encoded_rule = r[i_rule].encode()
+ for rule_key in encoded_rule:
self.assertEqual(rr[0].r[i_rule][rule_key],
- r[i_rule][rule_key])
-
- # Add a deny-1234 ACL
- r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 1234,
- 'srcport_or_icmptype_last': 1235,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 1234,
- 'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0},
- {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 0,
- 'srcport_or_icmptype_last': 0,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 0,
- 'dstport_or_icmpcode_last': 0,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0}]
-
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
- tag=b"deny 1234;permit all")
- self.assertEqual(reply.retval, 0)
+ encoded_rule[rule_key])
+
+ # Create a deny-1234 ACL
+ r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
+ AclRule(is_permit=1, proto=17, ports=0)]
+ second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
+ second_acl.add_vpp_config()
+ self.assertTrue(second_acl.query_vpp_config())
# The second ACL gets #1
- self.assertEqual(reply.acl_index, 1)
- second_acl = reply.acl_index
+ self.assertEqual(second_acl.acl_index, 1)
# Test 2: try to modify a nonexistent ACL
- reply = self.vapi.acl_add_replace(acl_index=432, r=r,
- tag=b"FFFF:FFFF", expected_retval=-6)
- self.assertEqual(reply.retval, -6)
- # The ACL number should pass through
- self.assertEqual(reply.acl_index, 432)
+ invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
+ reply = invalid_acl.add_vpp_config(expect_error=True)
+
+ # apply an ACL on an interface inbound, try to delete ACL, must fail
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=1,
+ acls=[first_acl])
+ acl_if_list.add_vpp_config()
+ first_acl.remove_vpp_config(expect_error=True)
+ # Unapply an ACL and then try to delete it - must be ok
+ acl_if_list.remove_vpp_config()
+ first_acl.remove_vpp_config()
+
# apply an ACL on an interface inbound, try to delete ACL, must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[first_acl])
- reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=0,
+ acls=[second_acl])
+ acl_if_list.add_vpp_config()
+ second_acl.remove_vpp_config(expect_error=True)
# Unapply an ACL and then try to delete it - must be ok
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[])
- reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
-
- # apply an ACL on an interface outbound, try to delete ACL, must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[second_acl])
- reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
- # Unapply the ACL and then try to delete it - must be ok
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[])
- reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+ acl_if_list.remove_vpp_config()
+ second_acl.remove_vpp_config()
# try to apply a nonexistent ACL - must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[first_acl],
- expected_retval=-6)
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=0,
+ acls=[invalid_acl])
+ acl_if_list.add_vpp_config(expect_error=True)
self.logger.info("ACLP_TEST_FINISH_0001")
@@ -642,12 +559,12 @@ class TestACLplugin(VppTestCase):
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.UDP]))
+ 0, self.proto[self.IP][self.UDP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.TCP]))
+ 0, self.proto[self.IP][self.TCP]))
# Apply rules
- acl_idx = self.apply_rules(rules, b"permit per-flow")
+ acl_idx = self.apply_rules(rules, "permit per-flow")
# enable counters
reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
@@ -676,14 +593,15 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_START_0003")
# Add a deny-flows ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY,
- self.PORTS_ALL, self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_ALL,
+ self.proto[self.IP][self.UDP]))
# Permit ip any any in the end
rules.append(self.create_rule(self.IPV4, self.PERMIT,
self.PORTS_ALL, 0))
# Apply rules
- acl_idx = self.apply_rules(rules, b"deny per-flow;permit all")
+ acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
# enable counters
reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
@@ -717,7 +635,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit icmpv4")
+ self.apply_rules(rules, "permit icmpv4")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV4,
@@ -738,7 +656,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit icmpv6")
+ self.apply_rules(rules, "permit icmpv6")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV6,
@@ -759,7 +677,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny icmpv4")
+ self.apply_rules(rules, "deny icmpv4")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
@@ -779,7 +697,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny icmpv6")
+ self.apply_rules(rules, "deny icmpv6")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
@@ -794,12 +712,12 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
@@ -819,7 +737,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 tcp")
+ self.apply_rules(rules, "permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
@@ -839,7 +757,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv udp")
+ self.apply_rules(rules, "permit ipv udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
@@ -859,7 +777,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 udp")
+ self.apply_rules(rules, "permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
@@ -884,7 +802,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 tcp")
+ self.apply_rules(rules, "deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -910,7 +828,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp")
+ self.apply_rules(rules, "deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -948,13 +866,13 @@ class TestACLplugin(VppTestCase):
for i in range(len(r)):
rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
- result = self.vapi.acl_dump(reply.acl_index)
+ acl = VppAcl(self, rules=rules)
+ acl.add_vpp_config()
+ result = acl.dump()
i = 0
for drules in result:
for dr in drules.r:
- self.assertEqual(dr.is_ipv6, r[i][0])
self.assertEqual(dr.is_permit, r[i][1])
self.assertEqual(dr.proto, r[i][3])
@@ -1001,7 +919,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
@@ -1023,7 +941,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
@@ -1045,7 +963,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
@@ -1054,7 +972,7 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0017")
def test_0018_udp_permit_port_v6(self):
- """ permit single UPPv6
+ """ permit single UDPv6
"""
self.logger.info("ACLP_TEST_START_0018")
@@ -1068,7 +986,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
@@ -1095,7 +1013,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1122,7 +1040,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1150,7 +1068,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1172,7 +1090,7 @@ class TestACLplugin(VppTestCase):
self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
+ self.apply_rules(rules, "permit empty udp ip4 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
@@ -1206,7 +1124,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
+ self.apply_rules(rules, "permit empty udp ip6 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
@@ -1236,14 +1154,14 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
@@ -1265,7 +1183,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 tcp")
+ self.apply_rules(rules, "permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
@@ -1287,7 +1205,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 udp")
+ self.apply_rules(rules, "permit ipv4 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
@@ -1309,7 +1227,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 udp")
+ self.apply_rules(rules, "permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
@@ -1340,7 +1258,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 tcp")
+ self.apply_rules(rules, "deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1372,7 +1290,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp")
+ self.apply_rules(rules, "deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1388,14 +1306,14 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass also for an odd ethertype
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
@@ -1410,15 +1328,14 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
-
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
@@ -1428,7 +1345,7 @@ class TestACLplugin(VppTestCase):
0, False, 0xaaaa)
# remove the whitelist
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
self.logger.info("ACLP_TEST_FINISH_0305")
@@ -1440,15 +1357,14 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
-
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
@@ -1457,7 +1373,7 @@ class TestACLplugin(VppTestCase):
0, False, True, 0x0bbb)
# remove the whitelist, the previously blocked 0xAAAA should pass now
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
self.logger.info("ACLP_TEST_FINISH_0306")
@@ -1469,19 +1385,19 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
# remove the whitelist, the previously blocked 0xAAAA should pass now
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
# The whitelisted traffic, should pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
@@ -1497,9 +1413,9 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -1508,12 +1424,13 @@ class TestACLplugin(VppTestCase):
intf.append(VppLoInterface(self))
# Apply rules
- self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
+ self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
# Remove the interface
intf[0].remove_vpp_config()
self.logger.info("ACLP_TEST_FINISH_0315")
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
diff --git a/src/plugins/acl/test/test_acl_plugin_conns.py b/src/plugins/acl/test/test_acl_plugin_conns.py
index f4cf5947043..c7941fa150b 100644
--- a/src/plugins/acl/test/test_acl_plugin_conns.py
+++ b/src/plugins/acl/test/test_acl_plugin_conns.py
@@ -14,6 +14,9 @@ from scapy.layers.inet6 import IPv6ExtHdrFragment
from pprint import pprint
from random import randint
from util import L4_Conn
+from ipaddress import ip_network
+
+from vpp_acl import AclRule, VppAcl, VppAclInterface
def to_acl_rule(self, is_permit, wildcard_sport=False):
@@ -35,23 +38,18 @@ def to_acl_rule(self, is_permit, wildcard_sport=False):
rule_l4_sport_first = rule_l4_sport
rule_l4_sport_last = rule_l4_sport
- new_rule = {
- 'is_permit': is_permit,
- 'is_ipv6': p.haslayer(IPv6),
- 'src_ip_addr': inet_pton(rule_family,
- p[rule_l3_layer].src),
- 'src_ip_prefix_len': rule_prefix_len,
- 'dst_ip_addr': inet_pton(rule_family,
- p[rule_l3_layer].dst),
- 'dst_ip_prefix_len': rule_prefix_len,
- 'srcport_or_icmptype_first': rule_l4_sport_first,
- 'srcport_or_icmptype_last': rule_l4_sport_last,
- 'dstport_or_icmpcode_first': rule_l4_dport,
- 'dstport_or_icmpcode_last': rule_l4_dport,
- 'proto': rule_l4_proto,
- }
+ new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
+ src_prefix=ip_network(
+ (p[rule_l3_layer].src, rule_prefix_len)),
+ dst_prefix=ip_network(
+ (p[rule_l3_layer].dst, rule_prefix_len)),
+ sport_from=rule_l4_sport_first,
+ sport_to=rule_l4_sport_last,
+ dport_from=rule_l4_dport, dport_to=rule_l4_dport)
+
return new_rule
+
Packet.to_acl_rule = to_acl_rule
@@ -79,48 +77,44 @@ class Conn(L4_Conn):
r = []
r.append(pkt.to_acl_rule(2, wildcard_sport=True))
r.append(self.wildcard_rule(0))
- res = self.testcase.vapi.acl_add_replace(0xffffffff, r)
- self.testcase.assert_equal(res.retval, 0, "error adding ACL")
- reflect_acl_index = res.acl_index
+ reflect_acl = VppAcl(self.testcase, r)
+ reflect_acl.add_vpp_config()
r = []
r.append(self.wildcard_rule(0))
- res = self.testcase.vapi.acl_add_replace(0xffffffff, r)
- self.testcase.assert_equal(res.retval, 0, "error adding deny ACL")
- deny_acl_index = res.acl_index
+ deny_acl = VppAcl(self.testcase, r)
+ deny_acl.add_vpp_config()
if reflect_side == acl_side:
- self.testcase.vapi.acl_interface_set_acl_list(
- self.ifs[acl_side].sw_if_index, 1,
- [reflect_acl_index,
- deny_acl_index])
- self.testcase.vapi.acl_interface_set_acl_list(
- self.ifs[1-acl_side].sw_if_index, 0, [])
+ acl_if0 = VppAclInterface(self.testcase,
+ self.ifs[acl_side].sw_if_index,
+ [reflect_acl, deny_acl], n_input=1)
+ acl_if1 = VppAclInterface(self.testcase,
+ self.ifs[1-acl_side].sw_if_index, [],
+ n_input=0)
+ acl_if0.add_vpp_config()
+ acl_if1.add_vpp_config()
else:
- self.testcase.vapi.acl_interface_set_acl_list(
- self.ifs[acl_side].sw_if_index, 1,
- [deny_acl_index,
- reflect_acl_index])
- self.testcase.vapi.acl_interface_set_acl_list(
- self.ifs[1-acl_side].sw_if_index, 0, [])
+ acl_if0 = VppAclInterface(self.testcase,
+ self.ifs[acl_side].sw_if_index,
+ [deny_acl, reflect_acl], n_input=1)
+ acl_if1 = VppAclInterface(self.testcase,
+ self.ifs[1-acl_side].sw_if_index, [],
+ n_input=0)
+ acl_if0.add_vpp_config()
+ acl_if1.add_vpp_config()
def wildcard_rule(self, is_permit):
any_addr = ["0.0.0.0", "::"]
rule_family = self.address_family
is_ip6 = 1 if rule_family == AF_INET6 else 0
- new_rule = {
- 'is_permit': is_permit,
- 'is_ipv6': is_ip6,
- 'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
- 'src_ip_prefix_len': 0,
- 'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]),
- 'dst_ip_prefix_len': 0,
- 'srcport_or_icmptype_first': 0,
- 'srcport_or_icmptype_last': 65535,
- 'dstport_or_icmpcode_first': 0,
- 'dstport_or_icmpcode_last': 65535,
- 'proto': 0,
- }
+ new_rule = AclRule(is_permit=is_permit, proto=0,
+ src_prefix=ip_network(
+ (any_addr[is_ip6], 0)),
+ dst_prefix=ip_network(
+ (any_addr[is_ip6], 0)),
+ sport_from=0, sport_to=65535, dport_from=0,
+ dport_to=65535)
return new_rule
diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py
index 3379871e0b9..30b53728c63 100644
--- a/src/plugins/acl/test/test_acl_plugin_l2l3.py
+++ b/src/plugins/acl/test/test_acl_plugin_l2l3.py
@@ -23,10 +23,12 @@
"""
+import copy
import unittest
from socket import inet_pton, AF_INET, AF_INET6
from random import choice, shuffle
from pprint import pprint
+from ipaddress import ip_network
import scapy.compat
from scapy.packet import Raw
@@ -40,6 +42,8 @@ from framework import VppTestCase, VppTestRunner
from vpp_l2 import L2_PORT_TYPE
import time
+from vpp_acl import AclRule, VppAcl, VppAclInterface
+
class TestACLpluginL2L3(VppTestCase):
"""TestACLpluginL2L3 Test Case"""
@@ -259,31 +263,26 @@ class TestACLpluginL2L3(VppTestCase):
else:
rule_l4_proto = p[IP].proto
- new_rule = {
- 'is_permit': is_permit,
- 'is_ipv6': p.haslayer(IPv6),
- 'src_ip_addr': inet_pton(rule_family,
- p[rule_l3_layer].src),
- 'src_ip_prefix_len': rule_prefix_len,
- 'dst_ip_addr': inet_pton(rule_family,
- p[rule_l3_layer].dst),
- 'dst_ip_prefix_len': rule_prefix_len,
- 'srcport_or_icmptype_first': rule_l4_sport,
- 'srcport_or_icmptype_last': rule_l4_sport,
- 'dstport_or_icmpcode_first': rule_l4_dport,
- 'dstport_or_icmpcode_last': rule_l4_dport,
- 'proto': rule_l4_proto,
- }
+ new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
+ src_prefix=ip_network(
+ (p[rule_l3_layer].src, rule_prefix_len)),
+ dst_prefix=ip_network(
+ (p[rule_l3_layer].dst, rule_prefix_len)),
+ sport_from=rule_l4_sport,
+ sport_to=rule_l4_sport,
+ dport_from=rule_l4_dport,
+ dport_to=rule_l4_dport)
+
rules.append(new_rule)
- new_rule_permit = new_rule.copy()
- new_rule_permit['is_permit'] = 1
+ new_rule_permit = copy.copy(new_rule)
+ new_rule_permit.is_permit = 1
permit_rules.append(new_rule_permit)
- new_rule_permit_and_reflect = new_rule.copy()
+ new_rule_permit_and_reflect = copy.copy(new_rule)
if can_reflect_this_packet:
- new_rule_permit_and_reflect['is_permit'] = 2
+ new_rule_permit_and_reflect.is_permit = 2
else:
- new_rule_permit_and_reflect['is_permit'] = is_permit
+ new_rule_permit_and_reflect.is_permit = is_permit
permit_and_reflect_rules.append(new_rule_permit_and_reflect)
self.logger.info("create_stream pkt#%d: %s" % (i, payload))
@@ -356,79 +355,67 @@ class TestACLpluginL2L3(VppTestCase):
# UDP:
- def applied_acl_shuffle(self, sw_if_index):
- # first collect what ACLs are applied and what they look like
- r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
- orig_applied_acls = r[0]
-
- # we will collect these just to save and generate additional rulesets
- orig_acls = []
- for acl_num in orig_applied_acls.acls:
- rr = self.vapi.acl_dump(acl_num)
- orig_acls.append(rr[0])
+ def applied_acl_shuffle(self, acl_if):
+ saved_n_input = acl_if.n_input
+ # TOTO: maybe copy each one??
+ saved_acls = acl_if.acls
# now create a list of all the rules in all ACLs
all_rules = []
- for old_acl in orig_acls:
- for rule in old_acl.r:
- all_rules.append(dict(rule._asdict()))
+ for old_acl in saved_acls:
+ for rule in old_acl.rules:
+ all_rules.append(rule)
# Add a few ACLs made from shuffled rules
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=4294967295,
- r=all_rules[::2],
- tag=b"shuffle 1. acl")
- shuffle_acl_1 = reply.acl_index
+ acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
+ acl1.add_vpp_config()
+
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=4294967295,
- r=all_rules[::3],
- tag=b"shuffle 2. acl")
- shuffle_acl_2 = reply.acl_index
+ acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
+ acl2.add_vpp_config()
+
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=4294967295,
- r=all_rules[::2],
- tag=b"shuffle 3. acl")
- shuffle_acl_3 = reply.acl_index
+ acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
+ acl3.add_vpp_config()
# apply the shuffle ACLs in front
- input_acls = [shuffle_acl_1, shuffle_acl_2]
- output_acls = [shuffle_acl_1, shuffle_acl_2]
+ input_acls = [acl1, acl2]
+ output_acls = [acl1, acl2]
# add the currently applied ACLs
- n_input = orig_applied_acls.n_input
- input_acls.extend(orig_applied_acls.acls[:n_input])
- output_acls.extend(orig_applied_acls.acls[n_input:])
+ n_input = acl_if.n_input
+ input_acls.extend(saved_acls[:n_input])
+ output_acls.extend(saved_acls[n_input:])
# and the trailing shuffle ACL(s)
- input_acls.extend([shuffle_acl_3])
- output_acls.extend([shuffle_acl_3])
+ input_acls.extend([acl3])
+ output_acls.extend([acl3])
# set the interface ACL list to the result
- self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
- n_input=len(input_acls),
- acls=input_acls + output_acls)
+ acl_if.n_input = len(input_acls)
+ acl_if.acls = input_acls + output_acls
+ acl_if.add_vpp_config()
+
# change the ACLs a few times
for i in range(1, 10):
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
- r=all_rules[::1+(i % 2)],
- tag=b"shuffle 1. acl")
+ acl1.modify_vpp_config(all_rules[::1+(i % 2)])
+
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
- r=all_rules[::1+(i % 3)],
- tag=b"shuffle 2. acl")
+ acl2.modify_vpp_config(all_rules[::1+(i % 3)])
+
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
- r=all_rules[::1+(i % 5)],
- tag=b"shuffle 3. acl")
+ acl3.modify_vpp_config(all_rules[::1+(i % 5)])
# restore to how it was before and clean up
- self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
- n_input=orig_applied_acls.n_input,
- acls=orig_applied_acls.acls)
- reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
- reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
- reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
+ acl_if.n_input = saved_n_input
+ acl_if.acls = saved_acls
+ acl_if.add_vpp_config()
+
+ acl1.remove_vpp_config()
+ acl2.remove_vpp_config()
+ acl3.remove_vpp_config()
def create_acls_for_a_stream(self, stream_dict,
test_l2_action, is_reflect):
@@ -436,15 +423,14 @@ class TestACLpluginL2L3(VppTestCase):
r_permit = stream_dict['permit_rules']
r_permit_reflect = stream_dict['permit_and_reflect_rules']
r_action = r_permit_reflect if is_reflect else r
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
- tag=b"act. acl")
- action_acl_index = reply.acl_index
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
- tag=b"perm. acl")
- permit_acl_index = reply.acl_index
- return {'L2': action_acl_index if test_l2_action else permit_acl_index,
- 'L3': permit_acl_index if test_l2_action else action_acl_index,
- 'permit': permit_acl_index, 'action': action_acl_index}
+ action_acl = VppAcl(self, rules=r_action, tag="act. acl")
+ action_acl.add_vpp_config()
+ permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
+ permit_acl.add_vpp_config()
+
+ return {'L2': action_acl if test_l2_action else permit_acl,
+ 'L3': permit_acl if test_l2_action else action_acl,
+ 'permit': permit_acl, 'action': action_acl}
def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
is_ip6, is_reflect, add_eh):
@@ -452,26 +438,30 @@ class TestACLpluginL2L3(VppTestCase):
"""
self.reset_packet_infos()
stream_dict = self.create_stream(
- self.pg2, self.loop0,
- bridged_to_routed,
- self.pg_if_packet_sizes, is_ip6,
- not is_reflect, False, add_eh)
+ self.pg2, self.loop0,
+ bridged_to_routed,
+ self.pg_if_packet_sizes, is_ip6,
+ not is_reflect, False, add_eh)
stream = stream_dict['stream']
acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
is_reflect)
n_input_l3 = 0 if bridged_to_routed else 1
n_input_l2 = 1 if bridged_to_routed else 0
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- n_input=n_input_l3,
- acls=[acl_idx['L3']])
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
- self.applied_acl_shuffle(self.pg0.sw_if_index)
- self.applied_acl_shuffle(self.pg2.sw_if_index)
+
+ acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
+ n_input=n_input_l3, acls=[acl_idx['L3']])
+ acl_if_pg2.add_vpp_config()
+
+ acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
+ n_input=n_input_l2, acls=[acl_idx['L2']])
+ acl_if_pg0.add_vpp_config()
+
+ acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
+ n_input=n_input_l2, acls=[acl_idx['L2']])
+ acl_if_pg1.add_vpp_config()
+
+ self.applied_acl_shuffle(acl_if_pg0)
+ self.applied_acl_shuffle(acl_if_pg1)
return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
def apply_acl_ip46_both_directions_reflect(self,
@@ -516,20 +506,23 @@ class TestACLpluginL2L3(VppTestCase):
else:
outbound_l3_acl = acl_idx_rev['L3']
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- n_input=1,
- acls=[inbound_l3_acl,
- outbound_l3_acl])
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
- self.applied_acl_shuffle(self.pg0.sw_if_index)
- self.applied_acl_shuffle(self.pg2.sw_if_index)
+ acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
+ n_input=1,
+ acls=[inbound_l3_acl, outbound_l3_acl])
+ acl_if_pg2.add_vpp_config()
+
+ acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl, outbound_l2_acl])
+ acl_if_pg0.add_vpp_config()
+
+ acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl, outbound_l2_acl])
+ acl_if_pg1.add_vpp_config()
+
+ self.applied_acl_shuffle(acl_if_pg0)
+ self.applied_acl_shuffle(acl_if_pg2)
def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
@@ -594,7 +587,7 @@ class TestACLpluginL2L3(VppTestCase):
pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
is_reflect, False,
add_eh)
- self.verify_acl_packet_count(acls['L3'], pkts)
+ self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
def run_test_ip46_bridged_to_routed(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
@@ -604,7 +597,7 @@ class TestACLpluginL2L3(VppTestCase):
pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
is_reflect, False,
add_eh)
- self.verify_acl_packet_count(acls['L2'], pkts)
+ self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
is_ip6, add_eh,
diff --git a/src/plugins/acl/test/test_acl_plugin_macip.py b/src/plugins/acl/test/test_acl_plugin_macip.py
index 0f178a36b69..5edd7b03258 100644
--- a/src/plugins/acl/test/test_acl_plugin_macip.py
+++ b/src/plugins/acl/test/test_acl_plugin_macip.py
@@ -9,6 +9,7 @@ from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
from struct import pack, unpack
import re
import unittest
+from ipaddress import ip_network, IPv4Network, IPv6Network
import scapy.compat
from scapy.packet import Raw
@@ -21,6 +22,9 @@ from vpp_lo_interface import VppLoInterface
from vpp_l2 import L2_PORT_TYPE
from vpp_sub_interface import L2_VTR_OP, VppSubInterface, VppDot1QSubint, \
VppDot1ADSubint
+from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist, \
+ VppMacipAclInterface, VppMacipAcl, MacipRule
+from vpp_papi import MACAddress
class MethodHolder(VppTestCase):
@@ -72,10 +76,10 @@ class MethodHolder(VppTestCase):
# create 2 subinterfaces
cls.subifs = [
- VppDot1QSubint(cls, cls.pg1, 10),
- VppDot1ADSubint(cls, cls.pg2, 20, 300, 400),
- VppDot1QSubint(cls, cls.pg3, 30),
- VppDot1ADSubint(cls, cls.pg3, 40, 600, 700)]
+ VppDot1QSubint(cls, cls.pg1, 10),
+ VppDot1ADSubint(cls, cls.pg2, 20, 300, 400),
+ VppDot1QSubint(cls, cls.pg3, 30),
+ VppDot1ADSubint(cls, cls.pg3, 40, 600, 700)]
cls.subifs[0].set_vtr(L2_VTR_OP.L2_POP_1,
inner=10, push1q=1)
@@ -158,11 +162,6 @@ class MethodHolder(VppTestCase):
def setUp(self):
super(MethodHolder, self).setUp()
self.reset_packet_infos()
- del self.ACLS[:]
-
- def tearDown(self):
- super(MethodHolder, self).tearDown()
- self.delete_acls()
def show_commands_at_teardown(self):
self.logger.info(self.vapi.ppcli("show interface address"))
@@ -182,19 +181,21 @@ class MethodHolder(VppTestCase):
acls = self.vapi.macip_acl_dump()
if self.DEBUG:
for acl in acls:
- print("ACL #"+str(acl.acl_index))
+ # print("ACL #"+str(acl.acl_index))
for r in acl.r:
rule = "ACTION"
if r.is_permit == 1:
rule = "PERMIT"
elif r.is_permit == 0:
rule = "DENY "
+ """
print(" IP6" if r.is_ipv6 else " IP4",
rule,
binascii.hexlify(r.src_mac),
binascii.hexlify(r.src_mac_mask),
unpack('<16B', r.src_ip_addr),
r.src_ip_prefix_len)
+ """
return acls
def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
@@ -252,19 +253,16 @@ class MethodHolder(VppTestCase):
elif ip_type == self.SUBNET_IP:
ip4[2] = random.randint(100, 200)
ip4[3] = 0
- ip6[8] = random.randint(100, 200)
+ ip6[7] = random.randint(100, 200)
ip6[15] = 0
ip_pack = b''
for j in range(0, len(ip)):
ip_pack += pack('<B', int(ip[j]))
- rule = ({'is_permit': self.PERMIT,
- 'is_ipv6': is_ip6,
- 'src_ip_addr': ip_pack,
- 'src_ip_prefix_len': ip_len,
- 'src_mac': binascii.unhexlify(mac.replace(':', '')),
- 'src_mac_mask': binascii.unhexlify(
- mask.replace(':', ''))})
+ rule = MacipRule(is_permit=self.PERMIT,
+ src_prefix=ip_network((ip_pack, ip_len)),
+ src_mac=MACAddress(mac).packed,
+ src_mac_mask=MACAddress(mask).packed)
rules.append(rule)
if ip_type == self.WILD_IP:
break
@@ -274,10 +272,12 @@ class MethodHolder(VppTestCase):
return acls
def apply_macip_rules(self, acls):
+ macip_acls = []
for acl in acls:
- reply = self.vapi.macip_acl_add(acl)
- self.assertEqual(reply.retval, 0)
- self.ACLS.append(reply.acl_index)
+ macip_acl = VppMacipAcl(self, rules=acl)
+ macip_acl.add_vpp_config()
+ macip_acls.append(macip_acl)
+ return macip_acls
def verify_macip_acls(self, acl_count, rules_count, expected_count=2):
reply = self.macip_acl_dump_debug()
@@ -292,20 +292,6 @@ class MethodHolder(VppTestCase):
reply = self.vapi.macip_acl_interface_get()
self.assertEqual(reply.count, expected_count)
- def delete_acls(self):
- for acl in range(len(self.ACLS)-1, -1, -1):
- self.vapi.macip_acl_del(self.ACLS[acl])
-
- reply = self.vapi.macip_acl_dump()
- self.assertEqual(len(reply), 0)
-
- intf_acls = self.vapi.acl_interface_list_dump()
- for i_a in intf_acls:
- sw_if_index = i_a.sw_if_index
- for acl_index in i_a.acls:
- self.vapi.acl_interface_add_del(sw_if_index, acl_index, 0)
- self.vapi.acl_del(acl_index)
-
def create_stream(self, mac_type, ip_type, packet_count,
src_if, dst_if, traffic, is_ip6, tags=PERMIT_TAGS):
# exact MAC and exact IP
@@ -526,48 +512,44 @@ class MethodHolder(VppTestCase):
else:
rule_l4_proto = packet[IP].proto
- acl_rule = {
- 'is_permit': is_permit,
- 'is_ipv6': is_ip6,
- 'src_ip_addr': inet_pton(rule_family,
- packet[rule_l3_layer].src),
- 'src_ip_prefix_len': rule_prefix_len,
- 'dst_ip_addr': inet_pton(rule_family,
- packet[rule_l3_layer].dst),
- 'dst_ip_prefix_len': rule_prefix_len,
- 'srcport_or_icmptype_first': rule_l4_sport,
- 'srcport_or_icmptype_last': rule_l4_sport,
- 'dstport_or_icmpcode_first': rule_l4_dport,
- 'dstport_or_icmpcode_last': rule_l4_dport,
- 'proto': rule_l4_proto}
+ src_network = ip_network(
+ (packet[rule_l3_layer].src, rule_prefix_len))
+ dst_network = ip_network(
+ (packet[rule_l3_layer].dst, rule_prefix_len))
+ acl_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
+ src_prefix=src_network,
+ dst_prefix=dst_network,
+ sport_from=rule_l4_sport,
+ sport_to=rule_l4_sport,
+ dport_from=rule_l4_dport,
+ dport_to=rule_l4_dport)
acl_rules.append(acl_rule)
if mac_type == self.WILD_MAC and ip_type == self.WILD_IP and p > 0:
continue
if is_permit:
- macip_rule = ({
- 'is_permit': is_permit,
- 'is_ipv6': is_ip6,
- 'src_ip_addr': ip_rule,
- 'src_ip_prefix_len': prefix_len,
- 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')),
- 'src_mac_mask': binascii.unhexlify(
- mac_mask.replace(':', ''))})
+ macip_rule = MacipRule(
+ is_permit=is_permit,
+ src_prefix=ip_network(
+ (ip_rule, prefix_len)),
+ src_mac=MACAddress(mac_rule).packed,
+ src_mac_mask=MACAddress(mac_mask).packed)
macip_rules.append(macip_rule)
# deny all other packets
if not (mac_type == self.WILD_MAC and ip_type == self.WILD_IP):
- macip_rule = ({'is_permit': 0,
- 'is_ipv6': is_ip6,
- 'src_ip_addr': "",
- 'src_ip_prefix_len': 0,
- 'src_mac': "",
- 'src_mac_mask': ""})
+ network = IPv6Network((0, 0)) if is_ip6 else IPv4Network((0, 0))
+ macip_rule = MacipRule(
+ is_permit=0,
+ src_prefix=network,
+ src_mac=MACAddress("00:00:00:00:00:00").packed,
+ src_mac_mask=MACAddress("00:00:00:00:00:00").packed)
macip_rules.append(macip_rule)
- acl_rule = {'is_permit': 0,
- 'is_ipv6': is_ip6}
+ network = IPv6Network((0, 0)) if is_ip6 else IPv4Network((0, 0))
+ acl_rule = AclRule(is_permit=0, src_prefix=network, dst_prefix=network,
+ sport_from=0, sport_to=0, dport_from=0, dport_to=0)
acl_rules.append(acl_rule)
return {'stream': packets,
'macip_rules': macip_rules,
@@ -644,36 +626,32 @@ class MethodHolder(VppTestCase):
if apply_rules:
if isMACIP:
- reply = self.vapi.macip_acl_add(test_dict['macip_rules'])
+ self.acl = VppMacipAcl(self, rules=test_dict['macip_rules'])
else:
- reply = self.vapi.acl_add_replace(acl_index=4294967295,
- r=test_dict['acl_rules'])
- self.assertEqual(reply.retval, 0)
- acl_index = reply.acl_index
+ self.acl = VppAcl(self, rules=test_dict['acl_rules'])
+ self.acl.add_vpp_config()
if isMACIP:
- self.vapi.macip_acl_interface_add_del(
- sw_if_index=tx_if.sw_if_index,
- acl_index=acl_index)
- reply = self.vapi.macip_acl_interface_get()
- self.assertEqual(reply.acls[tx_if.sw_if_index], acl_index)
- self.ACLS.append(reply.acls[tx_if.sw_if_index])
+ self.acl_if = VppMacipAclInterface(
+ self, sw_if_index=tx_if.sw_if_index, acls=[self.acl])
+ self.acl_if.add_vpp_config()
+
+ dump = self.acl_if.dump()
+ self.assertTrue(dump)
+ self.assertEqual(dump[0].acls[0], self.acl.acl_index)
else:
- self.vapi.acl_interface_add_del(
- sw_if_index=tx_if.sw_if_index, acl_index=acl_index)
+ self.acl_if = VppAclInterface(
+ self, sw_if_index=tx_if.sw_if_index, n_input=1,
+ acls=[self.acl])
+ self.acl_if.add_vpp_config()
else:
- self.vapi.macip_acl_interface_add_del(
- sw_if_index=tx_if.sw_if_index,
- acl_index=0)
- if try_replace:
+ if hasattr(self, "acl_if"):
+ self.acl_if.remove_vpp_config()
+ if try_replace and hasattr(self, "acl"):
if isMACIP:
- reply = self.vapi.macip_acl_add_replace(
- test_dict['macip_rules'],
- acl_index)
+ self.acl.modify_vpp_config(test_dict['macip_rules'])
else:
- reply = self.vapi.acl_add_replace(acl_index=acl_index,
- r=test_dict['acl_rules'])
- self.assertEqual(reply.retval, 0)
+ self.acl.modify_vpp_config(test_dict['acl_rules'])
if not isinstance(src_if, VppSubInterface):
tx_if.add_stream(test_dict['stream'])
@@ -693,9 +671,10 @@ class MethodHolder(VppTestCase):
self.get_packet_count_for_if_idx(dst_if.sw_if_index))
self.verify_capture(test_dict['stream'], capture, is_ip6)
if not isMACIP:
- self.vapi.acl_interface_add_del(sw_if_index=tx_if.sw_if_index,
- acl_index=acl_index, is_add=0)
- self.vapi.acl_del(acl_index)
+ if hasattr(self, "acl_if"):
+ self.acl_if.remove_vpp_config()
+ if hasattr(self, "acl"):
+ self.acl.remove_vpp_config()
def run_test_acls(self, mac_type, ip_type, acl_count,
rules_count, traffic=None, ip=None):
@@ -1077,17 +1056,13 @@ class TestMACIP(MethodHolder):
r1 = self.create_rules(acl_count=3, rules_count=[2, 2, 2])
r2 = self.create_rules(mac_type=self.OUI_MAC, ip_type=self.SUBNET_IP)
- self.apply_macip_rules(r1)
+ macip_acls = self.apply_macip_rules(r1)
acls_before = self.macip_acl_dump_debug()
# replace acls #2, #3 with new
- reply = self.vapi.macip_acl_add_replace(r2[0], 2)
- self.assertEqual(reply.retval, 0)
- self.assertEqual(reply.acl_index, 2)
- reply = self.vapi.macip_acl_add_replace(r2[1], 3)
- self.assertEqual(reply.retval, 0)
- self.assertEqual(reply.acl_index, 3)
+ macip_acls[2].modify_vpp_config(r2[0])
+ macip_acls[3].modify_vpp_config(r2[1])
acls_after = self.macip_acl_dump_debug()
@@ -1118,21 +1093,25 @@ class TestMACIP(MethodHolder):
intf_count = len(self.interfaces)+1
intf = []
- self.apply_macip_rules(self.create_rules(acl_count=3,
- rules_count=[3, 5, 4]))
+ macip_alcs = self.apply_macip_rules(
+ self.create_rules(acl_count=3, rules_count=[3, 5, 4]))
intf.append(VppLoInterface(self))
intf.append(VppLoInterface(self))
sw_if_index0 = intf[0].sw_if_index
- self.vapi.macip_acl_interface_add_del(sw_if_index0, 1)
+ macip_acl_if0 = VppMacipAclInterface(
+ self, sw_if_index=sw_if_index0, acls=[macip_alcs[1]])
+ macip_acl_if0.add_vpp_config()
reply = self.vapi.macip_acl_interface_get()
self.assertEqual(reply.count, intf_count+1)
self.assertEqual(reply.acls[sw_if_index0], 1)
sw_if_index1 = intf[1].sw_if_index
- self.vapi.macip_acl_interface_add_del(sw_if_index1, 0)
+ macip_acl_if1 = VppMacipAclInterface(
+ self, sw_if_index=sw_if_index1, acls=[macip_alcs[0]])
+ macip_acl_if1.add_vpp_config()
reply = self.vapi.macip_acl_interface_get()
self.assertEqual(reply.count, intf_count+2)
@@ -1148,8 +1127,12 @@ class TestMACIP(MethodHolder):
intf.append(VppLoInterface(self))
sw_if_index2 = intf[2].sw_if_index
sw_if_index3 = intf[3].sw_if_index
- self.vapi.macip_acl_interface_add_del(sw_if_index2, 1)
- self.vapi.macip_acl_interface_add_del(sw_if_index3, 1)
+ macip_acl_if2 = VppMacipAclInterface(
+ self, sw_if_index=sw_if_index2, acls=[macip_alcs[1]])
+ macip_acl_if2.add_vpp_config()
+ macip_acl_if3 = VppMacipAclInterface(
+ self, sw_if_index=sw_if_index3, acls=[macip_alcs[1]])
+ macip_acl_if3.add_vpp_config()
reply = self.vapi.macip_acl_interface_get()
self.assertEqual(reply.count, intf_count+3)
diff --git a/src/plugins/acl/test/test_classify_l2_acl.py b/src/plugins/acl/test/test_classify_l2_acl.py
index 0cba6c83cba..b1309881e58 100644
--- a/src/plugins/acl/test/test_classify_l2_acl.py
+++ b/src/plugins/acl/test/test_classify_l2_acl.py
@@ -603,5 +603,6 @@ class TestClassifyAcl(TestClassifier):
self.acl_active_table = key
self.run_verify_test(self.IP, self.IPV4, -1)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
diff --git a/src/plugins/acl/test/vpp_acl.py b/src/plugins/acl/test/vpp_acl.py
new file mode 100644
index 00000000000..2d2f7ca257b
--- /dev/null
+++ b/src/plugins/acl/test/vpp_acl.py
@@ -0,0 +1,476 @@
+from ipaddress import IPv4Network
+
+from vpp_object import VppObject
+from vpp_papi import VppEnum
+from vpp_ip import INVALID_INDEX
+from vpp_papi_provider import UnexpectedApiReturnValueError
+
+
+class VppAclPlugin(VppObject):
+
+ def __init__(self, test, enable_intf_counters=False):
+ self._test = test
+ self.enable_intf_counters = enable_intf_counters
+
+ @property
+ def enable_intf_counters(self):
+ return self._enable_intf_counters
+
+ @enable_intf_counters.setter
+ def enable_intf_counters(self, enable):
+ self.vapi.acl_stats_intf_counters_enable(enable=enable)
+
+ def add_vpp_config(self):
+ pass
+
+ def remove_vpp_config(self):
+ pass
+
+ def query_vpp_config(self):
+ pass
+
+ def object_id(self):
+ return ("acl-plugin-%d" % (self._sw_if_index))
+
+
+class AclRule():
+ """ ACL Rule """
+
+ # port ranges
+ PORTS_ALL = -1
+ PORTS_RANGE = 0
+ PORTS_RANGE_2 = 1
+ udp_sport_from = 10
+ udp_sport_to = udp_sport_from + 5
+ udp_dport_from = 20000
+ udp_dport_to = udp_dport_from + 5000
+ tcp_sport_from = 30
+ tcp_sport_to = tcp_sport_from + 5
+ tcp_dport_from = 40000
+ tcp_dport_to = tcp_dport_from + 5000
+
+ udp_sport_from_2 = 90
+ udp_sport_to_2 = udp_sport_from_2 + 5
+ udp_dport_from_2 = 30000
+ udp_dport_to_2 = udp_dport_from_2 + 5000
+ tcp_sport_from_2 = 130
+ tcp_sport_to_2 = tcp_sport_from_2 + 5
+ tcp_dport_from_2 = 20000
+ tcp_dport_to_2 = tcp_dport_from_2 + 5000
+
+ icmp4_type = 8 # echo request
+ icmp4_code = 3
+ icmp6_type = 128 # echo request
+ icmp6_code = 3
+
+ icmp4_type_2 = 8
+ icmp4_code_from_2 = 5
+ icmp4_code_to_2 = 20
+ icmp6_type_2 = 128
+ icmp6_code_from_2 = 8
+ icmp6_code_to_2 = 42
+
+ def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
+ dst_prefix=IPv4Network('0.0.0.0/0'),
+ proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None,
+ dport_from=None, dport_to=None):
+ self.is_permit = is_permit
+ self.src_prefix = src_prefix
+ self.dst_prefix = dst_prefix
+ self._proto = proto
+ self._ports = ports
+ # assign ports by range
+ self.update_ports()
+ # assign specified ports
+ if sport_from:
+ self.sport_from = sport_from
+ if sport_to:
+ self.sport_to = sport_to
+ if dport_from:
+ self.dport_from = dport_from
+ if dport_to:
+ self.dport_to = dport_to
+
+ def __copy__(self):
+ new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
+ self._proto, self._ports, self.sport_from,
+ self.sport_to, self.dport_from, self.dport_to)
+ return new_rule
+
+ def update_ports(self):
+ if self._ports == self.PORTS_ALL:
+ self.sport_from = 0
+ self.dport_from = 0
+ self.sport_to = 65535
+ if self._proto == 1 or self._proto == 58:
+ self.sport_to = 255
+ self.dport_to = self.sport_to
+ elif self._ports == self.PORTS_RANGE:
+ if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
+ self.sport_from = self.icmp4_type
+ self.sport_to = self.icmp4_type
+ self.dport_from = self.icmp4_code
+ self.dport_to = self.icmp4_code
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
+ self.sport_from = self.icmp6_type
+ self.sport_to = self.icmp6_type
+ self.dport_from = self.icmp6_code
+ self.dport_to = self.icmp6_code
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
+ self.sport_from = self.tcp_sport_from
+ self.sport_to = self.tcp_sport_to
+ self.dport_from = self.tcp_dport_from
+ self.dport_to = self.tcp_dport_to
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
+ self.sport_from = self.udp_sport_from
+ self.sport_to = self.udp_sport_to
+ self.dport_from = self.udp_dport_from
+ self.dport_to = self.udp_dport_to
+ elif self._ports == self.PORTS_RANGE_2:
+ if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
+ self.sport_from = self.icmp4_type_2
+ self.sport_to = self.icmp4_type_2
+ self.dport_from = self.icmp4_code_from_2
+ self.dport_to = self.icmp4_code_to_2
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
+ self.sport_from = self.icmp6_type_2
+ self.sport_to = self.icmp6_type_2
+ self.dport_from = self.icmp6_code_from_2
+ self.dport_to = self.icmp6_code_to_2
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
+ self.sport_from = self.tcp_sport_from_2
+ self.sport_to = self.tcp_sport_to_2
+ self.dport_from = self.tcp_dport_from_2
+ self.dport_to = self.tcp_dport_to_2
+ elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
+ self.sport_from = self.udp_sport_from_2
+ self.sport_to = self.udp_sport_to_2
+ self.dport_from = self.udp_dport_from_2
+ self.dport_to = self.udp_dport_to_2
+ else:
+ self.sport_from = self._ports
+ self.sport_to = self._ports
+ self.dport_from = self._ports
+ self.dport_to = self._ports
+
+ @property
+ def proto(self):
+ return self._proto
+
+ @proto.setter
+ def proto(self, proto):
+ self._proto = proto
+ self.update_ports()
+
+ @property
+ def ports(self):
+ return self._ports
+
+ @ports.setter
+ def ports(self, ports):
+ self._ports = ports
+ self.update_ports()
+
+ def encode(self):
+ return {'is_permit': self.is_permit, 'proto': self.proto,
+ 'srcport_or_icmptype_first': self.sport_from,
+ 'srcport_or_icmptype_last': self.sport_to,
+ 'src_prefix': self.src_prefix,
+ 'dstport_or_icmpcode_first': self.dport_from,
+ 'dstport_or_icmpcode_last': self.dport_to,
+ 'dst_prefix': self.dst_prefix}
+
+
+class VppAcl(VppObject):
+ """ VPP ACL """
+
+ def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
+ self._test = test
+ self._acl_index = acl_index
+ self.tag = tag
+ self._rules = rules
+
+ @property
+ def rules(self):
+ return self._rules
+
+ @property
+ def acl_index(self):
+ return self._acl_index
+
+ @property
+ def count(self):
+ return len(self._rules)
+
+ def encode_rules(self):
+ rules = []
+ for rule in self._rules:
+ rules.append(rule.encode())
+ return rules
+
+ def add_vpp_config(self, expect_error=False):
+ try:
+ reply = self._test.vapi.acl_add_replace(
+ acl_index=self._acl_index, tag=self.tag, count=self.count,
+ r=self.encode_rules())
+ self._acl_index = reply.acl_index
+ self._test.registry.register(self, self._test.logger)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ return self
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+ return None
+
+ def modify_vpp_config(self, rules):
+ self._rules = rules
+ self.add_vpp_config()
+
+ def remove_vpp_config(self, expect_error=False):
+ try:
+ self._test.vapi.acl_del(acl_index=self._acl_index)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+
+ def dump(self):
+ return self._test.vapi.acl_dump(acl_index=self._acl_index)
+
+ def query_vpp_config(self):
+ dump = self.dump()
+ for rule in dump:
+ if rule.acl_index == self._acl_index:
+ return True
+ return False
+
+ def object_id(self):
+ return ("acl-%s-%d" % (self.tag, self._acl_index))
+
+
+class VppEtypeWhitelist(VppObject):
+ """ VPP Etype Whitelist """
+
+ def __init__(self, test, sw_if_index, whitelist, n_input=0):
+ self._test = test
+ self.whitelist = whitelist
+ self.n_input = n_input
+ self._sw_if_index = sw_if_index
+
+ @property
+ def sw_if_index(self):
+ return self._sw_if_index
+
+ @property
+ def count(self):
+ return len(self.whitelist)
+
+ def add_vpp_config(self):
+ self._test.vapi.acl_interface_set_etype_whitelist(
+ sw_if_index=self._sw_if_index, count=self.count,
+ n_input=self.n_input, whitelist=self.whitelist)
+ self._test.registry.register(self, self._test.logger)
+ return self
+
+ def remove_vpp_config(self):
+ self._test.vapi.acl_interface_set_etype_whitelist(
+ sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])
+
+ def query_vpp_config(self):
+ self._test.vapi.acl_interface_etype_whitelist_dump(
+ sw_if_index=self._sw_if_index)
+ return False
+
+ def object_id(self):
+ return ("acl-etype_wl-%d" % (self._sw_if_index))
+
+
+class VppAclInterface(VppObject):
+ """ VPP ACL Interface """
+
+ def __init__(self, test, sw_if_index, acls, n_input=0):
+ self._test = test
+ self._sw_if_index = sw_if_index
+ self.n_input = n_input
+ self.acls = acls
+
+ @property
+ def sw_if_index(self):
+ return self._sw_if_index
+
+ @property
+ def count(self):
+ return len(self.acls)
+
+ def encode_acls(self):
+ acls = []
+ for acl in self.acls:
+ acls.append(acl.acl_index)
+ return acls
+
+ def add_vpp_config(self, expect_error=False):
+ try:
+ reply = self._test.vapi.acl_interface_set_acl_list(
+ sw_if_index=self._sw_if_index, n_input=self.n_input,
+ count=self.count, acls=self.encode_acls())
+ self._test.registry.register(self, self._test.logger)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ return self
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+ return None
+
+ def remove_vpp_config(self, expect_error=False):
+ try:
+ reply = self._test.vapi.acl_interface_set_acl_list(
+ sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+
+ def query_vpp_config(self):
+ dump = self._test.vapi.acl_interface_list_dump(
+ sw_if_index=self._sw_if_index)
+ for acl_list in dump:
+ if acl_list.count > 0:
+ return True
+ return False
+
+ def object_id(self):
+ return ("acl-if-list-%d" % (self._sw_if_index))
+
+
+class MacipRule():
+ """ Mac Ip rule """
+
+ def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
+ src_prefix=IPv4Network('0.0.0.0/0')):
+ self.is_permit = is_permit
+ self.src_mac = src_mac
+ self.src_mac_mask = src_mac_mask
+ self.src_prefix = src_prefix
+
+ def encode(self):
+ return {'is_permit': self.is_permit, 'src_mac': self.src_mac,
+ 'src_mac_mask': self.src_mac_mask,
+ 'src_prefix': self.src_prefix}
+
+
+class VppMacipAcl(VppObject):
+ """ Vpp Mac Ip ACL """
+
+ def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
+ self._test = test
+ self._acl_index = acl_index
+ self.tag = tag
+ self._rules = rules
+
+ @property
+ def acl_index(self):
+ return self._acl_index
+
+ @property
+ def rules(self):
+ return self._rules
+
+ @property
+ def count(self):
+ return len(self._rules)
+
+ def encode_rules(self):
+ rules = []
+ for rule in self._rules:
+ rules.append(rule.encode())
+ return rules
+
+ def add_vpp_config(self, expect_error=False):
+ try:
+ reply = self._test.vapi.macip_acl_add_replace(
+ acl_index=self._acl_index, tag=self.tag, count=self.count,
+ r=self.encode_rules())
+ self._acl_index = reply.acl_index
+ self._test.registry.register(self, self._test.logger)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ return self
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+ return None
+
+ def modify_vpp_config(self, rules):
+ self._rules = rules
+ self.add_vpp_config()
+
+ def remove_vpp_config(self, expect_error=False):
+ try:
+ self._test.vapi.macip_acl_del(acl_index=self._acl_index)
+ if expect_error:
+ self._test.fail("Unexpected api reply")
+ except UnexpectedApiReturnValueError:
+ if not expect_error:
+ self._test.fail("Unexpected api reply")
+
+ def dump(self):
+ return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
+
+ def query_vpp_config(self):
+ dump = self.dump()
+ for rule in dump:
+ if rule.acl_index == self._acl_index:
+ return True
+ return False
+
+ def object_id(self):
+ return ("macip-acl-%s-%d" % (self.tag, self._acl_index))
+
+
+class VppMacipAclInterface(VppObject):
+ """ VPP Mac Ip ACL Interface """
+
+ def __init__(self, test, sw_if_index, acls):
+ self._test = test
+ self._sw_if_index = sw_if_index
+ self.acls = acls
+
+ @property
+ def sw_if_index(self):
+ return self._sw_if_index
+
+ @property
+ def count(self):
+ return len(self.acls)
+
+ def add_vpp_config(self):
+ for acl in self.acls:
+ self._test.vapi.macip_acl_interface_add_del(
+ is_add=True, sw_if_index=self._sw_if_index,
+ acl_index=acl.acl_index)
+ self._test.registry.register(self, self._test.logger)
+
+ def remove_vpp_config(self):
+ for acl in self.acls:
+ self._test.vapi.macip_acl_interface_add_del(
+ is_add=False, sw_if_index=self._sw_if_index,
+ acl_index=acl.acl_index)
+
+ def dump(self):
+ return self._test.vapi.macip_acl_interface_list_dump(
+ sw_if_index=self._sw_if_index)
+
+ def query_vpp_config(self):
+ dump = self.dump()
+ for acl_list in dump:
+ for acl_index in acl_list.acls:
+ if acl_index != INVALID_INDEX:
+ return True
+ return False
+
+ def object_id(self):
+ return ("macip-acl-if-list-%d" % (self._sw_if_index))