summaryrefslogtreecommitdiffstats
path: root/src/plugins/acl/test/test_acl_plugin_l2l3.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/acl/test/test_acl_plugin_l2l3.py')
-rw-r--r--src/plugins/acl/test/test_acl_plugin_l2l3.py216
1 files changed, 110 insertions, 106 deletions
diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py
index eaddd0f9bf4..3379871e0b9 100644
--- a/src/plugins/acl/test/test_acl_plugin_l2l3.py
+++ b/src/plugins/acl/test/test_acl_plugin_l2l3.py
@@ -23,12 +23,10 @@
"""
-import copy
import unittest
from socket import inet_pton, AF_INET, AF_INET6
from random import choice, shuffle
from pprint import pprint
-from ipaddress import ip_network
import scapy.compat
from scapy.packet import Raw
@@ -42,8 +40,6 @@ from framework import VppTestCase, VppTestRunner
from vpp_l2 import L2_PORT_TYPE
import time
-from vpp_acl import AclRule, VppAcl, VppAclInterface
-
class TestACLpluginL2L3(VppTestCase):
"""TestACLpluginL2L3 Test Case"""
@@ -263,26 +259,31 @@ class TestACLpluginL2L3(VppTestCase):
else:
rule_l4_proto = p[IP].proto
- new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
- src_prefix=ip_network(
- (p[rule_l3_layer].src, rule_prefix_len)),
- dst_prefix=ip_network(
- (p[rule_l3_layer].dst, rule_prefix_len)))
- new_rule.sport_from = rule_l4_sport
- new_rule.sport_fo = rule_l4_sport
- new_rule.dport_from = rule_l4_dport
- new_rule.dport_to = rule_l4_dport
-
+ new_rule = {
+ 'is_permit': is_permit,
+ 'is_ipv6': p.haslayer(IPv6),
+ 'src_ip_addr': inet_pton(rule_family,
+ p[rule_l3_layer].src),
+ 'src_ip_prefix_len': rule_prefix_len,
+ 'dst_ip_addr': inet_pton(rule_family,
+ p[rule_l3_layer].dst),
+ 'dst_ip_prefix_len': rule_prefix_len,
+ 'srcport_or_icmptype_first': rule_l4_sport,
+ 'srcport_or_icmptype_last': rule_l4_sport,
+ 'dstport_or_icmpcode_first': rule_l4_dport,
+ 'dstport_or_icmpcode_last': rule_l4_dport,
+ 'proto': rule_l4_proto,
+ }
rules.append(new_rule)
- new_rule_permit = copy.copy(new_rule)
- new_rule_permit.is_permit = 1
+ new_rule_permit = new_rule.copy()
+ new_rule_permit['is_permit'] = 1
permit_rules.append(new_rule_permit)
- new_rule_permit_and_reflect = copy.copy(new_rule)
+ new_rule_permit_and_reflect = new_rule.copy()
if can_reflect_this_packet:
- new_rule_permit_and_reflect.is_permit = 2
+ new_rule_permit_and_reflect['is_permit'] = 2
else:
- new_rule_permit_and_reflect.is_permit = is_permit
+ new_rule_permit_and_reflect['is_permit'] = is_permit
permit_and_reflect_rules.append(new_rule_permit_and_reflect)
self.logger.info("create_stream pkt#%d: %s" % (i, payload))
@@ -355,70 +356,79 @@ class TestACLpluginL2L3(VppTestCase):
# UDP:
- def applied_acl_shuffle(self, acl_if):
- saved_n_input = acl_if.n_input
- # TOTO: maybe copy each one??
- saved_acls = acl_if.acls
+ def applied_acl_shuffle(self, sw_if_index):
+ # first collect what ACLs are applied and what they look like
+ r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
+ orig_applied_acls = r[0]
+
+ # we will collect these just to save and generate additional rulesets
+ orig_acls = []
+ for acl_num in orig_applied_acls.acls:
+ rr = self.vapi.acl_dump(acl_num)
+ orig_acls.append(rr[0])
# now create a list of all the rules in all ACLs
all_rules = []
- for old_acl in saved_acls:
- for rule in old_acl.rules:
- all_rules.append(rule)
+ for old_acl in orig_acls:
+ for rule in old_acl.r:
+ all_rules.append(dict(rule._asdict()))
# Add a few ACLs made from shuffled rules
shuffle(all_rules)
- acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
- acl1.add_vpp_config()
-
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::2],
+ tag=b"shuffle 1. acl")
+ shuffle_acl_1 = reply.acl_index
shuffle(all_rules)
- acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
- acl2.add_vpp_config()
-
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::3],
+ tag=b"shuffle 2. acl")
+ shuffle_acl_2 = reply.acl_index
shuffle(all_rules)
- acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
- acl3.add_vpp_config()
+ reply = self.vapi.acl_add_replace(acl_index=4294967295,
+ r=all_rules[::2],
+ tag=b"shuffle 3. acl")
+ shuffle_acl_3 = reply.acl_index
# apply the shuffle ACLs in front
- input_acls = [acl1, acl2]
- output_acls = [acl1, acl2]
+ input_acls = [shuffle_acl_1, shuffle_acl_2]
+ output_acls = [shuffle_acl_1, shuffle_acl_2]
# add the currently applied ACLs
- n_input = acl_if.n_input
- input_acls.extend(saved_acls[:n_input])
- output_acls.extend(saved_acls[n_input:])
+ n_input = orig_applied_acls.n_input
+ input_acls.extend(orig_applied_acls.acls[:n_input])
+ output_acls.extend(orig_applied_acls.acls[n_input:])
# and the trailing shuffle ACL(s)
- input_acls.extend([acl3])
- output_acls.extend([acl3])
+ input_acls.extend([shuffle_acl_3])
+ output_acls.extend([shuffle_acl_3])
# set the interface ACL list to the result
- acl_if.n_input = len(input_acls)
- acl_if.acls = input_acls + output_acls
- acl_if.add_vpp_config()
-
+ self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+ n_input=len(input_acls),
+ acls=input_acls + output_acls)
# change the ACLs a few times
for i in range(1, 10):
shuffle(all_rules)
- acl1.rules = all_rules[::1+(i % 2)]
- acl1.add_vpp_config()
-
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
+ r=all_rules[::1+(i % 2)],
+ tag=b"shuffle 1. acl")
shuffle(all_rules)
- acl2.rules = all_rules[::1+(i % 3)]
- acl2.add_vpp_config()
-
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+ r=all_rules[::1+(i % 3)],
+ tag=b"shuffle 2. acl")
shuffle(all_rules)
- acl3.rules = all_rules[::1+(i % 5)]
- acl3.add_vpp_config()
+ reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
+ r=all_rules[::1+(i % 5)],
+ tag=b"shuffle 3. acl")
# restore to how it was before and clean up
- acl_if.n_input = saved_n_input
- acl_if.acls = saved_acls
- acl_if.add_vpp_config()
-
- acl1.remove_vpp_config()
- acl2.remove_vpp_config()
- acl3.remove_vpp_config()
+ self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
+ n_input=orig_applied_acls.n_input,
+ acls=orig_applied_acls.acls)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
+ reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
def create_acls_for_a_stream(self, stream_dict,
test_l2_action, is_reflect):
@@ -426,14 +436,15 @@ class TestACLpluginL2L3(VppTestCase):
r_permit = stream_dict['permit_rules']
r_permit_reflect = stream_dict['permit_and_reflect_rules']
r_action = r_permit_reflect if is_reflect else r
- action_acl = VppAcl(self, rules=r_action, tag="act. acl")
- action_acl.add_vpp_config()
- permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
- permit_acl.add_vpp_config()
-
- return {'L2': action_acl if test_l2_action else permit_acl,
- 'L3': permit_acl if test_l2_action else action_acl,
- 'permit': permit_acl, 'action': action_acl}
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
+ tag=b"act. acl")
+ action_acl_index = reply.acl_index
+ reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
+ tag=b"perm. acl")
+ permit_acl_index = reply.acl_index
+ return {'L2': action_acl_index if test_l2_action else permit_acl_index,
+ 'L3': permit_acl_index if test_l2_action else action_acl_index,
+ 'permit': permit_acl_index, 'action': action_acl_index}
def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
is_ip6, is_reflect, add_eh):
@@ -441,30 +452,26 @@ class TestACLpluginL2L3(VppTestCase):
"""
self.reset_packet_infos()
stream_dict = self.create_stream(
- self.pg2, self.loop0,
- bridged_to_routed,
- self.pg_if_packet_sizes, is_ip6,
- not is_reflect, False, add_eh)
+ self.pg2, self.loop0,
+ bridged_to_routed,
+ self.pg_if_packet_sizes, is_ip6,
+ not is_reflect, False, add_eh)
stream = stream_dict['stream']
acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
is_reflect)
n_input_l3 = 0 if bridged_to_routed else 1
n_input_l2 = 1 if bridged_to_routed else 0
-
- acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
- n_input=n_input_l3, acls=[acl_idx['L3']])
- acl_if_pg2.add_vpp_config()
-
- acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
- n_input=n_input_l2, acls=[acl_idx['L2']])
- acl_if_pg0.add_vpp_config()
-
- acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
- n_input=n_input_l2, acls=[acl_idx['L2']])
- acl_if_pg1.add_vpp_config()
-
- self.applied_acl_shuffle(acl_if_pg0)
- self.applied_acl_shuffle(acl_if_pg1)
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+ n_input=n_input_l3,
+ acls=[acl_idx['L3']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=n_input_l2,
+ acls=[acl_idx['L2']])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=n_input_l2,
+ acls=[acl_idx['L2']])
+ self.applied_acl_shuffle(self.pg0.sw_if_index)
+ self.applied_acl_shuffle(self.pg2.sw_if_index)
return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
def apply_acl_ip46_both_directions_reflect(self,
@@ -509,23 +516,20 @@ class TestACLpluginL2L3(VppTestCase):
else:
outbound_l3_acl = acl_idx_rev['L3']
- acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
- n_input=1,
- acls=[inbound_l3_acl, outbound_l3_acl])
- acl_if_pg2.add_vpp_config()
-
- acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[inbound_l2_acl, outbound_l2_acl])
- acl_if_pg0.add_vpp_config()
-
- acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
- n_input=1,
- acls=[inbound_l2_acl, outbound_l2_acl])
- acl_if_pg1.add_vpp_config()
-
- self.applied_acl_shuffle(acl_if_pg0)
- self.applied_acl_shuffle(acl_if_pg2)
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
+ n_input=1,
+ acls=[inbound_l3_acl,
+ outbound_l3_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl,
+ outbound_l2_acl])
+ self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl,
+ outbound_l2_acl])
+ self.applied_acl_shuffle(self.pg0.sw_if_index)
+ self.applied_acl_shuffle(self.pg2.sw_if_index)
def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
@@ -590,7 +594,7 @@ class TestACLpluginL2L3(VppTestCase):
pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
is_reflect, False,
add_eh)
- self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
+ self.verify_acl_packet_count(acls['L3'], pkts)
def run_test_ip46_bridged_to_routed(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
@@ -600,7 +604,7 @@ class TestACLpluginL2L3(VppTestCase):
pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
is_reflect, False,
add_eh)
- self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
+ self.verify_acl_packet_count(acls['L2'], pkts)
def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
is_ip6, add_eh,