diff options
author | Ole Trøan <otroan@employees.org> | 2020-03-26 18:08:35 +0000 |
---|---|---|
committer | Ole Troan <ot@cisco.com> | 2020-03-26 19:36:31 +0100 |
commit | 492a5d0bd79c3c0913f1b8fb4ad35d9ad23d821b (patch) | |
tree | e6993408aefb98434598fd4302b66c50eee7b3a4 /src/plugins/acl/test | |
parent | aad1ee149403994194cf37cef4530b042ba7df3a (diff) |
acl: revert acl: api cleanup
This reverts commit aad1ee149403994194cf37cef4530b042ba7df3a.
Reason for revert: Verify failure. Doesn't build.
Type: fix
Change-Id: I91b1b26ac43edde4853e4561a0083d0b3a06efee
Signed-off-by: Ole Troan <ot@cisco.com>
Diffstat (limited to 'src/plugins/acl/test')
-rw-r--r-- | src/plugins/acl/test/test_acl_plugin.py | 365 | ||||
-rw-r--r-- | src/plugins/acl/test/test_acl_plugin_conns.py | 65 | ||||
-rw-r--r-- | src/plugins/acl/test/test_acl_plugin_l2l3.py | 216 | ||||
-rw-r--r-- | src/plugins/acl/test/test_acl_plugin_macip.py | 198 | ||||
-rw-r--r-- | src/plugins/acl/test/test_classify_l2_acl.py | 1 | ||||
-rw-r--r-- | src/plugins/acl/test/vpp_acl.py | 460 |
6 files changed, 469 insertions, 836 deletions
diff --git a/src/plugins/acl/test/test_acl_plugin.py b/src/plugins/acl/test/test_acl_plugin.py index 8cac81cdb9f..f07d37548fe 100644 --- a/src/plugins/acl/test/test_acl_plugin.py +++ b/src/plugins/acl/test/test_acl_plugin.py @@ -12,11 +12,8 @@ from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp -from ipaddress import IPv4Network, IPv6Network from vpp_lo_interface import VppLoInterface -from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist -from vpp_ip import INVALID_INDEX class TestACLplugin(VppTestCase): @@ -178,49 +175,105 @@ class TestACLplugin(VppTestCase): % self.bd_id)) def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, - s_prefix=0, s_ip=0, - d_prefix=0, d_ip=0): - if ip: - src_prefix = IPv6Network((s_ip, s_prefix)) - dst_prefix = IPv6Network((d_ip, d_prefix)) + s_prefix=0, s_ip=b'\x00\x00\x00\x00', + d_prefix=0, d_ip=b'\x00\x00\x00\x00'): + if proto == -1: + return + if ports == self.PORTS_ALL: + sport_from = 0 + dport_from = 0 + sport_to = 65535 if proto != 1 and proto != 58 else 255 + dport_to = sport_to + elif ports == self.PORTS_RANGE: + if proto == 1: + sport_from = self.icmp4_type + sport_to = self.icmp4_type + dport_from = self.icmp4_code + dport_to = self.icmp4_code + elif proto == 58: + sport_from = self.icmp6_type + sport_to = self.icmp6_type + dport_from = self.icmp6_code + dport_to = self.icmp6_code + elif proto == self.proto[self.IP][self.TCP]: + sport_from = self.tcp_sport_from + sport_to = self.tcp_sport_to + dport_from = self.tcp_dport_from + dport_to = self.tcp_dport_to + elif proto == self.proto[self.IP][self.UDP]: + sport_from = self.udp_sport_from + sport_to = self.udp_sport_to + dport_from = self.udp_dport_from + dport_to = self.udp_dport_to + elif ports == self.PORTS_RANGE_2: + if proto == 1: + sport_from = self.icmp4_type_2 + sport_to = self.icmp4_type_2 + dport_from = self.icmp4_code_from_2 + dport_to = self.icmp4_code_to_2 + elif proto == 58: + sport_from = self.icmp6_type_2 + sport_to = self.icmp6_type_2 + dport_from = self.icmp6_code_from_2 + dport_to = self.icmp6_code_to_2 + elif proto == self.proto[self.IP][self.TCP]: + sport_from = self.tcp_sport_from_2 + sport_to = self.tcp_sport_to_2 + dport_from = self.tcp_dport_from_2 + dport_to = self.tcp_dport_to_2 + elif proto == self.proto[self.IP][self.UDP]: + sport_from = self.udp_sport_from_2 + sport_to = self.udp_sport_to_2 + dport_from = self.udp_dport_from_2 + dport_to = self.udp_dport_to_2 else: - src_prefix = IPv4Network((s_ip, s_prefix)) - dst_prefix = IPv4Network((d_ip, d_prefix)) - return AclRule(is_permit=permit_deny, ports=ports, proto=proto, - src_prefix=src_prefix, dst_prefix=dst_prefix) - - def apply_rules(self, rules, tag=None): - acl = VppAcl(self, rules, tag=tag) - acl.add_vpp_config() - self.logger.info("Dumped ACL: " + str(acl.dump())) + sport_from = ports + sport_to = ports + dport_from = ports + dport_to = ports + + rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto, + 'srcport_or_icmptype_first': sport_from, + 'srcport_or_icmptype_last': sport_to, + 'src_ip_prefix_len': s_prefix, + 'src_ip_addr': s_ip, + 'dstport_or_icmpcode_first': dport_from, + 'dstport_or_icmpcode_last': dport_to, + 'dst_ip_prefix_len': d_prefix, + 'dst_ip_addr': d_ip}) + return rule + + def apply_rules(self, rules, tag=b''): + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, + tag=tag) + self.logger.info("Dumped ACL: " + str( + self.vapi.acl_dump(reply.acl_index))) # Apply a ACL on the interface as inbound for i in self.pg_interfaces: - acl_if = VppAclInterface( - self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl]) - acl_if.add_vpp_config() - return acl.acl_index - - def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX): - acl = VppAcl(self, rules, tag=tag) - acl.add_vpp_config() - self.logger.info("Dumped ACL: " + str(acl.dump())) + self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index, + n_input=1, + acls=[reply.acl_index]) + return reply.acl_index + + def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF): + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, + tag=tag) + self.logger.info("Dumped ACL: " + str( + self.vapi.acl_dump(reply.acl_index))) # Apply a ACL on the interface as inbound - acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1, - acls=[acl]) - return acl.acl_index + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=1, + acls=[reply.acl_index]) + return reply.acl_index - def etype_whitelist(self, whitelist, n_input, add=True): + def etype_whitelist(self, whitelist, n_input): # Apply whitelists on all the interfaces - if add: - self._wl = [] - for i in self.pg_interfaces: - self._wl.append(VppEtypeWhitelist( - self, sw_if_index=i.sw_if_index, whitelist=whitelist, - n_input=n_input).add_vpp_config()) - else: - if hasattr(self, "_wl"): - for wl in self._wl: - wl.remove_vpp_config() + for i in self.pg_interfaces: + # checkstyle can't read long names. Help them. + fun = self.vapi.acl_interface_set_etype_whitelist + fun(sw_if_index=i.sw_if_index, n_input=n_input, + whitelist=whitelist) + return def create_upper_layer(self, packet_index, proto, ports=0): p = self.proto_map[proto] @@ -489,16 +542,24 @@ class TestACLplugin(VppTestCase): """ self.logger.info("ACLP_TEST_START_0001") - # Create a permit-1234 ACL - r = [AclRule(is_permit=1, proto=17, ports=1234)] - r[0].sport_to = 1235 + # Add an ACL + r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 1234, + 'srcport_or_icmptype_last': 1235, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 1234, + 'dstport_or_icmpcode_last': 1234, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}] # Test 1: add a new ACL - first_acl = VppAcl(self, rules=r, tag="permit 1234") - first_acl.add_vpp_config() - self.assertTrue(first_acl.query_vpp_config()) + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r, + tag=b"permit 1234") + self.assertEqual(reply.retval, 0) # The very first ACL gets #0 - self.assertEqual(first_acl.acl_index, 0) - rr = first_acl.dump() + self.assertEqual(reply.acl_index, 0) + first_acl = reply.acl_index + rr = self.vapi.acl_dump(reply.acl_index) self.logger.info("Dumped ACL: " + str(rr)) self.assertEqual(len(rr), 1) # We should have the same number of ACL entries as we had asked @@ -507,50 +568,70 @@ class TestACLplugin(VppTestCase): # are different types, we need to iterate over rules and keys to get # to basic values. for i_rule in range(0, len(r) - 1): - encoded_rule = r[i_rule].encode() - for rule_key in encoded_rule: + for rule_key in r[i_rule]: self.assertEqual(rr[0].r[i_rule][rule_key], - encoded_rule[rule_key]) - - # Create a deny-1234 ACL - r_deny = [AclRule(is_permit=0, proto=17, ports=1234), - AclRule(is_permit=1, proto=17, ports=0)] - r_deny[0].sport_to = 1235 - second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all") - second_acl.add_vpp_config() - self.assertTrue(second_acl.query_vpp_config()) + r[i_rule][rule_key]) + + # Add a deny-1234 ACL + r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 1234, + 'srcport_or_icmptype_last': 1235, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 1234, + 'dstport_or_icmpcode_last': 1234, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}, + {'is_permit': 1, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 0, + 'srcport_or_icmptype_last': 0, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 0, + 'dstport_or_icmpcode_last': 0, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}] + + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny, + tag=b"deny 1234;permit all") + self.assertEqual(reply.retval, 0) # The second ACL gets #1 - self.assertEqual(second_acl.acl_index, 1) + self.assertEqual(reply.acl_index, 1) + second_acl = reply.acl_index # Test 2: try to modify a nonexistent ACL - invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF") - reply = invalid_acl.add_vpp_config(expect_error=True) - - # apply an ACL on an interface inbound, try to delete ACL, must fail - acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=1, - acls=[first_acl]) - acl_if_list.add_vpp_config() - first_acl.remove_vpp_config(expect_error=True) - # Unapply an ACL and then try to delete it - must be ok - acl_if_list.remove_vpp_config() - first_acl.remove_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=432, r=r, + tag=b"FFFF:FFFF", expected_retval=-6) + self.assertEqual(reply.retval, -6) + # The ACL number should pass through + self.assertEqual(reply.acl_index, 432) # apply an ACL on an interface inbound, try to delete ACL, must fail - acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=0, - acls=[second_acl]) - acl_if_list.add_vpp_config() - second_acl.remove_vpp_config(expect_error=True) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[first_acl]) + reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142) # Unapply an ACL and then try to delete it - must be ok - acl_if_list.remove_vpp_config() - second_acl.remove_vpp_config() + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[]) + reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0) + + # apply an ACL on an interface outbound, try to delete ACL, must fail + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[second_acl]) + reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143) + # Unapply the ACL and then try to delete it - must be ok + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[]) + reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0) # try to apply a nonexistent ACL - must fail - acl_if_list = VppAclInterface( - self, sw_if_index=self.pg0.sw_if_index, n_input=0, - acls=[invalid_acl]) - acl_if_list.add_vpp_config(expect_error=True) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[first_acl], + expected_retval=-6) self.logger.info("ACLP_TEST_FINISH_0001") @@ -561,12 +642,12 @@ class TestACLplugin(VppTestCase): rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.UDP])) + 0, self.proto[self.IP][self.UDP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, - 0, self.proto[self.IP][self.TCP])) + 0, self.proto[self.IP][self.TCP])) # Apply rules - acl_idx = self.apply_rules(rules, "permit per-flow") + acl_idx = self.apply_rules(rules, b"permit per-flow") # enable counters reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) @@ -595,15 +676,14 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_START_0003") # Add a deny-flows ACL rules = [] - rules.append(self.create_rule( - self.IPV4, self.DENY, self.PORTS_ALL, - self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.DENY, + self.PORTS_ALL, self.proto[self.IP][self.UDP])) # Permit ip any any in the end rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_ALL, 0)) # Apply rules - acl_idx = self.apply_rules(rules, "deny per-flow;permit all") + acl_idx = self.apply_rules(rules, b"deny per-flow;permit all") # enable counters reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) @@ -637,7 +717,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit icmpv4") + self.apply_rules(rules, b"permit icmpv4") # Traffic should still pass self.run_verify_test(self.ICMP, self.IPV4, @@ -658,7 +738,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit icmpv6") + self.apply_rules(rules, b"permit icmpv6") # Traffic should still pass self.run_verify_test(self.ICMP, self.IPV6, @@ -679,7 +759,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny icmpv4") + self.apply_rules(rules, b"deny icmpv4") # Traffic should not pass self.run_verify_negat_test(self.ICMP, self.IPV4, 0) @@ -699,7 +779,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny icmpv6") + self.apply_rules(rules, b"deny icmpv6") # Traffic should not pass self.run_verify_negat_test(self.ICMP, self.IPV6, 0) @@ -714,12 +794,12 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) @@ -739,7 +819,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 tcp") + self.apply_rules(rules, b"permit ip6 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) @@ -759,7 +839,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv udp") + self.apply_rules(rules, b"permit ipv udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) @@ -779,7 +859,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 udp") + self.apply_rules(rules, b"permit ip6 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) @@ -804,7 +884,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 tcp") + self.apply_rules(rules, b"deny ip4/ip6 tcp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -830,7 +910,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp") + self.apply_rules(rules, b"deny ip4/ip6 udp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -868,13 +948,13 @@ class TestACLplugin(VppTestCase): for i in range(len(r)): rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3])) - acl = VppAcl(self, rules=rules) - acl.add_vpp_config() - result = acl.dump() + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules) + result = self.vapi.acl_dump(reply.acl_index) i = 0 for drules in result: for dr in drules.r: + self.assertEqual(dr.is_ipv6, r[i][0]) self.assertEqual(dr.is_permit, r[i][1]) self.assertEqual(dr.proto, r[i][3]) @@ -921,7 +1001,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp %d" % port) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -943,7 +1023,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp %d" % port) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, @@ -965,7 +1045,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp %d" % port) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -974,7 +1054,7 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0017") def test_0018_udp_permit_port_v6(self): - """ permit single UDPv6 + """ permit single UPPv6 """ self.logger.info("ACLP_TEST_START_0018") @@ -988,7 +1068,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip4 tcp %d" % port) + self.apply_rules(rules, b"permit ip4 tcp %d" % port) # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, @@ -1015,7 +1095,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1042,7 +1122,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1070,7 +1150,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp %d" % port) + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1092,7 +1172,7 @@ class TestACLplugin(VppTestCase): self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit empty udp ip4 %d" % port) + self.apply_rules(rules, b"permit empty udp ip4 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1126,7 +1206,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit empty udp ip6 %d" % port) + self.apply_rules(rules, b"permit empty udp ip6 %d" % port) # Traffic should still pass # Create incoming packet streams for packet-generator interfaces @@ -1156,14 +1236,14 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) @@ -1185,7 +1265,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 tcp") + self.apply_rules(rules, b"permit ip6 tcp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) @@ -1207,7 +1287,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 udp") + self.apply_rules(rules, b"permit ipv4 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) @@ -1229,7 +1309,7 @@ class TestACLplugin(VppTestCase): rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ip6 udp") + self.apply_rules(rules, b"permit ip6 udp") # Traffic should still pass self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) @@ -1260,7 +1340,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 tcp") + self.apply_rules(rules, b"deny ip4/ip6 tcp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1292,7 +1372,7 @@ class TestACLplugin(VppTestCase): self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "deny ip4/ip6 udp") + self.apply_rules(rules, b"deny ip4/ip6 udp") # Traffic should not pass self.run_verify_negat_test(self.IP, self.IPRANDOM, @@ -1308,14 +1388,14 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # Traffic should still pass also for an odd ethertype self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], @@ -1330,14 +1410,15 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) @@ -1347,7 +1428,7 @@ class TestACLplugin(VppTestCase): 0, False, 0xaaaa) # remove the whitelist - self.etype_whitelist([], 0, add=False) + self.etype_whitelist([], 0) self.logger.info("ACLP_TEST_FINISH_0305") @@ -1359,14 +1440,15 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) @@ -1375,7 +1457,7 @@ class TestACLplugin(VppTestCase): 0, False, True, 0x0bbb) # remove the whitelist, the previously blocked 0xAAAA should pass now - self.etype_whitelist([], 0, add=False) + self.etype_whitelist([], 0) self.logger.info("ACLP_TEST_FINISH_0306") @@ -1387,19 +1469,19 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) # Apply rules - self.apply_rules(rules, "permit ipv4 tcp") + self.apply_rules(rules, b"permit ipv4 tcp") # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked self.etype_whitelist([0xbbb], 1) # remove the whitelist, the previously blocked 0xAAAA should pass now - self.etype_whitelist([], 0, add=False) + self.etype_whitelist([], 0) # The whitelisted traffic, should pass self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], @@ -1415,9 +1497,9 @@ class TestACLplugin(VppTestCase): # Add an ACL rules = [] rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, - self.proto[self.IP][self.TCP])) + self.proto[self.IP][self.TCP])) # deny ip any any in the end rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) @@ -1426,13 +1508,12 @@ class TestACLplugin(VppTestCase): intf.append(VppLoInterface(self)) # Apply rules - self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index) + self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index) # Remove the interface intf[0].remove_vpp_config() self.logger.info("ACLP_TEST_FINISH_0315") - if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) diff --git a/src/plugins/acl/test/test_acl_plugin_conns.py b/src/plugins/acl/test/test_acl_plugin_conns.py index 386992af2b0..f4cf5947043 100644 --- a/src/plugins/acl/test/test_acl_plugin_conns.py +++ b/src/plugins/acl/test/test_acl_plugin_conns.py @@ -36,23 +36,22 @@ def to_acl_rule(self, is_permit, wildcard_sport=False): rule_l4_sport_last = rule_l4_sport new_rule = { - 'is_permit': is_permit, - 'is_ipv6': p.haslayer(IPv6), - 'src_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].src), - 'src_ip_prefix_len': rule_prefix_len, - 'dst_ip_addr': inet_pton(rule_family, - p[rule_l3_layer].dst), - 'dst_ip_prefix_len': rule_prefix_len, - 'srcport_or_icmptype_first': rule_l4_sport_first, - 'srcport_or_icmptype_last': rule_l4_sport_last, - 'dstport_or_icmpcode_first': rule_l4_dport, - 'dstport_or_icmpcode_last': rule_l4_dport, - 'proto': rule_l4_proto, - } + 'is_permit': is_permit, + 'is_ipv6': p.haslayer(IPv6), + 'src_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].src), + 'src_ip_prefix_len': rule_prefix_len, + 'dst_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].dst), + 'dst_ip_prefix_len': rule_prefix_len, + 'srcport_or_icmptype_first': rule_l4_sport_first, + 'srcport_or_icmptype_last': rule_l4_sport_last, + 'dstport_or_icmpcode_first': rule_l4_dport, + 'dstport_or_icmpcode_last': rule_l4_dport, + 'proto': rule_l4_proto, + } return new_rule - Packet.to_acl_rule = to_acl_rule @@ -92,36 +91,36 @@ class Conn(L4_Conn): if reflect_side == acl_side: self.testcase.vapi.acl_interface_set_acl_list( - self.ifs[acl_side].sw_if_index, 1, - [reflect_acl_index, + self.ifs[acl_side].sw_if_index, 1, + [reflect_acl_index, deny_acl_index]) self.testcase.vapi.acl_interface_set_acl_list( - self.ifs[1-acl_side].sw_if_index, 0, []) + self.ifs[1-acl_side].sw_if_index, 0, []) else: self.testcase.vapi.acl_interface_set_acl_list( - self.ifs[acl_side].sw_if_index, 1, - [deny_acl_index, + self.ifs[acl_side].sw_if_index, 1, + [deny_acl_index, reflect_acl_index]) self.testcase.vapi.acl_interface_set_acl_list( - self.ifs[1-acl_side].sw_if_index, 0, []) + self.ifs[1-acl_side].sw_if_index, 0, []) def wildcard_rule(self, is_permit): any_addr = ["0.0.0.0", "::"] rule_family = self.address_family is_ip6 = 1 if rule_family == AF_INET6 else 0 new_rule = { - 'is_permit': is_permit, - 'is_ipv6': is_ip6, - 'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]), - 'src_ip_prefix_len': 0, - 'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]), - 'dst_ip_prefix_len': 0, - 'srcport_or_icmptype_first': 0, - 'srcport_or_icmptype_last': 65535, - 'dstport_or_icmpcode_first': 0, - 'dstport_or_icmpcode_last': 65535, - 'proto': 0, - } + 'is_permit': is_permit, + 'is_ipv6': is_ip6, + 'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]), + 'src_ip_prefix_len': 0, + 'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]), + 'dst_ip_prefix_len': 0, + 'srcport_or_icmptype_first': 0, + 'srcport_or_icmptype_last': 65535, + 'dstport_or_icmpcode_first': 0, + 'dstport_or_icmpcode_last': 65535, + 'proto': 0, + } return new_rule diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py index eaddd0f9bf4..3379871e0b9 100644 --- a/src/plugins/acl/test/test_acl_plugin_l2l3.py +++ b/src/plugins/acl/test/test_acl_plugin_l2l3.py @@ -23,12 +23,10 @@ """ -import copy import unittest from socket import inet_pton, AF_INET, AF_INET6 from random import choice, shuffle from pprint import pprint -from ipaddress import ip_network import scapy.compat from scapy.packet import Raw @@ -42,8 +40,6 @@ from framework import VppTestCase, VppTestRunner from vpp_l2 import L2_PORT_TYPE import time -from vpp_acl import AclRule, VppAcl, VppAclInterface - class TestACLpluginL2L3(VppTestCase): """TestACLpluginL2L3 Test Case""" @@ -263,26 +259,31 @@ class TestACLpluginL2L3(VppTestCase): else: rule_l4_proto = p[IP].proto - new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto, - src_prefix=ip_network( - (p[rule_l3_layer].src, rule_prefix_len)), - dst_prefix=ip_network( - (p[rule_l3_layer].dst, rule_prefix_len))) - new_rule.sport_from = rule_l4_sport - new_rule.sport_fo = rule_l4_sport - new_rule.dport_from = rule_l4_dport - new_rule.dport_to = rule_l4_dport - + new_rule = { + 'is_permit': is_permit, + 'is_ipv6': p.haslayer(IPv6), + 'src_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].src), + 'src_ip_prefix_len': rule_prefix_len, + 'dst_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].dst), + 'dst_ip_prefix_len': rule_prefix_len, + 'srcport_or_icmptype_first': rule_l4_sport, + 'srcport_or_icmptype_last': rule_l4_sport, + 'dstport_or_icmpcode_first': rule_l4_dport, + 'dstport_or_icmpcode_last': rule_l4_dport, + 'proto': rule_l4_proto, + } rules.append(new_rule) - new_rule_permit = copy.copy(new_rule) - new_rule_permit.is_permit = 1 + new_rule_permit = new_rule.copy() + new_rule_permit['is_permit'] = 1 permit_rules.append(new_rule_permit) - new_rule_permit_and_reflect = copy.copy(new_rule) + new_rule_permit_and_reflect = new_rule.copy() if can_reflect_this_packet: - new_rule_permit_and_reflect.is_permit = 2 + new_rule_permit_and_reflect['is_permit'] = 2 else: - new_rule_permit_and_reflect.is_permit = is_permit + new_rule_permit_and_reflect['is_permit'] = is_permit permit_and_reflect_rules.append(new_rule_permit_and_reflect) self.logger.info("create_stream pkt#%d: %s" % (i, payload)) @@ -355,70 +356,79 @@ class TestACLpluginL2L3(VppTestCase): # UDP: - def applied_acl_shuffle(self, acl_if): - saved_n_input = acl_if.n_input - # TOTO: maybe copy each one?? - saved_acls = acl_if.acls + def applied_acl_shuffle(self, sw_if_index): + # first collect what ACLs are applied and what they look like + r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index) + orig_applied_acls = r[0] + + # we will collect these just to save and generate additional rulesets + orig_acls = [] + for acl_num in orig_applied_acls.acls: + rr = self.vapi.acl_dump(acl_num) + orig_acls.append(rr[0]) # now create a list of all the rules in all ACLs all_rules = [] - for old_acl in saved_acls: - for rule in old_acl.rules: - all_rules.append(rule) + for old_acl in orig_acls: + for rule in old_acl.r: + all_rules.append(dict(rule._asdict())) # Add a few ACLs made from shuffled rules shuffle(all_rules) - acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl") - acl1.add_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::2], + tag=b"shuffle 1. acl") + shuffle_acl_1 = reply.acl_index shuffle(all_rules) - acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl") - acl2.add_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::3], + tag=b"shuffle 2. acl") + shuffle_acl_2 = reply.acl_index shuffle(all_rules) - acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl") - acl3.add_vpp_config() + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::2], + tag=b"shuffle 3. acl") + shuffle_acl_3 = reply.acl_index # apply the shuffle ACLs in front - input_acls = [acl1, acl2] - output_acls = [acl1, acl2] + input_acls = [shuffle_acl_1, shuffle_acl_2] + output_acls = [shuffle_acl_1, shuffle_acl_2] # add the currently applied ACLs - n_input = acl_if.n_input - input_acls.extend(saved_acls[:n_input]) - output_acls.extend(saved_acls[n_input:]) + n_input = orig_applied_acls.n_input + input_acls.extend(orig_applied_acls.acls[:n_input]) + output_acls.extend(orig_applied_acls.acls[n_input:]) # and the trailing shuffle ACL(s) - input_acls.extend([acl3]) - output_acls.extend([acl3]) + input_acls.extend([shuffle_acl_3]) + output_acls.extend([shuffle_acl_3]) # set the interface ACL list to the result - acl_if.n_input = len(input_acls) - acl_if.acls = input_acls + output_acls - acl_if.add_vpp_config() - + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=len(input_acls), + acls=input_acls + output_acls) # change the ACLs a few times for i in range(1, 10): shuffle(all_rules) - acl1.rules = all_rules[::1+(i % 2)] - acl1.add_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1, + r=all_rules[::1+(i % 2)], + tag=b"shuffle 1. acl") shuffle(all_rules) - acl2.rules = all_rules[::1+(i % 3)] - acl2.add_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, + r=all_rules[::1+(i % 3)], + tag=b"shuffle 2. acl") shuffle(all_rules) - acl3.rules = all_rules[::1+(i % 5)] - acl3.add_vpp_config() + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, + r=all_rules[::1+(i % 5)], + tag=b"shuffle 3. acl") # restore to how it was before and clean up - acl_if.n_input = saved_n_input - acl_if.acls = saved_acls - acl_if.add_vpp_config() - - acl1.remove_vpp_config() - acl2.remove_vpp_config() - acl3.remove_vpp_config() + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=orig_applied_acls.n_input, + acls=orig_applied_acls.acls) + reply = self.vapi.acl_del(acl_index=shuffle_acl_1) + reply = self.vapi.acl_del(acl_index=shuffle_acl_2) + reply = self.vapi.acl_del(acl_index=shuffle_acl_3) def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect): @@ -426,14 +436,15 @@ class TestACLpluginL2L3(VppTestCase): r_permit = stream_dict['permit_rules'] r_permit_reflect = stream_dict['permit_and_reflect_rules'] r_action = r_permit_reflect if is_reflect else r - action_acl = VppAcl(self, rules=r_action, tag="act. acl") - action_acl.add_vpp_config() - permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl") - permit_acl.add_vpp_config() - - return {'L2': action_acl if test_l2_action else permit_acl, - 'L3': permit_acl if test_l2_action else action_acl, - 'permit': permit_acl, 'action': action_acl} + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action, + tag=b"act. acl") + action_acl_index = reply.acl_index + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit, + tag=b"perm. acl") + permit_acl_index = reply.acl_index + return {'L2': action_acl_index if test_l2_action else permit_acl_index, + 'L3': permit_acl_index if test_l2_action else action_acl_index, + 'permit': permit_acl_index, 'action': action_acl_index} def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -441,30 +452,26 @@ class TestACLpluginL2L3(VppTestCase): """ self.reset_packet_infos() stream_dict = self.create_stream( - self.pg2, self.loop0, - bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - not is_reflect, False, add_eh) + self.pg2, self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, is_ip6, + not is_reflect, False, add_eh) stream = stream_dict['stream'] acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect) n_input_l3 = 0 if bridged_to_routed else 1 n_input_l2 = 1 if bridged_to_routed else 0 - - acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, - n_input=n_input_l3, acls=[acl_idx['L3']]) - acl_if_pg2.add_vpp_config() - - acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, - n_input=n_input_l2, acls=[acl_idx['L2']]) - acl_if_pg0.add_vpp_config() - - acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, - n_input=n_input_l2, acls=[acl_idx['L2']]) - acl_if_pg1.add_vpp_config() - - self.applied_acl_shuffle(acl_if_pg0) - self.applied_acl_shuffle(acl_if_pg1) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, + n_input=n_input_l3, + acls=[acl_idx['L3']]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx['L2']]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx['L2']]) + self.applied_acl_shuffle(self.pg0.sw_if_index) + self.applied_acl_shuffle(self.pg2.sw_if_index) return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']} def apply_acl_ip46_both_directions_reflect(self, @@ -509,23 +516,20 @@ class TestACLpluginL2L3(VppTestCase): else: outbound_l3_acl = acl_idx_rev['L3'] - acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, - n_input=1, - acls=[inbound_l3_acl, outbound_l3_acl]) - acl_if_pg2.add_vpp_config() - - acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, outbound_l2_acl]) - acl_if_pg0.add_vpp_config() - - acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, outbound_l2_acl]) - acl_if_pg1.add_vpp_config() - - self.applied_acl_shuffle(acl_if_pg0) - self.applied_acl_shuffle(acl_if_pg2) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, + n_input=1, + acls=[inbound_l3_acl, + outbound_l3_acl]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, + outbound_l2_acl]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, + outbound_l2_acl]) + self.applied_acl_shuffle(self.pg0.sw_if_index) + self.applied_acl_shuffle(self.pg2.sw_if_index) def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -590,7 +594,7 @@ class TestACLpluginL2L3(VppTestCase): pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, is_reflect, False, add_eh) - self.verify_acl_packet_count(acls['L3'].acl_index, pkts) + self.verify_acl_packet_count(acls['L3'], pkts) def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -600,7 +604,7 @@ class TestACLpluginL2L3(VppTestCase): pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, is_reflect, False, add_eh) - self.verify_acl_packet_count(acls['L2'].acl_index, pkts) + self.verify_acl_packet_count(acls['L2'], pkts) def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action, is_ip6, add_eh, diff --git a/src/plugins/acl/test/test_acl_plugin_macip.py b/src/plugins/acl/test/test_acl_plugin_macip.py index 03ac16d4e12..0f178a36b69 100644 --- a/src/plugins/acl/test/test_acl_plugin_macip.py +++ b/src/plugins/acl/test/test_acl_plugin_macip.py @@ -9,7 +9,6 @@ from socket import inet_ntop, inet_pton, AF_INET, AF_INET6 from struct import pack, unpack import re import unittest -from ipaddress import ip_network, IPv4Network, IPv6Network import scapy.compat from scapy.packet import Raw @@ -22,9 +21,6 @@ from vpp_lo_interface import VppLoInterface from vpp_l2 import L2_PORT_TYPE from vpp_sub_interface import L2_VTR_OP, VppSubInterface, VppDot1QSubint, \ VppDot1ADSubint -from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist, \ - VppMacipAclInterface, VppMacipAcl, MacipRule -from vpp_papi import MACAddress class MethodHolder(VppTestCase): @@ -76,10 +72,10 @@ class MethodHolder(VppTestCase): # create 2 subinterfaces cls.subifs = [ - VppDot1QSubint(cls, cls.pg1, 10), - VppDot1ADSubint(cls, cls.pg2, 20, 300, 400), - VppDot1QSubint(cls, cls.pg3, 30), - VppDot1ADSubint(cls, cls.pg3, 40, 600, 700)] + VppDot1QSubint(cls, cls.pg1, 10), + VppDot1ADSubint(cls, cls.pg2, 20, 300, 400), + VppDot1QSubint(cls, cls.pg3, 30), + VppDot1ADSubint(cls, cls.pg3, 40, 600, 700)] cls.subifs[0].set_vtr(L2_VTR_OP.L2_POP_1, inner=10, push1q=1) @@ -162,6 +158,11 @@ class MethodHolder(VppTestCase): def setUp(self): super(MethodHolder, self).setUp() self.reset_packet_infos() + del self.ACLS[:] + + def tearDown(self): + super(MethodHolder, self).tearDown() + self.delete_acls() def show_commands_at_teardown(self): self.logger.info(self.vapi.ppcli("show interface address")) @@ -181,21 +182,19 @@ class MethodHolder(VppTestCase): acls = self.vapi.macip_acl_dump() if self.DEBUG: for acl in acls: - # print("ACL #"+str(acl.acl_index)) + print("ACL #"+str(acl.acl_index)) for r in acl.r: rule = "ACTION" if r.is_permit == 1: rule = "PERMIT" elif r.is_permit == 0: rule = "DENY " - """ print(" IP6" if r.is_ipv6 else " IP4", rule, binascii.hexlify(r.src_mac), binascii.hexlify(r.src_mac_mask), unpack('<16B', r.src_ip_addr), r.src_ip_prefix_len) - """ return acls def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP, @@ -253,16 +252,19 @@ class MethodHolder(VppTestCase): elif ip_type == self.SUBNET_IP: ip4[2] = random.randint(100, 200) ip4[3] = 0 - ip6[7] = random.randint(100, 200) + ip6[8] = random.randint(100, 200) ip6[15] = 0 ip_pack = b'' for j in range(0, len(ip)): ip_pack += pack('<B', int(ip[j])) - rule = MacipRule(is_permit=self.PERMIT, - src_prefix=ip_network((ip_pack, ip_len)), - src_mac=MACAddress(mac).packed, - src_mac_mask=MACAddress(mask).packed) + rule = ({'is_permit': self.PERMIT, + 'is_ipv6': is_ip6, + 'src_ip_addr': ip_pack, + 'src_ip_prefix_len': ip_len, + 'src_mac': binascii.unhexlify(mac.replace(':', '')), + 'src_mac_mask': binascii.unhexlify( + mask.replace(':', ''))}) rules.append(rule) if ip_type == self.WILD_IP: break @@ -272,12 +274,10 @@ class MethodHolder(VppTestCase): return acls def apply_macip_rules(self, acls): - macip_acls = [] for acl in acls: - macip_acl = VppMacipAcl(self, rules=acl) - macip_acl.add_vpp_config() - macip_acls.append(macip_acl) - return macip_acls + reply = self.vapi.macip_acl_add(acl) + self.assertEqual(reply.retval, 0) + self.ACLS.append(reply.acl_index) def verify_macip_acls(self, acl_count, rules_count, expected_count=2): reply = self.macip_acl_dump_debug() @@ -292,6 +292,20 @@ class MethodHolder(VppTestCase): reply = self.vapi.macip_acl_interface_get() self.assertEqual(reply.count, expected_count) + def delete_acls(self): + for acl in range(len(self.ACLS)-1, -1, -1): + self.vapi.macip_acl_del(self.ACLS[acl]) + + reply = self.vapi.macip_acl_dump() + self.assertEqual(len(reply), 0) + + intf_acls = self.vapi.acl_interface_list_dump() + for i_a in intf_acls: + sw_if_index = i_a.sw_if_index + for acl_index in i_a.acls: + self.vapi.acl_interface_add_del(sw_if_index, acl_index, 0) + self.vapi.acl_del(acl_index) + def create_stream(self, mac_type, ip_type, packet_count, src_if, dst_if, traffic, is_ip6, tags=PERMIT_TAGS): # exact MAC and exact IP @@ -512,47 +526,48 @@ class MethodHolder(VppTestCase): else: rule_l4_proto = packet[IP].proto - src_network = ip_network( - (packet[rule_l3_layer].src, rule_prefix_len)) - dst_network = ip_network( - (packet[rule_l3_layer].dst, rule_prefix_len)) - acl_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto, - src_prefix=src_network, - dst_prefix=dst_network) - acl_rule.sport_from = rule_l4_sport - acl_rule.sport_to = rule_l4_sport - acl_rule.dport_from = rule_l4_dport - acl_rule.dport_to = rule_l4_dport + acl_rule = { + 'is_permit': is_permit, + 'is_ipv6': is_ip6, + 'src_ip_addr': inet_pton(rule_family, + packet[rule_l3_layer].src), + 'src_ip_prefix_len': rule_prefix_len, + 'dst_ip_addr': inet_pton(rule_family, + packet[rule_l3_layer].dst), + 'dst_ip_prefix_len': rule_prefix_len, + 'srcport_or_icmptype_first': rule_l4_sport, + 'srcport_or_icmptype_last': rule_l4_sport, + 'dstport_or_icmpcode_first': rule_l4_dport, + 'dstport_or_icmpcode_last': rule_l4_dport, + 'proto': rule_l4_proto} acl_rules.append(acl_rule) if mac_type == self.WILD_MAC and ip_type == self.WILD_IP and p > 0: continue if is_permit: - macip_rule = MacipRule( - is_permit=is_permit, - src_prefix=ip_network( - (ip_rule, prefix_len)), - src_mac=MACAddress(mac_rule).packed, - src_mac_mask=MACAddress(mac_mask).packed) + macip_rule = ({ + 'is_permit': is_permit, + 'is_ipv6': is_ip6, + 'src_ip_addr': ip_rule, + 'src_ip_prefix_len': prefix_len, + 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')), + 'src_mac_mask': binascii.unhexlify( + mac_mask.replace(':', ''))}) macip_rules.append(macip_rule) # deny all other packets if not (mac_type == self.WILD_MAC and ip_type == self.WILD_IP): - network = IPv6Network((0, 0)) if is_ip6 else IPv4Network((0, 0)) - macip_rule = MacipRule( - is_permit=0, - src_prefix=network, - src_mac=MACAddress("00:00:00:00:00:00").packed, - src_mac_mask=MACAddress("00:00:00:00:00:00").packed) + macip_rule = ({'is_permit': 0, + 'is_ipv6': is_ip6, + 'src_ip_addr': "", + 'src_ip_prefix_len': 0, + 'src_mac': "", + 'src_mac_mask': ""}) macip_rules.append(macip_rule) - network = IPv6Network((0, 0)) if is_ip6 else IPv4Network((0, 0)) - acl_rule = AclRule(is_permit=0, src_prefix=network, dst_prefix=network) - acl_rule.sport_from = 0 - acl_rule.sport_to = 0 - acl_rule.dport_from = 0 - acl_rule.dport_to = 0 + acl_rule = {'is_permit': 0, + 'is_ipv6': is_ip6} acl_rules.append(acl_rule) return {'stream': packets, 'macip_rules': macip_rules, @@ -629,34 +644,36 @@ class MethodHolder(VppTestCase): if apply_rules: if isMACIP: - self.acl = VppMacipAcl(self, rules=test_dict['macip_rules']) + reply = self.vapi.macip_acl_add(test_dict['macip_rules']) else: - self.acl = VppAcl(self, rules=test_dict['acl_rules']) - self.acl.add_vpp_config() + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=test_dict['acl_rules']) + self.assertEqual(reply.retval, 0) + acl_index = reply.acl_index if isMACIP: - self.acl_if = VppMacipAclInterface( - self, sw_if_index=tx_if.sw_if_index, acls=[self.acl]) - self.acl_if.add_vpp_config() - - dump = self.acl_if.dump() - self.assertTrue(dump) - self.assertEqual(dump[0].acls[0], self.acl.acl_index) + self.vapi.macip_acl_interface_add_del( + sw_if_index=tx_if.sw_if_index, + acl_index=acl_index) + reply = self.vapi.macip_acl_interface_get() + self.assertEqual(reply.acls[tx_if.sw_if_index], acl_index) + self.ACLS.append(reply.acls[tx_if.sw_if_index]) else: - self.acl_if = VppAclInterface( - self, sw_if_index=tx_if.sw_if_index, n_input=1, - acls=[self.acl]) - self.acl_if.add_vpp_config() + self.vapi.acl_interface_add_del( + sw_if_index=tx_if.sw_if_index, acl_index=acl_index) else: - if hasattr(self, "acl_if"): - self.acl_if.remove_vpp_config() - if try_replace and hasattr(self, "acl"): + self.vapi.macip_acl_interface_add_del( + sw_if_index=tx_if.sw_if_index, + acl_index=0) + if try_replace: if isMACIP: - self.acl.rules = test_dict['macip_rules'] - self.acl.add_vpp_config() + reply = self.vapi.macip_acl_add_replace( + test_dict['macip_rules'], + acl_index) else: - self.acl.rules = test_dict['acl_rules'] - self.acl.add_vpp_config() + reply = self.vapi.acl_add_replace(acl_index=acl_index, + r=test_dict['acl_rules']) + self.assertEqual(reply.retval, 0) if not isinstance(src_if, VppSubInterface): tx_if.add_stream(test_dict['stream']) @@ -676,10 +693,9 @@ class MethodHolder(VppTestCase): self.get_packet_count_for_if_idx(dst_if.sw_if_index)) self.verify_capture(test_dict['stream'], capture, is_ip6) if not isMACIP: - if hasattr(self, "acl_if"): - self.acl_if.remove_vpp_config() - if hasattr(self, "acl"): - self.acl.remove_vpp_config() + self.vapi.acl_interface_add_del(sw_if_index=tx_if.sw_if_index, + acl_index=acl_index, is_add=0) + self.vapi.acl_del(acl_index) def run_test_acls(self, mac_type, ip_type, acl_count, rules_count, traffic=None, ip=None): @@ -1061,15 +1077,17 @@ class TestMACIP(MethodHolder): r1 = self.create_rules(acl_count=3, rules_count=[2, 2, 2]) r2 = self.create_rules(mac_type=self.OUI_MAC, ip_type=self.SUBNET_IP) - macip_acls = self.apply_macip_rules(r1) + self.apply_macip_rules(r1) acls_before = self.macip_acl_dump_debug() # replace acls #2, #3 with new - macip_acls[2].rules = r2[0] - macip_acls[2].add_vpp_config() - macip_acls[3].rules = r2[1] - macip_acls[3].add_vpp_config() + reply = self.vapi.macip_acl_add_replace(r2[0], 2) + self.assertEqual(reply.retval, 0) + self.assertEqual(reply.acl_index, 2) + reply = self.vapi.macip_acl_add_replace(r2[1], 3) + self.assertEqual(reply.retval, 0) + self.assertEqual(reply.acl_index, 3) acls_after = self.macip_acl_dump_debug() @@ -1100,25 +1118,21 @@ class TestMACIP(MethodHolder): intf_count = len(self.interfaces)+1 intf = [] - macip_alcs = self.apply_macip_rules( - self.create_rules(acl_count=3, rules_count=[3, 5, 4])) + self.apply_macip_rules(self.create_rules(acl_count=3, + rules_count=[3, 5, 4])) intf.append(VppLoInterface(self)) intf.append(VppLoInterface(self)) sw_if_index0 = intf[0].sw_if_index - macip_acl_if0 = VppMacipAclInterface( - self, sw_if_index=sw_if_index0, acls=[macip_alcs[1]]) - macip_acl_if0.add_vpp_config() + self.vapi.macip_acl_interface_add_del(sw_if_index0, 1) reply = self.vapi.macip_acl_interface_get() self.assertEqual(reply.count, intf_count+1) self.assertEqual(reply.acls[sw_if_index0], 1) sw_if_index1 = intf[1].sw_if_index - macip_acl_if1 = VppMacipAclInterface( - self, sw_if_index=sw_if_index1, acls=[macip_alcs[0]]) - macip_acl_if1.add_vpp_config() + self.vapi.macip_acl_interface_add_del(sw_if_index1, 0) reply = self.vapi.macip_acl_interface_get() self.assertEqual(reply.count, intf_count+2) @@ -1134,12 +1148,8 @@ class TestMACIP(MethodHolder): intf.append(VppLoInterface(self)) sw_if_index2 = intf[2].sw_if_index sw_if_index3 = intf[3].sw_if_index - macip_acl_if2 = VppMacipAclInterface( - self, sw_if_index=sw_if_index2, acls=[macip_alcs[1]]) - macip_acl_if2.add_vpp_config() - macip_acl_if3 = VppMacipAclInterface( - self, sw_if_index=sw_if_index3, acls=[macip_alcs[1]]) - macip_acl_if3.add_vpp_config() + self.vapi.macip_acl_interface_add_del(sw_if_index2, 1) + self.vapi.macip_acl_interface_add_del(sw_if_index3, 1) reply = self.vapi.macip_acl_interface_get() self.assertEqual(reply.count, intf_count+3) diff --git a/src/plugins/acl/test/test_classify_l2_acl.py b/src/plugins/acl/test/test_classify_l2_acl.py index b1309881e58..0cba6c83cba 100644 --- a/src/plugins/acl/test/test_classify_l2_acl.py +++ b/src/plugins/acl/test/test_classify_l2_acl.py @@ -603,6 +603,5 @@ class TestClassifyAcl(TestClassifier): self.acl_active_table = key self.run_verify_test(self.IP, self.IPV4, -1) - if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) diff --git a/src/plugins/acl/test/vpp_acl.py b/src/plugins/acl/test/vpp_acl.py deleted file mode 100644 index d4ed1674bf9..00000000000 --- a/src/plugins/acl/test/vpp_acl.py +++ /dev/null @@ -1,460 +0,0 @@ -from ipaddress import IPv4Network - -from vpp_object import VppObject -from vpp_papi import VppEnum -from vpp_ip import INVALID_INDEX -from vpp_papi_provider import UnexpectedApiReturnValueError - - -class VppAclPlugin(VppObject): - - def __init__(self, test, enable_intf_counters=False): - self._test = test - self.enable_intf_counters = enable_intf_counters - - @property - def enable_intf_counters(self): - return self._enable_intf_counters - - @enable_intf_counters.setter - def enable_intf_counters(self, enable): - self.vapi.acl_stats_intf_counters_enable(enable=enable) - - def add_vpp_config(self): - pass - - def remove_vpp_config(self): - pass - - def query_vpp_config(self): - pass - - def object_id(self): - return ("acl-plugin-%d" % (self._sw_if_index)) - - -class AclRule(): - """ ACL Rule """ - - # port ranges - PORTS_ALL = -1 - PORTS_RANGE = 0 - PORTS_RANGE_2 = 1 - udp_sport_from = 10 - udp_sport_to = udp_sport_from + 5 - udp_dport_from = 20000 - udp_dport_to = udp_dport_from + 5000 - tcp_sport_from = 30 - tcp_sport_to = tcp_sport_from + 5 - tcp_dport_from = 40000 - tcp_dport_to = tcp_dport_from + 5000 - - udp_sport_from_2 = 90 - udp_sport_to_2 = udp_sport_from_2 + 5 - udp_dport_from_2 = 30000 - udp_dport_to_2 = udp_dport_from_2 + 5000 - tcp_sport_from_2 = 130 - tcp_sport_to_2 = tcp_sport_from_2 + 5 - tcp_dport_from_2 = 20000 - tcp_dport_to_2 = tcp_dport_from_2 + 5000 - - icmp4_type = 8 # echo request - icmp4_code = 3 - icmp6_type = 128 # echo request - icmp6_code = 3 - - icmp4_type_2 = 8 - icmp4_code_from_2 = 5 - icmp4_code_to_2 = 20 - icmp6_type_2 = 128 - icmp6_code_from_2 = 8 - icmp6_code_to_2 = 42 - - def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'), - dst_prefix=IPv4Network('0.0.0.0/0'), - proto=0, ports=PORTS_ALL): - self.is_permit = is_permit - self.src_prefix = src_prefix - self.dst_prefix = dst_prefix - self._proto = proto - self._ports = ports - self.sport_from = 0 - self.sport_to = 0 - self.dport_from = 0 - self.dport_to = 0 - self.update_ports() - - def __copy__(self): - """ - ports are assigned implicitly based on _proto and _ports values, - so we need to set them manually in case they were user defined - """ - new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix, - self._proto, self._ports) - new_rule.sport_from = self.sport_from - new_rule.sport_to = self.sport_to - new_rule.dport_from = self.dport_from - new_rule.dport_to = self.dport_to - return new_rule - - def update_ports(self): - if self._ports == self.PORTS_ALL: - self.sport_from = 0 - self.dport_from = 0 - self.sport_to = 65535 - if self._proto == 1 or self._proto == 58: - self.sport_to = 255 - self.dport_to = self.sport_to - elif self._ports == self.PORTS_RANGE: - if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP: - self.sport_from = self.icmp4_type - self.sport_to = self.icmp4_type - self.dport_from = self.icmp4_code - self.dport_to = self.icmp4_code - elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6: - self.sport_from = self.icmp6_type - self.sport_to = self.icmp6_type - self.dport_from = self.icmp6_code - self.dport_to = self.icmp6_code - elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP: - self.sport_from = self.tcp_sport_from - self.sport_to = self.tcp_sport_to - self.dport_from = self.tcp_dport_from - self.dport_to = self.tcp_dport_to - elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP: - self.sport_from = self.udp_sport_from - self.sport_to = self.udp_sport_to - self.dport_from = self.udp_dport_from - self.dport_to = self.udp_dport_to - elif self._ports == self.PORTS_RANGE_2: - if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP: - self.sport_from = self.icmp4_type_2 - self.sport_to = self.icmp4_type_2 - self.dport_from = self.icmp4_code_from_2 - self.dport_to = self.icmp4_code_to_2 - elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6: - self.sport_from = self.icmp6_type_2 - self.sport_to = self.icmp6_type_2 - self.dport_from = self.icmp6_code_from_2 - self.dport_to = self.icmp6_code_to_2 - elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP: - self.sport_from = self.tcp_sport_from_2 - self.sport_to = self.tcp_sport_to_2 - self.dport_from = self.tcp_dport_from_2 - self.dport_to = self.tcp_dport_to_2 - elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP: - self.sport_from = self.udp_sport_from_2 - self.sport_to = self.udp_sport_to_2 - self.dport_from = self.udp_dport_from_2 - self.dport_to = self.udp_dport_to_2 - else: - self.sport_from = self._ports - self.sport_to = self._ports - self.dport_from = self._ports - self.dport_to = self._ports - - @property - def proto(self): - return self._proto - - @proto.setter - def proto(self, proto): - self._proto = proto - self.update_ports() - - @property - def ports(self): - return self._ports - - @ports.setter - def ports(self, ports): - self._ports = ports - self.update_ports() - - def encode(self): - return {'is_permit': self.is_permit, 'proto': self.proto, - 'srcport_or_icmptype_first': self.sport_from, - 'srcport_or_icmptype_last': self.sport_to, - 'src_prefix': self.src_prefix, - 'dstport_or_icmpcode_first': self.dport_from, - 'dstport_or_icmpcode_last': self.dport_to, - 'dst_prefix': self.dst_prefix} - - -class VppAcl(VppObject): - """ VPP ACL """ - - def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None): - self._test = test - self._acl_index = acl_index - self.tag = tag - self.rules = rules - - @property - def acl_index(self): - return self._acl_index - - @property - def count(self): - return len(self.rules) - - def encode_rules(self): - rules = [] - for rule in self.rules: - rules.append(rule.encode()) - return rules - - def add_vpp_config(self, expect_error=False): - try: - reply = self._test.vapi.acl_add_replace( - acl_index=self._acl_index, tag=self.tag, count=self.count, - r=self.encode_rules()) - self._acl_index = reply.acl_index - self._test.registry.register(self, self._test.logger) - if expect_error: - self._test.fail("Unexpected api reply") - return self - except UnexpectedApiReturnValueError: - if not expect_error: - self._test.fail("Unexpected api reply") - return None - - def remove_vpp_config(self, expect_error=False): - try: - self._test.vapi.acl_del(acl_index=self._acl_index) - if expect_error: - self._test.fail("Unexpected api reply") - except UnexpectedApiReturnValueError: - if not expect_error: - self._test.fail("Unexpected api reply") - - def dump(self): - return self._test.vapi.acl_dump(acl_index=self._acl_index) - - def query_vpp_config(self): - dump = self.dump() - for rule in dump: - if rule.acl_index == self._acl_index: - return True - return False - - def object_id(self): - return ("acl-%s-%d" % (self.tag, self._acl_index)) - - -class VppEtypeWhitelist(VppObject): - """ VPP Etype Whitelist """ - - def __init__(self, test, sw_if_index, whitelist, n_input=0): - self._test = test - self.whitelist = whitelist - self.n_input = n_input - self._sw_if_index = sw_if_index - - @property - def sw_if_index(self): - return self._sw_if_index - - @property - def count(self): - return len(self.whitelist) - - def add_vpp_config(self): - self._test.vapi.acl_interface_set_etype_whitelist( - sw_if_index=self._sw_if_index, count=self.count, - n_input=self.n_input, whitelist=self.whitelist) - self._test.registry.register(self, self._test.logger) - return self - - def remove_vpp_config(self): - self._test.vapi.acl_interface_set_etype_whitelist( - sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[]) - - def query_vpp_config(self): - self._test.vapi.acl_interface_etype_whitelist_dump( - sw_if_index=self._sw_if_index) - return False - - def object_id(self): - return ("acl-etype_wl-%d" % (self._sw_if_index)) - - -class VppAclInterface(VppObject): - """ VPP ACL Interface """ - - def __init__(self, test, sw_if_index, acls, n_input=0): - self._test = test - self._sw_if_index = sw_if_index - self.n_input = n_input - self.acls = acls - - @property - def sw_if_index(self): - return self._sw_if_index - - @property - def count(self): - return len(self.acls) - - def encode_acls(self): - acls = [] - for acl in self.acls: - acls.append(acl.acl_index) - return acls - - def add_vpp_config(self, expect_error=False): - try: - reply = self._test.vapi.acl_interface_set_acl_list( - sw_if_index=self._sw_if_index, n_input=self.n_input, - count=self.count, acls=self.encode_acls()) - self._test.registry.register(self, self._test.logger) - if expect_error: - self._test.fail("Unexpected api reply") - return self - except UnexpectedApiReturnValueError: - if not expect_error: - self._test.fail("Unexpected api reply") - return None - - def remove_vpp_config(self, expect_error=False): - try: - reply = self._test.vapi.acl_interface_set_acl_list( - sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[]) - if expect_error: - self._test.fail("Unexpected api reply") - except UnexpectedApiReturnValueError: - if not expect_error: - self._test.fail("Unexpected api reply") - - def query_vpp_config(self): - dump = self._test.vapi.acl_interface_list_dump( - sw_if_index=self._sw_if_index) - for acl_list in dump: - if acl_list.count > 0: - return True - return False - - def object_id(self): - return ("acl-if-list-%d" % (self._sw_if_index)) - - -class MacipRule(): - """ Mac Ip rule """ - - def __init__(self, is_permit, src_mac=0, src_mac_mask=0, - src_prefix=IPv4Network('0.0.0.0/0')): - self.is_permit = is_permit - self.src_mac = src_mac - self.src_mac_mask = src_mac_mask - self.src_prefix = src_prefix - - def encode(self): - return {'is_permit': self.is_permit, 'src_mac': self.src_mac, - 'src_mac_mask': self.src_mac_mask, - 'src_prefix': self.src_prefix} - - -class VppMacipAcl(VppObject): - """ Vpp Mac Ip ACL """ - - def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None): - self._test = test - self._acl_index = acl_index - self.tag = tag - self.rules = rules - - @property - def acl_index(self): - return self._acl_index - - @property - def count(self): - return len(self.rules) - - def encode_rules(self): - rules = [] - for rule in self.rules: - rules.append(rule.encode()) - return rules - - def add_vpp_config(self, expect_error=False): - try: - reply = self._test.vapi.macip_acl_add_replace( - acl_index=self._acl_index, tag=self.tag, count=self.count, - r=self.encode_rules()) - self._acl_index = reply.acl_index - self._test.registry.register(self, self._test.logger) - if expect_error: - self._test.fail("Unexpected api reply") - return self - except UnexpectedApiReturnValueError: - if not expect_error: - self._test.fail("Unexpected api reply") - return None - - def remove_vpp_config(self, expect_error=False): - try: - self._test.vapi.macip_acl_del(acl_index=self._acl_index) - if expect_error: - self._test.fail("Unexpected api reply") - except UnexpectedApiReturnValueError: - if not expect_error: - self._test.fail("Unexpected api reply") - - def dump(self): - return self._test.vapi.macip_acl_dump(acl_index=self._acl_index) - - def query_vpp_config(self): - dump = self.dump() - for rule in dump: - if rule.acl_index == self._acl_index: - return True - return False - - def object_id(self): - return ("macip-acl-%s-%d" % (self.tag, self._acl_index)) - - -class VppMacipAclInterface(VppObject): - """ VPP Mac Ip ACL Interface """ - - def __init__(self, test, sw_if_index, acls): - self._test = test - self._sw_if_index = sw_if_index - self.acls = acls - - @property - def sw_if_index(self): - return self._sw_if_index - - @property - def count(self): - return len(self.acls) - - def add_vpp_config(self): - for acl in self.acls: - self._test.vapi.macip_acl_interface_add_del( - is_add=True, sw_if_index=self._sw_if_index, - acl_index=acl.acl_index) - self._test.registry.register(self, self._test.logger) - - def remove_vpp_config(self): - for acl in self.acls: - self._test.vapi.macip_acl_interface_add_del( - is_add=False, sw_if_index=self._sw_if_index, - acl_index=acl.acl_index) - - def dump(self): - return self._test.vapi.macip_acl_interface_list_dump( - sw_if_index=self._sw_if_index) - - def query_vpp_config(self): - dump = self.dump() - for acl_list in dump: - for acl_index in acl_list.acls: - if acl_index != INVALID_INDEX: - return True - return False - - def object_id(self): - return ("macip-acl-if-list-%d" % (self._sw_if_index)) |