summaryrefslogtreecommitdiffstats
path: root/src/plugins/acl/test/test_acl_plugin.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/acl/test/test_acl_plugin.py')
-rw-r--r--src/plugins/acl/test/test_acl_plugin.py363
1 files changed, 140 insertions, 223 deletions
diff --git a/src/plugins/acl/test/test_acl_plugin.py b/src/plugins/acl/test/test_acl_plugin.py
index f07d37548fe..d5e195fe44b 100644
--- a/src/plugins/acl/test/test_acl_plugin.py
+++ b/src/plugins/acl/test/test_acl_plugin.py
@@ -12,8 +12,11 @@ from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
from scapy.layers.inet6 import IPv6ExtHdrFragment
from framework import VppTestCase, VppTestRunner
from util import Host, ppp
+from ipaddress import IPv4Network, IPv6Network
from vpp_lo_interface import VppLoInterface
+from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
+from vpp_ip import INVALID_INDEX
class TestACLplugin(VppTestCase):
@@ -175,105 +178,49 @@ class TestACLplugin(VppTestCase):
% self.bd_id))
def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
- s_prefix=0, s_ip=b'\x00\x00\x00\x00',
- d_prefix=0, d_ip=b'\x00\x00\x00\x00'):
- if proto == -1:
- return
- if ports == self.PORTS_ALL:
- sport_from = 0
- dport_from = 0
- sport_to = 65535 if proto != 1 and proto != 58 else 255
- dport_to = sport_to
- elif ports == self.PORTS_RANGE:
- if proto == 1:
- sport_from = self.icmp4_type
- sport_to = self.icmp4_type
- dport_from = self.icmp4_code
- dport_to = self.icmp4_code
- elif proto == 58:
- sport_from = self.icmp6_type
- sport_to = self.icmp6_type
- dport_from = self.icmp6_code
- dport_to = self.icmp6_code
- elif proto == self.proto[self.IP][self.TCP]:
- sport_from = self.tcp_sport_from
- sport_to = self.tcp_sport_to
- dport_from = self.tcp_dport_from
- dport_to = self.tcp_dport_to
- elif proto == self.proto[self.IP][self.UDP]:
- sport_from = self.udp_sport_from
- sport_to = self.udp_sport_to
- dport_from = self.udp_dport_from
- dport_to = self.udp_dport_to
- elif ports == self.PORTS_RANGE_2:
- if proto == 1:
- sport_from = self.icmp4_type_2
- sport_to = self.icmp4_type_2
- dport_from = self.icmp4_code_from_2
- dport_to = self.icmp4_code_to_2
- elif proto == 58:
- sport_from = self.icmp6_type_2
- sport_to = self.icmp6_type_2
- dport_from = self.icmp6_code_from_2
- dport_to = self.icmp6_code_to_2
- elif proto == self.proto[self.IP][self.TCP]:
- sport_from = self.tcp_sport_from_2
- sport_to = self.tcp_sport_to_2
- dport_from = self.tcp_dport_from_2
- dport_to = self.tcp_dport_to_2
- elif proto == self.proto[self.IP][self.UDP]:
- sport_from = self.udp_sport_from_2
- sport_to = self.udp_sport_to_2
- dport_from = self.udp_dport_from_2
- dport_to = self.udp_dport_to_2
+ s_prefix=0, s_ip=0,
+ d_prefix=0, d_ip=0):
+ if ip:
+ src_prefix = IPv6Network((s_ip, s_prefix))
+ dst_prefix = IPv6Network((d_ip, d_prefix))
else:
- sport_from = ports
- sport_to = ports
- dport_from = ports
- dport_to = ports
-
- rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto,
- 'srcport_or_icmptype_first': sport_from,
- 'srcport_or_icmptype_last': sport_to,
- 'src_ip_prefix_len': s_prefix,
- 'src_ip_addr': s_ip,
- 'dstport_or_icmpcode_first': dport_from,
- 'dstport_or_icmpcode_last': dport_to,
- 'dst_ip_prefix_len': d_prefix,
- 'dst_ip_addr': d_ip})
- return rule
-
- def apply_rules(self, rules, tag=b''):
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
- tag=tag)
- self.logger.info("Dumped ACL: " + str(
- self.vapi.acl_dump(reply.acl_index)))
+ src_prefix = IPv4Network((s_ip, s_prefix))
+ dst_prefix = IPv4Network((d_ip, d_prefix))
+ return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
+ src_prefix=src_prefix, dst_prefix=dst_prefix)
+
+ def apply_rules(self, rules, tag=None):
+ acl = VppAcl(self, rules, tag=tag)
+ acl.add_vpp_config()
+ self.logger.info("Dumped ACL: " + str(acl.dump()))
# Apply a ACL on the interface as inbound
for i in self.pg_interfaces:
- self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index,
- n_input=1,
- acls=[reply.acl_index])
- return reply.acl_index
-
- def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF):
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules,
- tag=tag)
- self.logger.info("Dumped ACL: " + str(
- self.vapi.acl_dump(reply.acl_index)))
+ acl_if = VppAclInterface(
+ self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl])
+ acl_if.add_vpp_config()
+ return acl.acl_index
+
+ def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
+ acl = VppAcl(self, rules, tag=tag)
+ acl.add_vpp_config()
+ self.logger.info("Dumped ACL: " + str(acl.dump()))
# Apply a ACL on the interface as inbound
- self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
- n_input=1,
- acls=[reply.acl_index])
- return reply.acl_index
+ acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
+ acls=[acl])
+ return acl.acl_index
- def etype_whitelist(self, whitelist, n_input):
+ def etype_whitelist(self, whitelist, n_input, add=True):
# Apply whitelists on all the interfaces
- for i in self.pg_interfaces:
- # checkstyle can't read long names. Help them.
- fun = self.vapi.acl_interface_set_etype_whitelist
- fun(sw_if_index=i.sw_if_index, n_input=n_input,
- whitelist=whitelist)
- return
+ if add:
+ self._wl = []
+ for i in self.pg_interfaces:
+ self._wl.append(VppEtypeWhitelist(
+ self, sw_if_index=i.sw_if_index, whitelist=whitelist,
+ n_input=n_input).add_vpp_config())
+ else:
+ if hasattr(self, "_wl"):
+ for wl in self._wl:
+ wl.remove_vpp_config()
def create_upper_layer(self, packet_index, proto, ports=0):
p = self.proto_map[proto]
@@ -542,24 +489,15 @@ class TestACLplugin(VppTestCase):
"""
self.logger.info("ACLP_TEST_START_0001")
- # Add an ACL
- r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 1234,
- 'srcport_or_icmptype_last': 1235,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 1234,
- 'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0}]
+ # Create a permit-1234 ACL
+ r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
# Test 1: add a new ACL
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r,
- tag=b"permit 1234")
- self.assertEqual(reply.retval, 0)
+ first_acl = VppAcl(self, rules=r, tag="permit 1234")
+ first_acl.add_vpp_config()
+ self.assertTrue(first_acl.query_vpp_config())
# The very first ACL gets #0
- self.assertEqual(reply.acl_index, 0)
- first_acl = reply.acl_index
- rr = self.vapi.acl_dump(reply.acl_index)
+ self.assertEqual(first_acl.acl_index, 0)
+ rr = first_acl.dump()
self.logger.info("Dumped ACL: " + str(rr))
self.assertEqual(len(rr), 1)
# We should have the same number of ACL entries as we had asked
@@ -568,70 +506,49 @@ class TestACLplugin(VppTestCase):
# are different types, we need to iterate over rules and keys to get
# to basic values.
for i_rule in range(0, len(r) - 1):
- for rule_key in r[i_rule]:
+ encoded_rule = r[i_rule].encode()
+ for rule_key in encoded_rule:
self.assertEqual(rr[0].r[i_rule][rule_key],
- r[i_rule][rule_key])
-
- # Add a deny-1234 ACL
- r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 1234,
- 'srcport_or_icmptype_last': 1235,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 1234,
- 'dstport_or_icmpcode_last': 1234,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0},
- {'is_permit': 1, 'is_ipv6': 0, 'proto': 17,
- 'srcport_or_icmptype_first': 0,
- 'srcport_or_icmptype_last': 0,
- 'src_ip_prefix_len': 0,
- 'src_ip_addr': b'\x00\x00\x00\x00',
- 'dstport_or_icmpcode_first': 0,
- 'dstport_or_icmpcode_last': 0,
- 'dst_ip_addr': b'\x00\x00\x00\x00',
- 'dst_ip_prefix_len': 0}]
-
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny,
- tag=b"deny 1234;permit all")
- self.assertEqual(reply.retval, 0)
+ encoded_rule[rule_key])
+
+ # Create a deny-1234 ACL
+ r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
+ AclRule(is_permit=1, proto=17, ports=0)]
+ second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
+ second_acl.add_vpp_config()
+ self.assertTrue(second_acl.query_vpp_config())
# The second ACL gets #1
- self.assertEqual(reply.acl_index, 1)
- second_acl = reply.acl_index
+ self.assertEqual(second_acl.acl_index, 1)
# Test 2: try to modify a nonexistent ACL
- reply = self.vapi.acl_add_replace(acl_index=432, r=r,
- tag=b"FFFF:FFFF", expected_retval=-6)
- self.assertEqual(reply.retval, -6)
- # The ACL number should pass through
- self.assertEqual(reply.acl_index, 432)
+ invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
+ reply = invalid_acl.add_vpp_config(expect_error=True)
+
+ # apply an ACL on an interface inbound, try to delete ACL, must fail
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=1,
+ acls=[first_acl])
+ acl_if_list.add_vpp_config()
+ first_acl.remove_vpp_config(expect_error=True)
+ # Unapply an ACL and then try to delete it - must be ok
+ acl_if_list.remove_vpp_config()
+ first_acl.remove_vpp_config()
+
# apply an ACL on an interface inbound, try to delete ACL, must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[first_acl])
- reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142)
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=0,
+ acls=[second_acl])
+ acl_if_list.add_vpp_config()
+ second_acl.remove_vpp_config(expect_error=True)
# Unapply an ACL and then try to delete it - must be ok
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[])
- reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0)
-
- # apply an ACL on an interface outbound, try to delete ACL, must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[second_acl])
- reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143)
- # Unapply the ACL and then try to delete it - must be ok
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=0,
- acls=[])
- reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0)
+ acl_if_list.remove_vpp_config()
+ second_acl.remove_vpp_config()
# try to apply a nonexistent ACL - must fail
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[first_acl],
- expected_retval=-6)
+ acl_if_list = VppAclInterface(
+ self, sw_if_index=self.pg0.sw_if_index, n_input=0,
+ acls=[invalid_acl])
+ acl_if_list.add_vpp_config(expect_error=True)
self.logger.info("ACLP_TEST_FINISH_0001")
@@ -642,12 +559,12 @@ class TestACLplugin(VppTestCase):
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.UDP]))
+ 0, self.proto[self.IP][self.UDP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.TCP]))
+ 0, self.proto[self.IP][self.TCP]))
# Apply rules
- acl_idx = self.apply_rules(rules, b"permit per-flow")
+ acl_idx = self.apply_rules(rules, "permit per-flow")
# enable counters
reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
@@ -676,14 +593,15 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_START_0003")
# Add a deny-flows ACL
rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY,
- self.PORTS_ALL, self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(
+ self.IPV4, self.DENY, self.PORTS_ALL,
+ self.proto[self.IP][self.UDP]))
# Permit ip any any in the end
rules.append(self.create_rule(self.IPV4, self.PERMIT,
self.PORTS_ALL, 0))
# Apply rules
- acl_idx = self.apply_rules(rules, b"deny per-flow;permit all")
+ acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
# enable counters
reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
@@ -717,7 +635,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit icmpv4")
+ self.apply_rules(rules, "permit icmpv4")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV4,
@@ -738,7 +656,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit icmpv6")
+ self.apply_rules(rules, "permit icmpv6")
# Traffic should still pass
self.run_verify_test(self.ICMP, self.IPV6,
@@ -759,7 +677,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny icmpv4")
+ self.apply_rules(rules, "deny icmpv4")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
@@ -779,7 +697,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny icmpv6")
+ self.apply_rules(rules, "deny icmpv6")
# Traffic should not pass
self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
@@ -794,12 +712,12 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
@@ -819,7 +737,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 tcp")
+ self.apply_rules(rules, "permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
@@ -839,7 +757,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv udp")
+ self.apply_rules(rules, "permit ipv udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
@@ -859,7 +777,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 udp")
+ self.apply_rules(rules, "permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
@@ -884,7 +802,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 tcp")
+ self.apply_rules(rules, "deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -910,7 +828,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp")
+ self.apply_rules(rules, "deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -948,13 +866,13 @@ class TestACLplugin(VppTestCase):
for i in range(len(r)):
rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules)
- result = self.vapi.acl_dump(reply.acl_index)
+ acl = VppAcl(self, rules=rules)
+ acl.add_vpp_config()
+ result = acl.dump()
i = 0
for drules in result:
for dr in drules.r:
- self.assertEqual(dr.is_ipv6, r[i][0])
self.assertEqual(dr.is_permit, r[i][1])
self.assertEqual(dr.proto, r[i][3])
@@ -1001,7 +919,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
@@ -1023,7 +941,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4,
@@ -1045,7 +963,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
@@ -1054,7 +972,7 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0017")
def test_0018_udp_permit_port_v6(self):
- """ permit single UPPv6
+ """ permit single UDPv6
"""
self.logger.info("ACLP_TEST_START_0018")
@@ -1068,7 +986,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip4 tcp %d" % port)
+ self.apply_rules(rules, "permit ip4 tcp %d" % port)
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6,
@@ -1095,7 +1013,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1122,7 +1040,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1150,7 +1068,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port)
+ self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1172,7 +1090,7 @@ class TestACLplugin(VppTestCase):
self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit empty udp ip4 %d" % port)
+ self.apply_rules(rules, "permit empty udp ip4 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
@@ -1206,7 +1124,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit empty udp ip6 %d" % port)
+ self.apply_rules(rules, "permit empty udp ip6 %d" % port)
# Traffic should still pass
# Create incoming packet streams for packet-generator interfaces
@@ -1236,14 +1154,14 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
@@ -1265,7 +1183,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 tcp")
+ self.apply_rules(rules, "permit ip6 tcp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
@@ -1287,7 +1205,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 udp")
+ self.apply_rules(rules, "permit ipv4 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
@@ -1309,7 +1227,7 @@ class TestACLplugin(VppTestCase):
rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ip6 udp")
+ self.apply_rules(rules, "permit ip6 udp")
# Traffic should still pass
self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
@@ -1340,7 +1258,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 tcp")
+ self.apply_rules(rules, "deny ip4/ip6 tcp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1372,7 +1290,7 @@ class TestACLplugin(VppTestCase):
self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"deny ip4/ip6 udp")
+ self.apply_rules(rules, "deny ip4/ip6 udp")
# Traffic should not pass
self.run_verify_negat_test(self.IP, self.IPRANDOM,
@@ -1388,14 +1306,14 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# Traffic should still pass also for an odd ethertype
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
@@ -1410,15 +1328,14 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
-
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
@@ -1428,7 +1345,7 @@ class TestACLplugin(VppTestCase):
0, False, 0xaaaa)
# remove the whitelist
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
self.logger.info("ACLP_TEST_FINISH_0305")
@@ -1440,15 +1357,14 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
-
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
@@ -1457,7 +1373,7 @@ class TestACLplugin(VppTestCase):
0, False, True, 0x0bbb)
# remove the whitelist, the previously blocked 0xAAAA should pass now
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
self.logger.info("ACLP_TEST_FINISH_0306")
@@ -1469,19 +1385,19 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
# Apply rules
- self.apply_rules(rules, b"permit ipv4 tcp")
+ self.apply_rules(rules, "permit ipv4 tcp")
# whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
self.etype_whitelist([0xbbb], 1)
# remove the whitelist, the previously blocked 0xAAAA should pass now
- self.etype_whitelist([], 0)
+ self.etype_whitelist([], 0, add=False)
# The whitelisted traffic, should pass
self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
@@ -1497,9 +1413,9 @@ class TestACLplugin(VppTestCase):
# Add an ACL
rules = []
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
+ self.proto[self.IP][self.TCP]))
# deny ip any any in the end
rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
@@ -1508,12 +1424,13 @@ class TestACLplugin(VppTestCase):
intf.append(VppLoInterface(self))
# Apply rules
- self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index)
+ self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
# Remove the interface
intf[0].remove_vpp_config()
self.logger.info("ACLP_TEST_FINISH_0315")
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)