summaryrefslogtreecommitdiffstats
path: root/src/plugins/acl/test/test_acl_plugin_l2l3.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/acl/test/test_acl_plugin_l2l3.py')
-rw-r--r--src/plugins/acl/test/test_acl_plugin_l2l3.py216
1 files changed, 106 insertions, 110 deletions
diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py
index 3379871e0b9..eaddd0f9bf4 100644
--- a/src/plugins/acl/test/test_acl_plugin_l2l3.py
+++ b/src/plugins/acl/test/test_acl_plugin_l2l3.py
@@ -23,10 +23,12 @@
"""
+import copy
import unittest
from socket import inet_pton, AF_INET, AF_INET6
from random import choice, shuffle
from pprint import pprint
+from ipaddress import ip_network
import scapy.compat
from scapy.packet import Raw
@@ -40,6 +42,8 @@ from framework import VppTestCase, VppTestRunner
from vpp_l2 import L2_PORT_TYPE
import time
+from vpp_acl import AclRule, VppAcl, VppAclInterface
+
class TestACLpluginL2L3(VppTestCase):
"""TestACLpluginL2L3 Test Case"""
@@ -259,31 +263,26 @@ class TestACLpluginL2L3(VppTestCase):
else:
rule_l4_proto = p[IP].proto
- new_rule = {
- 'is_permit': is_permit,
- 'is_ipv6': p.haslayer(IPv6),
- 'src_ip_addr': inet_pton(rule_family,
- p[rule_l3_layer].src),
- 'src_ip_prefix_len': rule_prefix_len,
- 'dst_ip_addr': inet_pton(rule_family,
- p[rule_l3_layer].dst),
- 'dst_ip_prefix_len': rule_prefix_len,
- 'srcport_or_icmptype_first': rule_l4_sport,
- 'srcport_or_icmptype_last': rule_l4_sport,
- 'dstport_or_icmpcode_first': rule_l4_dport,
- 'dstport_or_icmpcode_last': rule_l4_dport,
- 'proto': rule_l4_proto,
- }
+ new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
+ src_prefix=ip_network(
+ (p[rule_l3_layer].src, rule_prefix_len)),
+ dst_prefix=ip_network(
+ (p[rule_l3_layer].dst, rule_prefix_len)))
+ new_rule.sport_from = rule_l4_sport
+ new_rule.sport_fo = rule_l4_sport
+ new_rule.dport_from = rule_l4_dport
+ new_rule.dport_to = rule_l4_dport
+
rules.append(new_rule)
- new_rule_permit = new_rule.copy()
- new_rule_permit['is_permit'] = 1
+ new_rule_permit = copy.copy(new_rule)
+ new_rule_permit.is_permit = 1
permit_rules.append(new_rule_permit)
- new_rule_permit_and_reflect = new_rule.copy()
+ new_rule_permit_and_reflect = copy.copy(new_rule)
if can_reflect_this_packet:
- new_rule_permit_and_reflect['is_permit'] = 2
+ new_rule_permit_and_reflect.is_permit = 2
else:
- new_rule_permit_and_reflect['is_permit'] = is_permit
+ new_rule_permit_and_reflect.is_permit = is_permit
permit_and_reflect_rules.append(new_rule_permit_and_reflect)
self.logger.info("create_stream pkt#%d: %s" % (i, payload))
@@ -356,79 +355,70 @@ class TestACLpluginL2L3(VppTestCase):
# UDP:
- def applied_acl_shuffle(self, sw_if_index):
- # first collect what ACLs are applied and what they look like
- r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index)
- orig_applied_acls = r[0]
-
- # we will collect these just to save and generate additional rulesets
- orig_acls = []
- for acl_num in orig_applied_acls.acls:
- rr = self.vapi.acl_dump(acl_num)
- orig_acls.append(rr[0])
+ def applied_acl_shuffle(self, acl_if):
+ saved_n_input = acl_if.n_input
+ # TOTO: maybe copy each one??
+ saved_acls = acl_if.acls
# now create a list of all the rules in all ACLs
all_rules = []
- for old_acl in orig_acls:
- for rule in old_acl.r:
- all_rules.append(dict(rule._asdict()))
+ for old_acl in saved_acls:
+ for rule in old_acl.rules:
+ all_rules.append(rule)
# Add a few ACLs made from shuffled rules
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=4294967295,
- r=all_rules[::2],
- tag=b"shuffle 1. acl")
- shuffle_acl_1 = reply.acl_index
+ acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
+ acl1.add_vpp_config()
+
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=4294967295,
- r=all_rules[::3],
- tag=b"shuffle 2. acl")
- shuffle_acl_2 = reply.acl_index
+ acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
+ acl2.add_vpp_config()
+
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=4294967295,
- r=all_rules[::2],
- tag=b"shuffle 3. acl")
- shuffle_acl_3 = reply.acl_index
+ acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
+ acl3.add_vpp_config()
# apply the shuffle ACLs in front
- input_acls = [shuffle_acl_1, shuffle_acl_2]
- output_acls = [shuffle_acl_1, shuffle_acl_2]
+ input_acls = [acl1, acl2]
+ output_acls = [acl1, acl2]
# add the currently applied ACLs
- n_input = orig_applied_acls.n_input
- input_acls.extend(orig_applied_acls.acls[:n_input])
- output_acls.extend(orig_applied_acls.acls[n_input:])
+ n_input = acl_if.n_input
+ input_acls.extend(saved_acls[:n_input])
+ output_acls.extend(saved_acls[n_input:])
# and the trailing shuffle ACL(s)
- input_acls.extend([shuffle_acl_3])
- output_acls.extend([shuffle_acl_3])
+ input_acls.extend([acl3])
+ output_acls.extend([acl3])
# set the interface ACL list to the result
- self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
- n_input=len(input_acls),
- acls=input_acls + output_acls)
+ acl_if.n_input = len(input_acls)
+ acl_if.acls = input_acls + output_acls
+ acl_if.add_vpp_config()
+
# change the ACLs a few times
for i in range(1, 10):
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1,
- r=all_rules[::1+(i % 2)],
- tag=b"shuffle 1. acl")
+ acl1.rules = all_rules[::1+(i % 2)]
+ acl1.add_vpp_config()
+
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
- r=all_rules[::1+(i % 3)],
- tag=b"shuffle 2. acl")
+ acl2.rules = all_rules[::1+(i % 3)]
+ acl2.add_vpp_config()
+
shuffle(all_rules)
- reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2,
- r=all_rules[::1+(i % 5)],
- tag=b"shuffle 3. acl")
+ acl3.rules = all_rules[::1+(i % 5)]
+ acl3.add_vpp_config()
# restore to how it was before and clean up
- self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index,
- n_input=orig_applied_acls.n_input,
- acls=orig_applied_acls.acls)
- reply = self.vapi.acl_del(acl_index=shuffle_acl_1)
- reply = self.vapi.acl_del(acl_index=shuffle_acl_2)
- reply = self.vapi.acl_del(acl_index=shuffle_acl_3)
+ acl_if.n_input = saved_n_input
+ acl_if.acls = saved_acls
+ acl_if.add_vpp_config()
+
+ acl1.remove_vpp_config()
+ acl2.remove_vpp_config()
+ acl3.remove_vpp_config()
def create_acls_for_a_stream(self, stream_dict,
test_l2_action, is_reflect):
@@ -436,15 +426,14 @@ class TestACLpluginL2L3(VppTestCase):
r_permit = stream_dict['permit_rules']
r_permit_reflect = stream_dict['permit_and_reflect_rules']
r_action = r_permit_reflect if is_reflect else r
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action,
- tag=b"act. acl")
- action_acl_index = reply.acl_index
- reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit,
- tag=b"perm. acl")
- permit_acl_index = reply.acl_index
- return {'L2': action_acl_index if test_l2_action else permit_acl_index,
- 'L3': permit_acl_index if test_l2_action else action_acl_index,
- 'permit': permit_acl_index, 'action': action_acl_index}
+ action_acl = VppAcl(self, rules=r_action, tag="act. acl")
+ action_acl.add_vpp_config()
+ permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
+ permit_acl.add_vpp_config()
+
+ return {'L2': action_acl if test_l2_action else permit_acl,
+ 'L3': permit_acl if test_l2_action else action_acl,
+ 'permit': permit_acl, 'action': action_acl}
def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
is_ip6, is_reflect, add_eh):
@@ -452,26 +441,30 @@ class TestACLpluginL2L3(VppTestCase):
"""
self.reset_packet_infos()
stream_dict = self.create_stream(
- self.pg2, self.loop0,
- bridged_to_routed,
- self.pg_if_packet_sizes, is_ip6,
- not is_reflect, False, add_eh)
+ self.pg2, self.loop0,
+ bridged_to_routed,
+ self.pg_if_packet_sizes, is_ip6,
+ not is_reflect, False, add_eh)
stream = stream_dict['stream']
acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
is_reflect)
n_input_l3 = 0 if bridged_to_routed else 1
n_input_l2 = 1 if bridged_to_routed else 0
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- n_input=n_input_l3,
- acls=[acl_idx['L3']])
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- n_input=n_input_l2,
- acls=[acl_idx['L2']])
- self.applied_acl_shuffle(self.pg0.sw_if_index)
- self.applied_acl_shuffle(self.pg2.sw_if_index)
+
+ acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
+ n_input=n_input_l3, acls=[acl_idx['L3']])
+ acl_if_pg2.add_vpp_config()
+
+ acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
+ n_input=n_input_l2, acls=[acl_idx['L2']])
+ acl_if_pg0.add_vpp_config()
+
+ acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
+ n_input=n_input_l2, acls=[acl_idx['L2']])
+ acl_if_pg1.add_vpp_config()
+
+ self.applied_acl_shuffle(acl_if_pg0)
+ self.applied_acl_shuffle(acl_if_pg1)
return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
def apply_acl_ip46_both_directions_reflect(self,
@@ -516,20 +509,23 @@ class TestACLpluginL2L3(VppTestCase):
else:
outbound_l3_acl = acl_idx_rev['L3']
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index,
- n_input=1,
- acls=[inbound_l3_acl,
- outbound_l3_acl])
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
- self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index,
- n_input=1,
- acls=[inbound_l2_acl,
- outbound_l2_acl])
- self.applied_acl_shuffle(self.pg0.sw_if_index)
- self.applied_acl_shuffle(self.pg2.sw_if_index)
+ acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
+ n_input=1,
+ acls=[inbound_l3_acl, outbound_l3_acl])
+ acl_if_pg2.add_vpp_config()
+
+ acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl, outbound_l2_acl])
+ acl_if_pg0.add_vpp_config()
+
+ acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
+ n_input=1,
+ acls=[inbound_l2_acl, outbound_l2_acl])
+ acl_if_pg1.add_vpp_config()
+
+ self.applied_acl_shuffle(acl_if_pg0)
+ self.applied_acl_shuffle(acl_if_pg2)
def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
is_reflect, add_eh):
@@ -594,7 +590,7 @@ class TestACLpluginL2L3(VppTestCase):
pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
is_reflect, False,
add_eh)
- self.verify_acl_packet_count(acls['L3'], pkts)
+ self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
def run_test_ip46_bridged_to_routed(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
@@ -604,7 +600,7 @@ class TestACLpluginL2L3(VppTestCase):
pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
is_reflect, False,
add_eh)
- self.verify_acl_packet_count(acls['L2'], pkts)
+ self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
is_ip6, add_eh,