diff options
Diffstat (limited to 'src/plugins/acl/test/test_acl_plugin_l2l3.py')
-rw-r--r-- | src/plugins/acl/test/test_acl_plugin_l2l3.py | 216 |
1 files changed, 110 insertions, 106 deletions
diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py index eaddd0f9bf4..3379871e0b9 100644 --- a/src/plugins/acl/test/test_acl_plugin_l2l3.py +++ b/src/plugins/acl/test/test_acl_plugin_l2l3.py @@ -23,12 +23,10 @@ """ -import copy import unittest from socket import inet_pton, AF_INET, AF_INET6 from random import choice, shuffle from pprint import pprint -from ipaddress import ip_network import scapy.compat from scapy.packet import Raw @@ -42,8 +40,6 @@ from framework import VppTestCase, VppTestRunner from vpp_l2 import L2_PORT_TYPE import time -from vpp_acl import AclRule, VppAcl, VppAclInterface - class TestACLpluginL2L3(VppTestCase): """TestACLpluginL2L3 Test Case""" @@ -263,26 +259,31 @@ class TestACLpluginL2L3(VppTestCase): else: rule_l4_proto = p[IP].proto - new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto, - src_prefix=ip_network( - (p[rule_l3_layer].src, rule_prefix_len)), - dst_prefix=ip_network( - (p[rule_l3_layer].dst, rule_prefix_len))) - new_rule.sport_from = rule_l4_sport - new_rule.sport_fo = rule_l4_sport - new_rule.dport_from = rule_l4_dport - new_rule.dport_to = rule_l4_dport - + new_rule = { + 'is_permit': is_permit, + 'is_ipv6': p.haslayer(IPv6), + 'src_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].src), + 'src_ip_prefix_len': rule_prefix_len, + 'dst_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].dst), + 'dst_ip_prefix_len': rule_prefix_len, + 'srcport_or_icmptype_first': rule_l4_sport, + 'srcport_or_icmptype_last': rule_l4_sport, + 'dstport_or_icmpcode_first': rule_l4_dport, + 'dstport_or_icmpcode_last': rule_l4_dport, + 'proto': rule_l4_proto, + } rules.append(new_rule) - new_rule_permit = copy.copy(new_rule) - new_rule_permit.is_permit = 1 + new_rule_permit = new_rule.copy() + new_rule_permit['is_permit'] = 1 permit_rules.append(new_rule_permit) - new_rule_permit_and_reflect = copy.copy(new_rule) + new_rule_permit_and_reflect = new_rule.copy() if can_reflect_this_packet: - new_rule_permit_and_reflect.is_permit = 2 + new_rule_permit_and_reflect['is_permit'] = 2 else: - new_rule_permit_and_reflect.is_permit = is_permit + new_rule_permit_and_reflect['is_permit'] = is_permit permit_and_reflect_rules.append(new_rule_permit_and_reflect) self.logger.info("create_stream pkt#%d: %s" % (i, payload)) @@ -355,70 +356,79 @@ class TestACLpluginL2L3(VppTestCase): # UDP: - def applied_acl_shuffle(self, acl_if): - saved_n_input = acl_if.n_input - # TOTO: maybe copy each one?? - saved_acls = acl_if.acls + def applied_acl_shuffle(self, sw_if_index): + # first collect what ACLs are applied and what they look like + r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index) + orig_applied_acls = r[0] + + # we will collect these just to save and generate additional rulesets + orig_acls = [] + for acl_num in orig_applied_acls.acls: + rr = self.vapi.acl_dump(acl_num) + orig_acls.append(rr[0]) # now create a list of all the rules in all ACLs all_rules = [] - for old_acl in saved_acls: - for rule in old_acl.rules: - all_rules.append(rule) + for old_acl in orig_acls: + for rule in old_acl.r: + all_rules.append(dict(rule._asdict())) # Add a few ACLs made from shuffled rules shuffle(all_rules) - acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl") - acl1.add_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::2], + tag=b"shuffle 1. acl") + shuffle_acl_1 = reply.acl_index shuffle(all_rules) - acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl") - acl2.add_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::3], + tag=b"shuffle 2. acl") + shuffle_acl_2 = reply.acl_index shuffle(all_rules) - acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl") - acl3.add_vpp_config() + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::2], + tag=b"shuffle 3. acl") + shuffle_acl_3 = reply.acl_index # apply the shuffle ACLs in front - input_acls = [acl1, acl2] - output_acls = [acl1, acl2] + input_acls = [shuffle_acl_1, shuffle_acl_2] + output_acls = [shuffle_acl_1, shuffle_acl_2] # add the currently applied ACLs - n_input = acl_if.n_input - input_acls.extend(saved_acls[:n_input]) - output_acls.extend(saved_acls[n_input:]) + n_input = orig_applied_acls.n_input + input_acls.extend(orig_applied_acls.acls[:n_input]) + output_acls.extend(orig_applied_acls.acls[n_input:]) # and the trailing shuffle ACL(s) - input_acls.extend([acl3]) - output_acls.extend([acl3]) + input_acls.extend([shuffle_acl_3]) + output_acls.extend([shuffle_acl_3]) # set the interface ACL list to the result - acl_if.n_input = len(input_acls) - acl_if.acls = input_acls + output_acls - acl_if.add_vpp_config() - + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=len(input_acls), + acls=input_acls + output_acls) # change the ACLs a few times for i in range(1, 10): shuffle(all_rules) - acl1.rules = all_rules[::1+(i % 2)] - acl1.add_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1, + r=all_rules[::1+(i % 2)], + tag=b"shuffle 1. acl") shuffle(all_rules) - acl2.rules = all_rules[::1+(i % 3)] - acl2.add_vpp_config() - + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, + r=all_rules[::1+(i % 3)], + tag=b"shuffle 2. acl") shuffle(all_rules) - acl3.rules = all_rules[::1+(i % 5)] - acl3.add_vpp_config() + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, + r=all_rules[::1+(i % 5)], + tag=b"shuffle 3. acl") # restore to how it was before and clean up - acl_if.n_input = saved_n_input - acl_if.acls = saved_acls - acl_if.add_vpp_config() - - acl1.remove_vpp_config() - acl2.remove_vpp_config() - acl3.remove_vpp_config() + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=orig_applied_acls.n_input, + acls=orig_applied_acls.acls) + reply = self.vapi.acl_del(acl_index=shuffle_acl_1) + reply = self.vapi.acl_del(acl_index=shuffle_acl_2) + reply = self.vapi.acl_del(acl_index=shuffle_acl_3) def create_acls_for_a_stream(self, stream_dict, test_l2_action, is_reflect): @@ -426,14 +436,15 @@ class TestACLpluginL2L3(VppTestCase): r_permit = stream_dict['permit_rules'] r_permit_reflect = stream_dict['permit_and_reflect_rules'] r_action = r_permit_reflect if is_reflect else r - action_acl = VppAcl(self, rules=r_action, tag="act. acl") - action_acl.add_vpp_config() - permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl") - permit_acl.add_vpp_config() - - return {'L2': action_acl if test_l2_action else permit_acl, - 'L3': permit_acl if test_l2_action else action_acl, - 'permit': permit_acl, 'action': action_acl} + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action, + tag=b"act. acl") + action_acl_index = reply.acl_index + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit, + tag=b"perm. acl") + permit_acl_index = reply.acl_index + return {'L2': action_acl_index if test_l2_action else permit_acl_index, + 'L3': permit_acl_index if test_l2_action else action_acl_index, + 'permit': permit_acl_index, 'action': action_acl_index} def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -441,30 +452,26 @@ class TestACLpluginL2L3(VppTestCase): """ self.reset_packet_infos() stream_dict = self.create_stream( - self.pg2, self.loop0, - bridged_to_routed, - self.pg_if_packet_sizes, is_ip6, - not is_reflect, False, add_eh) + self.pg2, self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, is_ip6, + not is_reflect, False, add_eh) stream = stream_dict['stream'] acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, is_reflect) n_input_l3 = 0 if bridged_to_routed else 1 n_input_l2 = 1 if bridged_to_routed else 0 - - acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, - n_input=n_input_l3, acls=[acl_idx['L3']]) - acl_if_pg2.add_vpp_config() - - acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, - n_input=n_input_l2, acls=[acl_idx['L2']]) - acl_if_pg0.add_vpp_config() - - acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, - n_input=n_input_l2, acls=[acl_idx['L2']]) - acl_if_pg1.add_vpp_config() - - self.applied_acl_shuffle(acl_if_pg0) - self.applied_acl_shuffle(acl_if_pg1) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, + n_input=n_input_l3, + acls=[acl_idx['L3']]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx['L2']]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx['L2']]) + self.applied_acl_shuffle(self.pg0.sw_if_index) + self.applied_acl_shuffle(self.pg2.sw_if_index) return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']} def apply_acl_ip46_both_directions_reflect(self, @@ -509,23 +516,20 @@ class TestACLpluginL2L3(VppTestCase): else: outbound_l3_acl = acl_idx_rev['L3'] - acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index, - n_input=1, - acls=[inbound_l3_acl, outbound_l3_acl]) - acl_if_pg2.add_vpp_config() - - acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, outbound_l2_acl]) - acl_if_pg0.add_vpp_config() - - acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, - n_input=1, - acls=[inbound_l2_acl, outbound_l2_acl]) - acl_if_pg1.add_vpp_config() - - self.applied_acl_shuffle(acl_if_pg0) - self.applied_acl_shuffle(acl_if_pg2) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, + n_input=1, + acls=[inbound_l3_acl, + outbound_l3_acl]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, + outbound_l2_acl]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, + outbound_l2_acl]) + self.applied_acl_shuffle(self.pg0.sw_if_index) + self.applied_acl_shuffle(self.pg2.sw_if_index) def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -590,7 +594,7 @@ class TestACLpluginL2L3(VppTestCase): pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, is_reflect, False, add_eh) - self.verify_acl_packet_count(acls['L3'].acl_index, pkts) + self.verify_acl_packet_count(acls['L3'], pkts) def run_test_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, is_reflect, add_eh): @@ -600,7 +604,7 @@ class TestACLpluginL2L3(VppTestCase): pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, is_reflect, False, add_eh) - self.verify_acl_packet_count(acls['L2'].acl_index, pkts) + self.verify_acl_packet_count(acls['L2'], pkts) def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action, is_ip6, add_eh, |