From 2f8cd914514fe54f91974c6d465d4769dfac8de8 Mon Sep 17 00:00:00 2001 From: Jakub Grajciar Date: Fri, 27 Mar 2020 06:55:06 +0100 Subject: acl: API cleanup Use consistent API types. Type: fix Signed-off-by: Jakub Grajciar Change-Id: I09fa6c1b6917936351bd376b56c414ce24488095 Signed-off-by: Jakub Grajciar --- src/plugins/acl/test/test_acl_plugin.py | 363 ++++++++------------ src/plugins/acl/test/test_acl_plugin_conns.py | 86 +++-- src/plugins/acl/test/test_acl_plugin_l2l3.py | 213 ++++++------ src/plugins/acl/test/test_acl_plugin_macip.py | 191 +++++------ src/plugins/acl/test/test_classify_l2_acl.py | 1 + src/plugins/acl/test/vpp_acl.py | 476 ++++++++++++++++++++++++++ 6 files changed, 847 insertions(+), 483 deletions(-) create mode 100644 src/plugins/acl/test/vpp_acl.py (limited to 'src/plugins/acl/test') diff --git a/src/plugins/acl/test/test_acl_plugin.py b/src/plugins/acl/test/test_acl_plugin.py index f07d37548fe..d5e195fe44b 100644 --- a/src/plugins/acl/test/test_acl_plugin.py +++ b/src/plugins/acl/test/test_acl_plugin.py @@ -12,8 +12,11 @@ from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp +from ipaddress import IPv4Network, IPv6Network from vpp_lo_interface import VppLoInterface +from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist +from vpp_ip import INVALID_INDEX class TestACLplugin(VppTestCase): @@ -175,105 +178,49 @@ class TestACLplugin(VppTestCase): % self.bd_id)) def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, - s_prefix=0, s_ip=b'\x00\x00\x00\x00', - d_prefix=0, d_ip=b'\x00\x00\x00\x00'): - if proto == -1: - return - if ports == self.PORTS_ALL: - sport_from = 0 - dport_from = 0 - sport_to = 65535 if proto != 1 and proto != 58 else 255 - dport_to = sport_to - elif ports == self.PORTS_RANGE: - if proto == 1: - sport_from = self.icmp4_type - sport_to = self.icmp4_type - dport_from = self.icmp4_code - dport_to = self.icmp4_code - elif proto == 58: - sport_from = self.icmp6_type - sport_to = self.icmp6_type - dport_from = self.icmp6_code - dport_to = self.icmp6_code - elif proto == self.proto[self.IP][self.TCP]: - sport_from = self.tcp_sport_from - sport_to = self.tcp_sport_to - dport_from = self.tcp_dport_from - dport_to = self.tcp_dport_to - elif proto == self.proto[self.IP][self.UDP]: - sport_from = self.udp_sport_from - sport_to = self.udp_sport_to - dport_from = self.udp_dport_from - dport_to = self.udp_dport_to - elif ports == self.PORTS_RANGE_2: - if proto == 1: - sport_from = self.icmp4_type_2 - sport_to = self.icmp4_type_2 - dport_from = self.icmp4_code_from_2 - dport_to = self.icmp4_code_to_2 - elif proto == 58: - sport_from = self.icmp6_type_2 - sport_to = self.icmp6_type_2 - dport_from = self.icmp6_code_from_2 - dport_to = self.icmp6_code_to_2 - elif proto == self.proto[self.IP][self.TCP]: - sport_from = self.tcp_sport_from_2 - sport_to = self.tcp_sport_to_2 - dport_from = self.tcp_dport_from_2 - dport_to = self.tcp_dport_to_2 - elif proto == self.proto[self.IP][self.UDP]: - sport_from = self.udp_sport_from_2 - sport_to = self.udp_sport_to_2 - dport_from = self.udp_dport_from_2 - dport_to = self.udp_dport_to_2 + s_prefix=0, s_ip=0, + d_prefix=0, d_ip=0): + if ip: + src_prefix = IPv6Network((s_ip, s_prefix)) + dst_prefix = IPv6Network((d_ip, d_prefix)) else: - sport_from = ports - sport_to = ports - dport_from = ports - dport_to = ports - - rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto, - 'srcport_or_icmptype_first': sport_from, - 'srcport_or_icmptype_last': sport_to, - 'src_ip_prefix_len': s_prefix, - 'src_ip_addr': s_ip, - 'dstport_or_icmpcode_first': dport_from, - 'dstport_or_icmpcode_last': dport_to, - 'dst_ip_prefix_len': d_prefix, - 'dst_ip_addr': d_ip}) - return rule - - def apply_rules(self, rules, tag=b''): - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, - tag=tag) - self.logger.info("Dumped ACL: " + str( - self.vapi.acl_dump(reply.acl_index))) + src_prefix = IPv4Network((s_ip, s_prefix)) + dst_prefix = IPv4Network((d_ip, d_prefix)) + return AclRule(is_permit=permit_deny, ports=ports, proto=proto, + src_prefix=src_prefix, dst_prefix=dst_prefix) + + def apply_rules(self, rules, tag=None): + acl = VppAcl(self, rules, tag=tag) + acl.add_vpp_config() + self.logger.info("Dumped ACL: " + str(acl.dump())) # Apply a ACL on the interface as inbound for i in self.pg_interfaces: - self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index, - n_input=1, - acls=[reply.acl_index]) - return reply.acl_index - - def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF): - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, - tag=tag) - self.logger.info("Dumped ACL: " + str( - self.vapi.acl_dump(reply.acl_index))) + acl_if = VppAclInterface( + self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]) + acl_if.add_vpp_config() + return acl.acl_index + + def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX): + acl = VppAcl(self, rules, tag=tag) + acl.add_vpp_config() + self.logger.info("Dumped ACL: " + str(acl.dump())) # Apply a ACL on the interface as inbound - self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, - n_input=1, - acls=[reply.acl_index]) - return reply.acl_index + acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, + acls=[acl]) + return acl.acl_index - def etype_whitelist(self, whitelist, n_input): + def etype_whitelist(self, whitelist, n_input, add=True): # Apply whitelists on all the interfaces - for i in self.pg_interfaces: - # checkstyle can't read long names. Help them. - fun = self.vapi.acl_interface_set_etype_whitelist - fun(sw_if_index=i.sw_if_index, n_input=n_input, - whitelist=whitelist) - return + if add: + self._wl = [] + for i in self.pg_interfaces: + self._wl.append(VppEtypeWhitelist( + self, sw_if_index=i.sw_if_index, whitelist=whitelist, + n_input=n_input).add_vpp_config()) + else: + if hasattr(self, "_wl"): + for wl in self._wl: + wl.remove_vpp_config() def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] @@ -542,24 +489,15 @@ class TestACLplugin(VppTestCase): """ self.logger.info("ACLP_TEST_START_0001") - # Add an ACL - r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17, - 'srcport_or_icmptype_first': 1234, - 'srcport_or_icmptype_last': 1235, - 'src_ip_prefix_len': 0, - 'src_ip_addr': b'\x00\x00\x00\x00', - 'dstport_or_icmpcode_first': 1234, - 'dstport_or_icmpcode_last': 1234, - 'dst_ip_addr': b'\x00\x00\x00\x00', - 'dst_ip_prefix_len': 0}] + # Create a permit-1234 ACL + r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)] # Test 1: add a new ACL - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r, - tag=b"permit 1234") - self.assertEqual(reply.retval, 0) + first_acl = VppAcl(self, rules=r, tag="permit 1234") + first_acl.add_vpp_config() + self.assertTrue(first_acl.query_vpp_config()) # The very first ACL gets #0 - self.assertEqual(reply.acl_index, 0) - first_acl = reply.acl_index - rr = self.vapi.acl_dump(reply.acl_index) + self.assertEqual(first_acl.acl_index, 0) + rr = first_acl.dump() self.logger.info("Dumped ACL: " + str(rr)) self.assertEqual(len(rr), 1) # We should have the same number of ACL entries as we had asked @@ -568,70 +506,49 @@ class TestACLplugin(VppTestCase): # are different types, we need to iterate over rules and keys to get # to basic values. for i_rule in range(0, len(r) - 1): - for rule_key in r[i_rule]: + encoded_rule = r[i_rule].encode() + for rule_key in encoded_rule: self.assertEqual(rr[0].r[i_rule][rule_key], - r[i_rule][rule_key]) - - # Add a deny-1234 ACL - r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17, - 'srcport_or_icmptype_first': 1234, - 'srcport_or_icmptype_last': 1235, - 'src_ip_prefix_len': 0, - 'src_ip_addr': b'\x00\x00\x00\x00', - 'dstport_or_icmpcode_first': 1234, - 'dstport_or_icmpcode_last': 1234, - 'dst_ip_addr': b'\x00\x00\x00\x00', - 'dst_ip_prefix_len': 0}, - {'is_permit': 1, 'is_ipv6': 0, 'proto': 17, - 'srcport_or_icmptype_first': 0, - 'srcport_or_icmptype_last': 0, - 'src_ip_prefix_len': 0, - 'src_ip_addr': b'\x00\x00\x00\x00', - 'dstport_or_icmpcode_first': 0, - 'dstport_or_icmpcode_last': 0, - 'dst_ip_addr': b'\x00\x00\x00\x00', - 'dst_ip_prefix_len': 0}] - - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny, - tag=b"deny 1234;permit all") - self.assertEqual(reply.retval, 0) + encoded_rule[rule_key]) + + # Create a deny-1234 ACL + r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235), + AclRule(is_permit=1, proto=17, ports=0)] + second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all") + second_acl.add_vpp_config() + self.assertTrue(second_acl.query_vpp_config()) # The second ACL gets #1 - self.assertEqual(reply.acl_index, 1) - second_acl = reply.acl_index + self.assertEqual(second_acl.acl_index, 1) # Test 2: try to modify a nonexistent ACL - reply = self.vapi.acl_add_replace(acl_index=432, r=r, - tag=b"FFFF:FFFF", expected_retval=-6) - self.assertEqual(reply.retval, -6) - # The ACL number should pass through - self.assertEqual(reply.acl_index, 432) + invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF") + reply = invalid_acl.add_vpp_config(expect_error=True) + + # apply an ACL on an interface inbound, try to delete ACL, must fail + acl_if_list = VppAclInterface( + self, sw_if_index=self.pg0.sw_if_index, n_input=1, + acls=[first_acl]) + acl_if_list.add_vpp_config() + first_acl.remove_vpp_config(expect_error=True) + # Unapply an ACL and then try to delete it - must be ok + acl_if_list.remove_vpp_config() + first_acl.remove_vpp_config() + # apply an ACL on an interface inbound, try to delete ACL, must fail - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=1, - acls=[first_acl]) - reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142) + acl_if_list = VppAclInterface( + self, sw_if_index=self.pg0.sw_if_index, n_input=0, + acls=[second_acl]) + acl_if_list.add_vpp_config() + second_acl.remove_vpp_config(expect_error=True) # Unapply an ACL and then try to delete it - must be ok - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=0, - acls=[]) - reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0) - - # apply an ACL on an interface outbound, try to delete ACL, must fail - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=0, - acls=[second_acl]) - reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143) - # Unapply the ACL and then try to delete it - must be ok - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=0, - acls=[]) - reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0) + acl_if_list.remove_vpp_config() + second_acl.remove_vpp_config() # try to apply a nonexistent ACL - must fail - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=1, - acls=[first_acl], - expected_retval=-6) + acl_if_list = VppAclInterface( + self, sw_if_index=self.pg0.sw_if_index, n_input=0, + acls=[invalid_acl]) + acl_if_list.add_vpp_config(expect_error=True) self.logger.info("ACLP_TEST_FINISH_0001") @@ -642,12 +559,12 @@ class TestACLplugin(VppTestCase): rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.UDP])) + 0, self.proto[self.IP][self.UDP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.TCP])) + 0, self.proto[self.IP][self.TCP])) # Apply rules - acl_idx = self.apply_rules(rules, b"permit per-flow") + acl_idx = self.apply_rules(rules, "permit per-flow") # enable counters reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) @@ -676,14 +593,15 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_START_0003") # Add a deny-flows ACL rules = [] - rules.append(self.create_rule(self.IPV4, self.DENY, - self.PORTS_ALL, self.proto[self.IP][self.UDP])) + rules.append(self.create_rule( + self.IPV4, self.DENY, self.PORTS_ALL, + self.proto[self.IP][self.UDP])) # Permit ip any any in the end rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules - acl_idx = self.apply_rules(rules, b"deny per-flow;permit all") + acl_idx = self.apply_rules(rules, "deny per-flow;permit all") # enable counters reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) @@ -717,7 +635,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit icmpv4") + self.apply_rules(rules, "permit icmpv4") # Traffic should still pass self.run_verify_test(self.ICMP, self.IPV4, @@ -738,7 +656,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit icmpv6") + self.apply_rules(rules, "permit icmpv6") # Traffic should still pass self.run_verify_test(self.ICMP, self.IPV6, @@ -759,7 +677,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny icmpv4") + self.apply_rules(rules, "deny icmpv4") # Traffic should not pass self.run_verify_negat_test(self.ICMP, self.IPV4, 0) @@ -779,7 +697,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny icmpv6") + self.apply_rules(rules, "deny icmpv6") # Traffic should not pass self.run_verify_negat_test(self.ICMP, self.IPV6, 0) @@ -794,12 +712,12 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ipv4 tcp") + self.apply_rules(rules, "permit ipv4 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) @@ -819,7 +737,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ip6 tcp") + self.apply_rules(rules, "permit ip6 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) @@ -839,7 +757,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ipv udp") + self.apply_rules(rules, "permit ipv udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) @@ -859,7 +777,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ip6 udp") + self.apply_rules(rules, "permit ip6 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) @@ -884,7 +802,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny ip4/ip6 tcp") + self.apply_rules(rules, "deny ip4/ip6 tcp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -910,7 +828,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny ip4/ip6 udp") + self.apply_rules(rules, "deny ip4/ip6 udp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -948,13 +866,13 @@ class TestACLplugin(VppTestCase): for i in range(len(r)): rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3])) - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules) - result = self.vapi.acl_dump(reply.acl_index) + acl = VppAcl(self, rules=rules) + acl.add_vpp_config() + result = acl.dump() i = 0 for drules in result: for dr in drules.r: - self.assertEqual(dr.is_ipv6, r[i][0]) self.assertEqual(dr.is_permit, r[i][1]) self.assertEqual(dr.proto, r[i][3]) @@ -1001,7 +919,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ip4 tcp %d" % port) + self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -1023,7 +941,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ip4 tcp %d" % port) + self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -1045,7 +963,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ip4 tcp %d" % port) + self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -1054,7 +972,7 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0017") def test_0018_udp_permit_port_v6(self): - """ permit single UPPv6 + """ permit single UDPv6 """ self.logger.info("ACLP_TEST_START_0018") @@ -1068,7 +986,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ip4 tcp %d" % port) + self.apply_rules(rules, "permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -1095,7 +1013,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1122,7 +1040,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1150,7 +1068,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1172,7 +1090,7 @@ class TestACLplugin(VppTestCase): self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit empty udp ip4 %d" % port) + self.apply_rules(rules, "permit empty udp ip4 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1206,7 +1124,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit empty udp ip6 %d" % port) + self.apply_rules(rules, "permit empty udp ip6 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1236,14 +1154,14 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ipv4 tcp") + self.apply_rules(rules, "permit ipv4 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) @@ -1265,7 +1183,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ip6 tcp") + self.apply_rules(rules, "permit ip6 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) @@ -1287,7 +1205,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ipv4 udp") + self.apply_rules(rules, "permit ipv4 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) @@ -1309,7 +1227,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ip6 udp") + self.apply_rules(rules, "permit ip6 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) @@ -1340,7 +1258,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny ip4/ip6 tcp") + self.apply_rules(rules, "deny ip4/ip6 tcp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1372,7 +1290,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"deny ip4/ip6 udp") + self.apply_rules(rules, "deny ip4/ip6 udp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1388,14 +1306,14 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ipv4 tcp") + self.apply_rules(rules, "permit ipv4 tcp") # Traffic should still pass also for an odd ethertype self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], @@ -1410,15 +1328,14 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ipv4 tcp") - + self.apply_rules(rules, "permit ipv4 tcp") # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) @@ -1428,7 +1345,7 @@ class TestACLplugin(VppTestCase): 0, False, 0xaaaa) # remove the whitelist - self.etype_whitelist([], 0) + self.etype_whitelist([], 0, add=False) self.logger.info("ACLP_TEST_FINISH_0305") @@ -1440,15 +1357,14 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ipv4 tcp") - + self.apply_rules(rules, "permit ipv4 tcp") # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) @@ -1457,7 +1373,7 @@ class TestACLplugin(VppTestCase): 0, False, True, 0x0bbb) # remove the whitelist, the previously blocked 0xAAAA should pass now - self.etype_whitelist([], 0) + self.etype_whitelist([], 0, add=False) self.logger.info("ACLP_TEST_FINISH_0306") @@ -1469,19 +1385,19 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, b"permit ipv4 tcp") + self.apply_rules(rules, "permit ipv4 tcp") # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) # remove the whitelist, the previously blocked 0xAAAA should pass now - self.etype_whitelist([], 0) + self.etype_whitelist([], 0, add=False) # The whitelisted traffic, should pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], @@ -1497,9 +1413,9 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -1508,12 +1424,13 @@ class TestACLplugin(VppTestCase): intf.append(VppLoInterface(self)) # Apply rules - self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index) + self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index) # Remove the interface intf[0].remove_vpp_config() self.logger.info("ACLP_TEST_FINISH_0315") + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) diff --git a/src/plugins/acl/test/test_acl_plugin_conns.py b/src/plugins/acl/test/test_acl_plugin_conns.py index f4cf5947043..c7941fa150b 100644 --- a/src/plugins/acl/test/test_acl_plugin_conns.py +++ b/src/plugins/acl/test/test_acl_plugin_conns.py @@ -14,6 +14,9 @@ from scapy.layers.inet6 import IPv6ExtHdrFragment from pprint import pprint from random import randint from util import L4_Conn +from ipaddress import ip_network + +from vpp_acl import AclRule, VppAcl, VppAclInterface def to_acl_rule(self, is_permit, wildcard_sport=False): @@ -35,23 +38,18 @@ def to_acl_rule(self, is_permit, wildcard_sport=False): rule_l4_sport_first = rule_l4_sport rule_l4_sport_last = rule_l4_sport - new_rule = { - 'is_permit': is_permit, - 'is_ipv6': p.haslayer(IPv6), - 'src_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].src), - 'src_ip_prefix_len': rule_prefix_len, - 'dst_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].dst), - 'dst_ip_prefix_len': rule_prefix_len, - 'srcport_or_icmptype_first': rule_l4_sport_first, - 'srcport_or_icmptype_last': rule_l4_sport_last, - 'dstport_or_icmpcode_first': rule_l4_dport, - 'dstport_or_icmpcode_last': rule_l4_dport, - 'proto': rule_l4_proto, - } + new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto, + src_prefix=ip_network( + (p[rule_l3_layer].src, rule_prefix_len)), + dst_prefix=ip_network( + (p[rule_l3_layer].dst, rule_prefix_len)), + sport_from=rule_l4_sport_first, + sport_to=rule_l4_sport_last, + dport_from=rule_l4_dport, dport_to=rule_l4_dport) + return new_rule + Packet.to_acl_rule = to_acl_rule @@ -79,48 +77,44 @@ class Conn(L4_Conn): r = [] r.append(pkt.to_acl_rule(2, wildcard_sport=True)) r.append(self.wildcard_rule(0)) - res = self.testcase.vapi.acl_add_replace(0xffffffff, r) - self.testcase.assert_equal(res.retval, 0, "error adding ACL") - reflect_acl_index = res.acl_index + reflect_acl = VppAcl(self.testcase, r) + reflect_acl.add_vpp_config() r = [] r.append(self.wildcard_rule(0)) - res = self.testcase.vapi.acl_add_replace(0xffffffff, r) - self.testcase.assert_equal(res.retval, 0, "error adding deny ACL") - deny_acl_index = res.acl_index + deny_acl = VppAcl(self.testcase, r) + deny_acl.add_vpp_config() if reflect_side == acl_side: - self.testcase.vapi.acl_interface_set_acl_list( - self.ifs[acl_side].sw_if_index, 1, - [reflect_acl_index, - deny_acl_index]) - self.testcase.vapi.acl_interface_set_acl_list( - self.ifs[1-acl_side].sw_if_index, 0, []) + acl_if0 = VppAclInterface(self.testcase, + self.ifs[acl_side].sw_if_index, + [reflect_acl, deny_acl], n_input=1) + acl_if1 = VppAclInterface(self.testcase, + self.ifs[1-acl_side].sw_if_index, [], + n_input=0) + acl_if0.add_vpp_config() + acl_if1.add_vpp_config() else: - self.testcase.vapi.acl_interface_set_acl_list( - self.ifs[acl_side].sw_if_index, 1, - [deny_acl_index, - reflect_acl_index]) - self.testcase.vapi.acl_interface_set_acl_list( - self.ifs[1-acl_side].sw_if_index, 0, []) + acl_if0 = VppAclInterface(self.testcase, + self.ifs[acl_side].sw_if_index, + [deny_acl, reflect_acl], n_input=1) + acl_if1 = VppAclInterface(self.testcase, + self.ifs[1-acl_side].sw_if_index, [], + n_input=0) + acl_if0.add_vpp_config() + acl_if1.add_vpp_config() def wildcard_rule(self, is_permit): any_addr = ["0.0.0.0", "::"] rule_family = self.address_family is_ip6 = 1 if rule_family == AF_INET6 else 0 - new_rule = { - 'is_permit': is_permit, - 'is_ipv6': is_ip6, - 'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]), - 'src_ip_prefix_len': 0, - 'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]), - 'dst_ip_prefix_len': 0, - 'srcport_or_icmptype_first': 0, - 'srcport_or_icmptype_last': 65535, - 'dstport_or_icmpcode_first': 0, - 'dstport_or_icmpcode_last': 65535, - 'proto': 0, - } + new_rule = AclRule(is_permit=is_permit, proto=0, + src_prefix=ip_network( + (any_addr[is_ip6], 0)), + dst_prefix=ip_network( + (any_addr[is_ip6], 0)), + sport_from=0, sport_to=65535, dport_from=0, + dport_to=65535) return new_rule diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py index 3379871e0b9..30b53728c63 100644 --- a/src/plugins/acl/test/test_acl_plugin_l2l3.py +++ b/src/plugins/acl/test/test_acl_plugin_l2l3.py @@ -23,10 +23,12 @@ """ +import copy import unittest from socket import inet_pton, AF_INET, AF_INET6 from random import choice, shuffle from pprint import pprint +from ipaddress import ip_network import scapy.compat from scapy.packet import Raw @@ -40,6 +42,8 @@ from framework import VppTestCase, VppTestRunner from vpp_l2 import L2_PORT_TYPE import time +from vpp_acl import AclRule, VppAcl, VppAclInterface + class TestACLpluginL2L3(VppTestCase): """TestACLpluginL2L3 Test Case""" @@ -259,31 +263,26 @@ class TestACLpluginL2L3(VppTestCase): else: rule_l4_proto = p[IP].proto - new_rule = { - 'is_permit': is_permit, - 'is_ipv6': p.haslayer(IPv6), - 'src_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].src), - 'src_ip_prefix_len': rule_prefix_len, - 'dst_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].dst), - 'dst_ip_prefix_len': rule_prefix_len, - 'srcport_or_icmptype_first': rule_l4_sport, - 'srcport_or_icmptype_last': rule_l4_sport, - 'dstport_or_icmpcode_first': rule_l4_dport, - 'dstport_or_icmpcode_last': rule_l4_dport, - 'proto': rule_l4_proto, - } + new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto, + src_prefix=ip_network( + (p[rule_l3_layer].src, rule_prefix_len)), + dst_prefix=ip_network( + (p[rule_l3_layer].dst, rule_prefix_len)), + sport_from=rule_l4_sport, + sport_to=rule_l4_sport, + dport_from=rule_l4_dport, + dport_to=rule_l4_dport) + rules.append(new_rule) - new_rule_permit = new_rule.copy() - new_rule_permit['is_permit'] = 1 + new_rule_permit = copy.copy(new_rule) + new_rule_permit.is_permit = 1 permit_rules.append(new_rule_permit) - new_rule_permit_and_reflect = new_rule.copy() + new_rule_permit_and_reflect = copy.copy(new_rule) if can_reflect_this_packet: - new_rule_permit_and_reflect['is_permit'] = 2 + new_rule_permit_and_reflect.is_permit = 2 else: - new_rule_permit_and_reflect['is_permit'] = is_permit + new_rule_permit_and_reflect.is_permit = is_permit permit_and_reflect_rules.append(new_rule_permit_and_reflect) self.logger.info("create_stream pkt#%d: %s" % (i, payload)) @@ -356,79 +355,67 @@ class TestACLpluginL2L3(VppTestCase): # UDP: - def applied_acl_shuffle(self, sw_if_index): - # first collect what ACLs are applied and what they look like - r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index) - orig_applied_acls = r[0] - - # we will collect these just to save and generate additional rulesets - orig_acls = [] - for acl_num in orig_applied_acls.acls: - rr = self.vapi.acl_dump(acl_num) - orig_acls.append(rr[0]) + def applied_acl_shuffle(self, acl_if): + saved_n_input = acl_if.n_input + # TOTO: maybe copy each one?? + saved_acls = acl_if.acls # now create a list of all the rules in all ACLs all_rules = [] - for old_acl in orig_acls: - for rule in old_acl.r: - all_rules.append(dict(rule._asdict())) + for old_acl in saved_acls: + for rule in old_acl.rules: + all_rules.append(rule) # Add a few ACLs made from shuffled rules shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::2], - tag=b"shuffle 1. acl") - shuffle_acl_1 = reply.acl_index + acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl") + acl1.add_vpp_config() + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::3], - tag=b"shuffle 2. acl") - shuffle_acl_2 = reply.acl_index + acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl") + acl2.add_vpp_config() + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=all_rules[::2], - tag=b"shuffle 3. acl") - shuffle_acl_3 = reply.acl_index + acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl") + acl3.add_vpp_config() # apply the shuffle ACLs in front - input_acls = [shuffle_acl_1, shuffle_acl_2] - output_acls = [shuffle_acl_1, shuffle_acl_2] + input_acls = [acl1, acl2] + output_acls = [acl1, acl2] # add the currently applied ACLs - n_input = orig_applied_acls.n_input - input_acls.extend(orig_applied_acls.acls[:n_input]) - output_acls.extend(orig_applied_acls.acls[n_input:]) + n_input = acl_if.n_input + input_acls.extend(saved_acls[:n_input]) + output_acls.extend(saved_acls[n_input:]) # and the trailing shuffle ACL(s) - input_acls.extend([shuffle_acl_3]) - output_acls.extend([shuffle_acl_3]) + input_acls.extend([acl3]) + output_acls.extend([acl3]) # set the interface ACL list to the result - self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, - n_input=len(input_acls), - acls=input_acls + output_acls) + acl_if.n_input = len(input_acls) + acl_if.acls = input_acls + output_acls + acl_if.add_vpp_config() + # change the ACLs a few times for i in range(1, 10): shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1, - r=all_rules[::1+(i % 2)], - tag=b"shuffle 1. acl") + acl1.modify_vpp_config(all_rules[::1+(i % 2)]) + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, - r=all_rules[::1+(i % 3)], - tag=b"shuffle 2. acl") + acl2.modify_vpp_config(all_rules[::1+(i % 3)]) + shuffle(all_rules) - reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, - r=all_rules[::1+(i % 5)], - tag=b"shuffle 3. acl") + acl3.modify_vpp_config(all_rules[::1+(i % 5)]) # restore to how it was before and clean up - self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, - n_input=orig_applied_acls.n_input, - acls=orig_applied_acls.acls) - reply = self.vapi.acl_del(acl_index=shuffle_acl_1) - reply = self.vapi.acl_del(acl_index=shuffle_acl_2) - reply = self.vapi.acl_del(acl_index=shuffle_acl_3) + acl_if.n_input = saved_n_input + acl_if.acls = saved_acls + acl_if.add_vpp_config() + + acl1.remove_vpp_config() + acl2.remove_vpp_config() + acl3.remove_vpp_config() def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect): @@ -436,15 +423,14 @@ class TestACLpluginL2L3(VppTestCase): r_permit = stream_dict['permit_rules'] r_permit_reflect = stream_dict['permit_and_reflect_rules'] r_action = r_permit_reflect if is_reflect else r - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action, - tag=b"act. acl") - action_acl_index = reply.acl_index - reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit, - tag=b"perm. acl") - permit_acl_index = reply.acl_index - return {'L2': action_acl_index if test_l2_action else permit_acl_index, - 'L3': permit_acl_index if test_l2_action else action_acl_index, - 'permit': permit_acl_index, 'action': action_acl_index} + action_acl = VppAcl(self, rules=r_action, tag="act. acl") + action_acl.add_vpp_config() + permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl") + permit_acl.add_vpp_config() + + return {'L2': action_acl if test_l2_action else permit_acl, + 'L3': permit_acl if test_l2_action else action_acl, + 'permit': permit_acl, 'action': action_acl} def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -452,26 +438,30 @@ class TestACLpluginL2L3(VppTestCase): """ self.reset_packet_infos() stream_dict = self.create_stream( - self.pg2, self.loop0, - bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - not is_reflect, False, add_eh) + self.pg2, self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, is_ip6, + not is_reflect, False, add_eh) stream = stream_dict['stream'] acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect) n_input_l3 = 0 if bridged_to_routed else 1 n_input_l2 = 1 if bridged_to_routed else 0 - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, - n_input=n_input_l3, - acls=[acl_idx['L3']]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=n_input_l2, - acls=[acl_idx['L2']]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=n_input_l2, - acls=[acl_idx['L2']]) - self.applied_acl_shuffle(self.pg0.sw_if_index) - self.applied_acl_shuffle(self.pg2.sw_if_index) + + acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, + n_input=n_input_l3, acls=[acl_idx['L3']]) + acl_if_pg2.add_vpp_config() + + acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, + n_input=n_input_l2, acls=[acl_idx['L2']]) + acl_if_pg0.add_vpp_config() + + acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, + n_input=n_input_l2, acls=[acl_idx['L2']]) + acl_if_pg1.add_vpp_config() + + self.applied_acl_shuffle(acl_if_pg0) + self.applied_acl_shuffle(acl_if_pg1) return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']} def apply_acl_ip46_both_directions_reflect(self, @@ -516,20 +506,23 @@ class TestACLpluginL2L3(VppTestCase): else: outbound_l3_acl = acl_idx_rev['L3'] - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, - n_input=1, - acls=[inbound_l3_acl, - outbound_l3_acl]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, - outbound_l2_acl]) - self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, - outbound_l2_acl]) - self.applied_acl_shuffle(self.pg0.sw_if_index) - self.applied_acl_shuffle(self.pg2.sw_if_index) + acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, + n_input=1, + acls=[inbound_l3_acl, outbound_l3_acl]) + acl_if_pg2.add_vpp_config() + + acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, outbound_l2_acl]) + acl_if_pg0.add_vpp_config() + + acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, outbound_l2_acl]) + acl_if_pg1.add_vpp_config() + + self.applied_acl_shuffle(acl_if_pg0) + self.applied_acl_shuffle(acl_if_pg2) def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -594,7 +587,7 @@ class TestACLpluginL2L3(VppTestCase): pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, is_reflect, False, add_eh) - self.verify_acl_packet_count(acls['L3'], pkts) + self.verify_acl_packet_count(acls['L3'].acl_index, pkts) def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -604,7 +597,7 @@ class TestACLpluginL2L3(VppTestCase): pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, is_reflect, False, add_eh) - self.verify_acl_packet_count(acls['L2'], pkts) + self.verify_acl_packet_count(acls['L2'].acl_index, pkts) def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action, is_ip6, add_eh, diff --git a/src/plugins/acl/test/test_acl_plugin_macip.py b/src/plugins/acl/test/test_acl_plugin_macip.py index 0f178a36b69..5edd7b03258 100644 --- a/src/plugins/acl/test/test_acl_plugin_macip.py +++ b/src/plugins/acl/test/test_acl_plugin_macip.py @@ -9,6 +9,7 @@ from socket import inet_ntop, inet_pton, AF_INET, AF_INET6 from struct import pack, unpack import re import unittest +from ipaddress import ip_network, IPv4Network, IPv6Network import scapy.compat from scapy.packet import Raw @@ -21,6 +22,9 @@ from vpp_lo_interface import VppLoInterface from vpp_l2 import L2_PORT_TYPE from vpp_sub_interface import L2_VTR_OP, VppSubInterface, VppDot1QSubint, \ VppDot1ADSubint +from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist, \ + VppMacipAclInterface, VppMacipAcl, MacipRule +from vpp_papi import MACAddress class MethodHolder(VppTestCase): @@ -72,10 +76,10 @@ class MethodHolder(VppTestCase): # create 2 subinterfaces cls.subifs = [ - VppDot1QSubint(cls, cls.pg1, 10), - VppDot1ADSubint(cls, cls.pg2, 20, 300, 400), - VppDot1QSubint(cls, cls.pg3, 30), - VppDot1ADSubint(cls, cls.pg3, 40, 600, 700)] + VppDot1QSubint(cls, cls.pg1, 10), + VppDot1ADSubint(cls, cls.pg2, 20, 300, 400), + VppDot1QSubint(cls, cls.pg3, 30), + VppDot1ADSubint(cls, cls.pg3, 40, 600, 700)] cls.subifs[0].set_vtr(L2_VTR_OP.L2_POP_1, inner=10, push1q=1) @@ -158,11 +162,6 @@ class MethodHolder(VppTestCase): def setUp(self): super(MethodHolder, self).setUp() self.reset_packet_infos() - del self.ACLS[:] - - def tearDown(self): - super(MethodHolder, self).tearDown() - self.delete_acls() def show_commands_at_teardown(self): self.logger.info(self.vapi.ppcli("show interface address")) @@ -182,19 +181,21 @@ class MethodHolder(VppTestCase): acls = self.vapi.macip_acl_dump() if self.DEBUG: for acl in acls: - print("ACL #"+str(acl.acl_index)) + # print("ACL #"+str(acl.acl_index)) for r in acl.r: rule = "ACTION" if r.is_permit == 1: rule = "PERMIT" elif r.is_permit == 0: rule = "DENY " + """ print(" IP6" if r.is_ipv6 else " IP4", rule, binascii.hexlify(r.src_mac), binascii.hexlify(r.src_mac_mask), unpack('<16B', r.src_ip_addr), r.src_ip_prefix_len) + """ return acls def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP, @@ -252,19 +253,16 @@ class MethodHolder(VppTestCase): elif ip_type == self.SUBNET_IP: ip4[2] = random.randint(100, 200) ip4[3] = 0 - ip6[8] = random.randint(100, 200) + ip6[7] = random.randint(100, 200) ip6[15] = 0 ip_pack = b'' for j in range(0, len(ip)): ip_pack += pack(' 0: continue if is_permit: - macip_rule = ({ - 'is_permit': is_permit, - 'is_ipv6': is_ip6, - 'src_ip_addr': ip_rule, - 'src_ip_prefix_len': prefix_len, - 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')), - 'src_mac_mask': binascii.unhexlify( - mac_mask.replace(':', ''))}) + macip_rule = MacipRule( + is_permit=is_permit, + src_prefix=ip_network( + (ip_rule, prefix_len)), + src_mac=MACAddress(mac_rule).packed, + src_mac_mask=MACAddress(mac_mask).packed) macip_rules.append(macip_rule) # deny all other packets if not (mac_type == self.WILD_MAC and ip_type == self.WILD_IP): - macip_rule = ({'is_permit': 0, - 'is_ipv6': is_ip6, - 'src_ip_addr': "", - 'src_ip_prefix_len': 0, - 'src_mac': "", - 'src_mac_mask': ""}) + network = IPv6Network((0, 0)) if is_ip6 else IPv4Network((0, 0)) + macip_rule = MacipRule( + is_permit=0, + src_prefix=network, + src_mac=MACAddress("00:00:00:00:00:00").packed, + src_mac_mask=MACAddress("00:00:00:00:00:00").packed) macip_rules.append(macip_rule) - acl_rule = {'is_permit': 0, - 'is_ipv6': is_ip6} + network = IPv6Network((0, 0)) if is_ip6 else IPv4Network((0, 0)) + acl_rule = AclRule(is_permit=0, src_prefix=network, dst_prefix=network, + sport_from=0, sport_to=0, dport_from=0, dport_to=0) acl_rules.append(acl_rule) return {'stream': packets, 'macip_rules': macip_rules, @@ -644,36 +626,32 @@ class MethodHolder(VppTestCase): if apply_rules: if isMACIP: - reply = self.vapi.macip_acl_add(test_dict['macip_rules']) + self.acl = VppMacipAcl(self, rules=test_dict['macip_rules']) else: - reply = self.vapi.acl_add_replace(acl_index=4294967295, - r=test_dict['acl_rules']) - self.assertEqual(reply.retval, 0) - acl_index = reply.acl_index + self.acl = VppAcl(self, rules=test_dict['acl_rules']) + self.acl.add_vpp_config() if isMACIP: - self.vapi.macip_acl_interface_add_del( - sw_if_index=tx_if.sw_if_index, - acl_index=acl_index) - reply = self.vapi.macip_acl_interface_get() - self.assertEqual(reply.acls[tx_if.sw_if_index], acl_index) - self.ACLS.append(reply.acls[tx_if.sw_if_index]) + self.acl_if = VppMacipAclInterface( + self, sw_if_index=tx_if.sw_if_index, acls=[self.acl]) + self.acl_if.add_vpp_config() + + dump = self.acl_if.dump() + self.assertTrue(dump) + self.assertEqual(dump[0].acls[0], self.acl.acl_index) else: - self.vapi.acl_interface_add_del( - sw_if_index=tx_if.sw_if_index, acl_index=acl_index) + self.acl_if = VppAclInterface( + self, sw_if_index=tx_if.sw_if_index, n_input=1, + acls=[self.acl]) + self.acl_if.add_vpp_config() else: - self.vapi.macip_acl_interface_add_del( - sw_if_index=tx_if.sw_if_index, - acl_index=0) - if try_replace: + if hasattr(self, "acl_if"): + self.acl_if.remove_vpp_config() + if try_replace and hasattr(self, "acl"): if isMACIP: - reply = self.vapi.macip_acl_add_replace( - test_dict['macip_rules'], - acl_index) + self.acl.modify_vpp_config(test_dict['macip_rules']) else: - reply = self.vapi.acl_add_replace(acl_index=acl_index, - r=test_dict['acl_rules']) - self.assertEqual(reply.retval, 0) + self.acl.modify_vpp_config(test_dict['acl_rules']) if not isinstance(src_if, VppSubInterface): tx_if.add_stream(test_dict['stream']) @@ -693,9 +671,10 @@ class MethodHolder(VppTestCase): self.get_packet_count_for_if_idx(dst_if.sw_if_index)) self.verify_capture(test_dict['stream'], capture, is_ip6) if not isMACIP: - self.vapi.acl_interface_add_del(sw_if_index=tx_if.sw_if_index, - acl_index=acl_index, is_add=0) - self.vapi.acl_del(acl_index) + if hasattr(self, "acl_if"): + self.acl_if.remove_vpp_config() + if hasattr(self, "acl"): + self.acl.remove_vpp_config() def run_test_acls(self, mac_type, ip_type, acl_count, rules_count, traffic=None, ip=None): @@ -1077,17 +1056,13 @@ class TestMACIP(MethodHolder): r1 = self.create_rules(acl_count=3, rules_count=[2, 2, 2]) r2 = self.create_rules(mac_type=self.OUI_MAC, ip_type=self.SUBNET_IP) - self.apply_macip_rules(r1) + macip_acls = self.apply_macip_rules(r1) acls_before = self.macip_acl_dump_debug() # replace acls #2, #3 with new - reply = self.vapi.macip_acl_add_replace(r2[0], 2) - self.assertEqual(reply.retval, 0) - self.assertEqual(reply.acl_index, 2) - reply = self.vapi.macip_acl_add_replace(r2[1], 3) - self.assertEqual(reply.retval, 0) - self.assertEqual(reply.acl_index, 3) + macip_acls[2].modify_vpp_config(r2[0]) + macip_acls[3].modify_vpp_config(r2[1]) acls_after = self.macip_acl_dump_debug() @@ -1118,21 +1093,25 @@ class TestMACIP(MethodHolder): intf_count = len(self.interfaces)+1 intf = [] - self.apply_macip_rules(self.create_rules(acl_count=3, - rules_count=[3, 5, 4])) + macip_alcs = self.apply_macip_rules( + self.create_rules(acl_count=3, rules_count=[3, 5, 4])) intf.append(VppLoInterface(self)) intf.append(VppLoInterface(self)) sw_if_index0 = intf[0].sw_if_index - self.vapi.macip_acl_interface_add_del(sw_if_index0, 1) + macip_acl_if0 = VppMacipAclInterface( + self, sw_if_index=sw_if_index0, acls=[macip_alcs[1]]) + macip_acl_if0.add_vpp_config() reply = self.vapi.macip_acl_interface_get() self.assertEqual(reply.count, intf_count+1) self.assertEqual(reply.acls[sw_if_index0], 1) sw_if_index1 = intf[1].sw_if_index - self.vapi.macip_acl_interface_add_del(sw_if_index1, 0) + macip_acl_if1 = VppMacipAclInterface( + self, sw_if_index=sw_if_index1, acls=[macip_alcs[0]]) + macip_acl_if1.add_vpp_config() reply = self.vapi.macip_acl_interface_get() self.assertEqual(reply.count, intf_count+2) @@ -1148,8 +1127,12 @@ class TestMACIP(MethodHolder): intf.append(VppLoInterface(self)) sw_if_index2 = intf[2].sw_if_index sw_if_index3 = intf[3].sw_if_index - self.vapi.macip_acl_interface_add_del(sw_if_index2, 1) - self.vapi.macip_acl_interface_add_del(sw_if_index3, 1) + macip_acl_if2 = VppMacipAclInterface( + self, sw_if_index=sw_if_index2, acls=[macip_alcs[1]]) + macip_acl_if2.add_vpp_config() + macip_acl_if3 = VppMacipAclInterface( + self, sw_if_index=sw_if_index3, acls=[macip_alcs[1]]) + macip_acl_if3.add_vpp_config() reply = self.vapi.macip_acl_interface_get() self.assertEqual(reply.count, intf_count+3) diff --git a/src/plugins/acl/test/test_classify_l2_acl.py b/src/plugins/acl/test/test_classify_l2_acl.py index 0cba6c83cba..b1309881e58 100644 --- a/src/plugins/acl/test/test_classify_l2_acl.py +++ b/src/plugins/acl/test/test_classify_l2_acl.py @@ -603,5 +603,6 @@ class TestClassifyAcl(TestClassifier): self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) diff --git a/src/plugins/acl/test/vpp_acl.py b/src/plugins/acl/test/vpp_acl.py new file mode 100644 index 00000000000..2d2f7ca257b --- /dev/null +++ b/src/plugins/acl/test/vpp_acl.py @@ -0,0 +1,476 @@ +from ipaddress import IPv4Network + +from vpp_object import VppObject +from vpp_papi import VppEnum +from vpp_ip import INVALID_INDEX +from vpp_papi_provider import UnexpectedApiReturnValueError + + +class VppAclPlugin(VppObject): + + def __init__(self, test, enable_intf_counters=False): + self._test = test + self.enable_intf_counters = enable_intf_counters + + @property + def enable_intf_counters(self): + return self._enable_intf_counters + + @enable_intf_counters.setter + def enable_intf_counters(self, enable): + self.vapi.acl_stats_intf_counters_enable(enable=enable) + + def add_vpp_config(self): + pass + + def remove_vpp_config(self): + pass + + def query_vpp_config(self): + pass + + def object_id(self): + return ("acl-plugin-%d" % (self._sw_if_index)) + + +class AclRule(): + """ ACL Rule """ + + # port ranges + PORTS_ALL = -1 + PORTS_RANGE = 0 + PORTS_RANGE_2 = 1 + udp_sport_from = 10 + udp_sport_to = udp_sport_from + 5 + udp_dport_from = 20000 + udp_dport_to = udp_dport_from + 5000 + tcp_sport_from = 30 + tcp_sport_to = tcp_sport_from + 5 + tcp_dport_from = 40000 + tcp_dport_to = tcp_dport_from + 5000 + + udp_sport_from_2 = 90 + udp_sport_to_2 = udp_sport_from_2 + 5 + udp_dport_from_2 = 30000 + udp_dport_to_2 = udp_dport_from_2 + 5000 + tcp_sport_from_2 = 130 + tcp_sport_to_2 = tcp_sport_from_2 + 5 + tcp_dport_from_2 = 20000 + tcp_dport_to_2 = tcp_dport_from_2 + 5000 + + icmp4_type = 8 # echo request + icmp4_code = 3 + icmp6_type = 128 # echo request + icmp6_code = 3 + + icmp4_type_2 = 8 + icmp4_code_from_2 = 5 + icmp4_code_to_2 = 20 + icmp6_type_2 = 128 + icmp6_code_from_2 = 8 + icmp6_code_to_2 = 42 + + def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'), + dst_prefix=IPv4Network('0.0.0.0/0'), + proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None, + dport_from=None, dport_to=None): + self.is_permit = is_permit + self.src_prefix = src_prefix + self.dst_prefix = dst_prefix + self._proto = proto + self._ports = ports + # assign ports by range + self.update_ports() + # assign specified ports + if sport_from: + self.sport_from = sport_from + if sport_to: + self.sport_to = sport_to + if dport_from: + self.dport_from = dport_from + if dport_to: + self.dport_to = dport_to + + def __copy__(self): + new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix, + self._proto, self._ports, self.sport_from, + self.sport_to, self.dport_from, self.dport_to) + return new_rule + + def update_ports(self): + if self._ports == self.PORTS_ALL: + self.sport_from = 0 + self.dport_from = 0 + self.sport_to = 65535 + if self._proto == 1 or self._proto == 58: + self.sport_to = 255 + self.dport_to = self.sport_to + elif self._ports == self.PORTS_RANGE: + if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP: + self.sport_from = self.icmp4_type + self.sport_to = self.icmp4_type + self.dport_from = self.icmp4_code + self.dport_to = self.icmp4_code + elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6: + self.sport_from = self.icmp6_type + self.sport_to = self.icmp6_type + self.dport_from = self.icmp6_code + self.dport_to = self.icmp6_code + elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP: + self.sport_from = self.tcp_sport_from + self.sport_to = self.tcp_sport_to + self.dport_from = self.tcp_dport_from + self.dport_to = self.tcp_dport_to + elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP: + self.sport_from = self.udp_sport_from + self.sport_to = self.udp_sport_to + self.dport_from = self.udp_dport_from + self.dport_to = self.udp_dport_to + elif self._ports == self.PORTS_RANGE_2: + if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP: + self.sport_from = self.icmp4_type_2 + self.sport_to = self.icmp4_type_2 + self.dport_from = self.icmp4_code_from_2 + self.dport_to = self.icmp4_code_to_2 + elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6: + self.sport_from = self.icmp6_type_2 + self.sport_to = self.icmp6_type_2 + self.dport_from = self.icmp6_code_from_2 + self.dport_to = self.icmp6_code_to_2 + elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP: + self.sport_from = self.tcp_sport_from_2 + self.sport_to = self.tcp_sport_to_2 + self.dport_from = self.tcp_dport_from_2 + self.dport_to = self.tcp_dport_to_2 + elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP: + self.sport_from = self.udp_sport_from_2 + self.sport_to = self.udp_sport_to_2 + self.dport_from = self.udp_dport_from_2 + self.dport_to = self.udp_dport_to_2 + else: + self.sport_from = self._ports + self.sport_to = self._ports + self.dport_from = self._ports + self.dport_to = self._ports + + @property + def proto(self): + return self._proto + + @proto.setter + def proto(self, proto): + self._proto = proto + self.update_ports() + + @property + def ports(self): + return self._ports + + @ports.setter + def ports(self, ports): + self._ports = ports + self.update_ports() + + def encode(self): + return {'is_permit': self.is_permit, 'proto': self.proto, + 'srcport_or_icmptype_first': self.sport_from, + 'srcport_or_icmptype_last': self.sport_to, + 'src_prefix': self.src_prefix, + 'dstport_or_icmpcode_first': self.dport_from, + 'dstport_or_icmpcode_last': self.dport_to, + 'dst_prefix': self.dst_prefix} + + +class VppAcl(VppObject): + """ VPP ACL """ + + def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None): + self._test = test + self._acl_index = acl_index + self.tag = tag + self._rules = rules + + @property + def rules(self): + return self._rules + + @property + def acl_index(self): + return self._acl_index + + @property + def count(self): + return len(self._rules) + + def encode_rules(self): + rules = [] + for rule in self._rules: + rules.append(rule.encode()) + return rules + + def add_vpp_config(self, expect_error=False): + try: + reply = self._test.vapi.acl_add_replace( + acl_index=self._acl_index, tag=self.tag, count=self.count, + r=self.encode_rules()) + self._acl_index = reply.acl_index + self._test.registry.register(self, self._test.logger) + if expect_error: + self._test.fail("Unexpected api reply") + return self + except UnexpectedApiReturnValueError: + if not expect_error: + self._test.fail("Unexpected api reply") + return None + + def modify_vpp_config(self, rules): + self._rules = rules + self.add_vpp_config() + + def remove_vpp_config(self, expect_error=False): + try: + self._test.vapi.acl_del(acl_index=self._acl_index) + if expect_error: + self._test.fail("Unexpected api reply") + except UnexpectedApiReturnValueError: + if not expect_error: + self._test.fail("Unexpected api reply") + + def dump(self): + return self._test.vapi.acl_dump(acl_index=self._acl_index) + + def query_vpp_config(self): + dump = self.dump() + for rule in dump: + if rule.acl_index == self._acl_index: + return True + return False + + def object_id(self): + return ("acl-%s-%d" % (self.tag, self._acl_index)) + + +class VppEtypeWhitelist(VppObject): + """ VPP Etype Whitelist """ + + def __init__(self, test, sw_if_index, whitelist, n_input=0): + self._test = test + self.whitelist = whitelist + self.n_input = n_input + self._sw_if_index = sw_if_index + + @property + def sw_if_index(self): + return self._sw_if_index + + @property + def count(self): + return len(self.whitelist) + + def add_vpp_config(self): + self._test.vapi.acl_interface_set_etype_whitelist( + sw_if_index=self._sw_if_index, count=self.count, + n_input=self.n_input, whitelist=self.whitelist) + self._test.registry.register(self, self._test.logger) + return self + + def remove_vpp_config(self): + self._test.vapi.acl_interface_set_etype_whitelist( + sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[]) + + def query_vpp_config(self): + self._test.vapi.acl_interface_etype_whitelist_dump( + sw_if_index=self._sw_if_index) + return False + + def object_id(self): + return ("acl-etype_wl-%d" % (self._sw_if_index)) + + +class VppAclInterface(VppObject): + """ VPP ACL Interface """ + + def __init__(self, test, sw_if_index, acls, n_input=0): + self._test = test + self._sw_if_index = sw_if_index + self.n_input = n_input + self.acls = acls + + @property + def sw_if_index(self): + return self._sw_if_index + + @property + def count(self): + return len(self.acls) + + def encode_acls(self): + acls = [] + for acl in self.acls: + acls.append(acl.acl_index) + return acls + + def add_vpp_config(self, expect_error=False): + try: + reply = self._test.vapi.acl_interface_set_acl_list( + sw_if_index=self._sw_if_index, n_input=self.n_input, + count=self.count, acls=self.encode_acls()) + self._test.registry.register(self, self._test.logger) + if expect_error: + self._test.fail("Unexpected api reply") + return self + except UnexpectedApiReturnValueError: + if not expect_error: + self._test.fail("Unexpected api reply") + return None + + def remove_vpp_config(self, expect_error=False): + try: + reply = self._test.vapi.acl_interface_set_acl_list( + sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]) + if expect_error: + self._test.fail("Unexpected api reply") + except UnexpectedApiReturnValueError: + if not expect_error: + self._test.fail("Unexpected api reply") + + def query_vpp_config(self): + dump = self._test.vapi.acl_interface_list_dump( + sw_if_index=self._sw_if_index) + for acl_list in dump: + if acl_list.count > 0: + return True + return False + + def object_id(self): + return ("acl-if-list-%d" % (self._sw_if_index)) + + +class MacipRule(): + """ Mac Ip rule """ + + def __init__(self, is_permit, src_mac=0, src_mac_mask=0, + src_prefix=IPv4Network('0.0.0.0/0')): + self.is_permit = is_permit + self.src_mac = src_mac + self.src_mac_mask = src_mac_mask + self.src_prefix = src_prefix + + def encode(self): + return {'is_permit': self.is_permit, 'src_mac': self.src_mac, + 'src_mac_mask': self.src_mac_mask, + 'src_prefix': self.src_prefix} + + +class VppMacipAcl(VppObject): + """ Vpp Mac Ip ACL """ + + def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None): + self._test = test + self._acl_index = acl_index + self.tag = tag + self._rules = rules + + @property + def acl_index(self): + return self._acl_index + + @property + def rules(self): + return self._rules + + @property + def count(self): + return len(self._rules) + + def encode_rules(self): + rules = [] + for rule in self._rules: + rules.append(rule.encode()) + return rules + + def add_vpp_config(self, expect_error=False): + try: + reply = self._test.vapi.macip_acl_add_replace( + acl_index=self._acl_index, tag=self.tag, count=self.count, + r=self.encode_rules()) + self._acl_index = reply.acl_index + self._test.registry.register(self, self._test.logger) + if expect_error: + self._test.fail("Unexpected api reply") + return self + except UnexpectedApiReturnValueError: + if not expect_error: + self._test.fail("Unexpected api reply") + return None + + def modify_vpp_config(self, rules): + self._rules = rules + self.add_vpp_config() + + def remove_vpp_config(self, expect_error=False): + try: + self._test.vapi.macip_acl_del(acl_index=self._acl_index) + if expect_error: + self._test.fail("Unexpected api reply") + except UnexpectedApiReturnValueError: + if not expect_error: + self._test.fail("Unexpected api reply") + + def dump(self): + return self._test.vapi.macip_acl_dump(acl_index=self._acl_index) + + def query_vpp_config(self): + dump = self.dump() + for rule in dump: + if rule.acl_index == self._acl_index: + return True + return False + + def object_id(self): + return ("macip-acl-%s-%d" % (self.tag, self._acl_index)) + + +class VppMacipAclInterface(VppObject): + """ VPP Mac Ip ACL Interface """ + + def __init__(self, test, sw_if_index, acls): + self._test = test + self._sw_if_index = sw_if_index + self.acls = acls + + @property + def sw_if_index(self): + return self._sw_if_index + + @property + def count(self): + return len(self.acls) + + def add_vpp_config(self): + for acl in self.acls: + self._test.vapi.macip_acl_interface_add_del( + is_add=True, sw_if_index=self._sw_if_index, + acl_index=acl.acl_index) + self._test.registry.register(self, self._test.logger) + + def remove_vpp_config(self): + for acl in self.acls: + self._test.vapi.macip_acl_interface_add_del( + is_add=False, sw_if_index=self._sw_if_index, + acl_index=acl.acl_index) + + def dump(self): + return self._test.vapi.macip_acl_interface_list_dump( + sw_if_index=self._sw_if_index) + + def query_vpp_config(self): + dump = self.dump() + for acl_list in dump: + for acl_index in acl_list.acls: + if acl_index != INVALID_INDEX: + return True + return False + + def object_id(self): + return ("macip-acl-if-list-%d" % (self._sw_if_index)) -- cgit 1.2.3-korg