summaryrefslogtreecommitdiffstats
path: root/test/test_acl_plugin_l2l3.py
diff options
context:
space:
mode:
authorAndrew Yourtchenko <ayourtch@gmail.com>2018-09-26 21:03:06 +0200
committerDamjan Marion <dmarion@me.com>2018-09-26 22:50:29 +0000
commitfae833799f0c3398a18d1c0823e395345cdb9aa1 (patch)
treec7398e7f70da5d2b2e2bcf2cb7812f6e380ced28 /test/test_acl_plugin_l2l3.py
parentf5fa5ae2b021f946fbb8ec56e692459cd34bc7fb (diff)
acl-plugin: fix the stateful ICMP handling and add testcases
The stateful ICMP/ICMPv6 handling got broken. Fix that and introduce testcases to catch in the future. Change-Id: Ie602e72d6ac613d64ab0bf6693b6d75afb1a9552 Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
Diffstat (limited to 'test/test_acl_plugin_l2l3.py')
-rw-r--r--test/test_acl_plugin_l2l3.py134
1 files changed, 107 insertions, 27 deletions
diff --git a/test/test_acl_plugin_l2l3.py b/test/test_acl_plugin_l2l3.py
index 66d053ff584..8e6b9e30650 100644
--- a/test/test_acl_plugin_l2l3.py
+++ b/test/test_acl_plugin_l2l3.py
@@ -40,7 +40,7 @@ from vpp_papi_provider import L2_PORT_TYPE
import time
-class TestIpIrb(VppTestCase):
+class TestAclIpIrb(VppTestCase):
"""IRB Test Case"""
@classmethod
@@ -53,7 +53,7 @@ class TestIpIrb(VppTestCase):
#. Loopback BVI interface has remote hosts, one half of hosts are
behind pg0 second behind pg1.
"""
- super(TestIpIrb, cls).setUpClass()
+ super(TestAclIpIrb, cls).setUpClass()
cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # packet sizes
cls.bd_id = 10
@@ -94,6 +94,8 @@ class TestIpIrb(VppTestCase):
cls.WITHOUT_EH = False
cls.WITH_EH = True
+ cls.STATELESS_ICMP = False
+ cls.STATEFUL_ICMP = True
# Loopback BVI interface has remote hosts, one half of hosts are behind
# pg0 second behind pg1
@@ -106,24 +108,24 @@ class TestIpIrb(VppTestCase):
``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
``show ip arp``.
"""
- super(TestIpIrb, self).tearDown()
+ super(TestAclIpIrb, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show l2patch"))
self.logger.info(self.vapi.cli("show classify tables"))
- self.logger.info(self.vapi.cli("show vlib graph"))
self.logger.info(self.vapi.cli("show l2fib verbose"))
self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
self.bd_id))
self.logger.info(self.vapi.cli("show ip arp"))
self.logger.info(self.vapi.cli("show ip6 neighbors"))
- self.logger.info(self.vapi.cli("show acl-plugin sessions"))
+ cmd = "show acl-plugin sessions verbose 1"
+ self.logger.info(self.vapi.cli(cmd))
self.logger.info(self.vapi.cli("show acl-plugin acl"))
self.logger.info(self.vapi.cli("show acl-plugin interface"))
self.logger.info(self.vapi.cli("show acl-plugin tables"))
def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
is_ip6, expect_blocked, expect_established,
- add_extension_header):
+ add_extension_header, icmp_stateful=False):
pkts = []
rules = []
permit_rules = []
@@ -131,7 +133,15 @@ class TestIpIrb(VppTestCase):
total_packet_count = 8
for i in range(0, total_packet_count):
modulo = (i//2) % 2
- can_reflect_this_packet = (modulo == 0)
+ icmp_type_delta = i % 2
+ icmp_code = i
+ is_udp_packet = (modulo == 0)
+ if is_udp_packet and icmp_stateful:
+ continue
+ is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and
+ not is_udp_packet)
+ is_reflected_icmp = is_reflectable_icmp and expect_established
+ can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
is_permit = i % 2
remote_dst_index = i % len(dst_ip_if.remote_hosts)
remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
@@ -167,13 +177,15 @@ class TestIpIrb(VppTestCase):
dst_ip4 = remote_dst_host.ip4
src_l4 = 1234 + i
dst_l4 = 4321 + i
+ if is_reflected_icmp:
+ icmp_type_delta = 1
# default ULP should be something we do not use in tests
ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
# potentially a chain of protocols leading to ULP
ulp = ulp_l4
- if can_reflect_this_packet:
+ if is_udp_packet:
if is_ip6:
ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
if add_extension_header:
@@ -200,14 +212,15 @@ class TestIpIrb(VppTestCase):
Raw(payload))
elif modulo == 1:
if is_ip6:
- ulp_l4 = ICMPv6Unknown(type=128 + (i % 2), code=i % 2)
+ ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta,
+ code=icmp_code)
ulp = ulp_l4
p = (Ether(dst=dst_mac, src=src_mac) /
IPv6(src=src_ip6, dst=dst_ip6) /
ulp /
Raw(payload))
else:
- ulp_l4 = ICMP(type=8 + (i % 2), code=i % 2)
+ ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code)
ulp = ulp_l4
p = (Ether(dst=dst_mac, src=src_mac) /
IP(src=src_ip4, dst=dst_ip4) /
@@ -264,7 +277,9 @@ class TestIpIrb(VppTestCase):
new_rule_permit_and_reflect['is_permit'] = 2
else:
new_rule_permit_and_reflect['is_permit'] = is_permit
+
permit_and_reflect_rules.append(new_rule_permit_and_reflect)
+ self.logger.info("create_stream pkt#%d: %s" % (i, payload))
return {'stream': pkts,
'rules': rules,
@@ -452,20 +467,22 @@ class TestIpIrb(VppTestCase):
def apply_acl_ip46_both_directions_reflect(self,
primary_is_bridged_to_routed,
- reflect_on_l2, is_ip6, add_eh):
+ reflect_on_l2, is_ip6, add_eh,
+ stateful_icmp):
primary_is_routed_to_bridged = not primary_is_bridged_to_routed
self.reset_packet_infos()
stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
primary_is_bridged_to_routed,
self.pg_if_packet_sizes, is_ip6,
- False, False, add_eh)
+ False, False, add_eh,
+ stateful_icmp)
acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
reflect_on_l2, True)
stream_dict_rev = self.create_stream(self.pg2, self.loop0,
not primary_is_bridged_to_routed,
self.pg_if_packet_sizes, is_ip6,
- True, True, add_eh)
+ True, True, add_eh, stateful_icmp)
# We want the primary action to be "deny" rather than reflect
acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
reflect_on_l2, False)
@@ -517,13 +534,14 @@ class TestIpIrb(VppTestCase):
def run_traffic_ip46_x_to_y(self, bridged_to_routed,
test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh):
+ is_reflect, is_established, add_eh,
+ stateful_icmp=False):
self.reset_packet_infos()
stream_dict = self.create_stream(self.pg2, self.loop0,
bridged_to_routed,
self.pg_if_packet_sizes, is_ip6,
not is_reflect, is_established,
- add_eh)
+ add_eh, stateful_icmp)
stream = stream_dict['stream']
tx_if = self.pg0 if bridged_to_routed else self.pg2
@@ -537,14 +555,18 @@ class TestIpIrb(VppTestCase):
self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh):
+ is_reflect, is_established, add_eh,
+ stateful_icmp=False):
self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh)
+ is_reflect, is_established, add_eh,
+ stateful_icmp)
def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh):
+ is_reflect, is_established, add_eh,
+ stateful_icmp=False):
self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh)
+ is_reflect, is_established, add_eh,
+ stateful_icmp)
def run_test_ip46_routed_to_bridged(self, test_l2_deny,
is_ip6, is_reflect, add_eh):
@@ -561,22 +583,30 @@ class TestIpIrb(VppTestCase):
is_reflect, False, add_eh)
def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
- is_ip6, add_eh):
+ is_ip6, add_eh,
+ stateful_icmp=False):
self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
- is_ip6, add_eh)
+ is_ip6, add_eh,
+ stateful_icmp)
self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
- True, False, add_eh)
+ True, False, add_eh,
+ stateful_icmp)
self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
- False, True, add_eh)
+ False, True, add_eh,
+ stateful_icmp)
def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
- is_ip6, add_eh):
+ is_ip6, add_eh,
+ stateful_icmp=False):
self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
- is_ip6, add_eh)
+ is_ip6, add_eh,
+ stateful_icmp)
self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
- True, False, add_eh)
+ True, False, add_eh,
+ stateful_icmp)
self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
- False, True, add_eh)
+ False, True, add_eh,
+ stateful_icmp)
def test_0000_ip6_irb_1(self):
""" ACL plugin prepare"""
@@ -761,6 +791,56 @@ class TestIpIrb(VppTestCase):
""" ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
self.run_test_ip46_bridged_to_routed_and_back(False, False,
self.WITH_EH)
+ # Stateful ACL tests with stateful ICMP
+
+ def test_1401_ip6_irb_1(self):
+ """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_routed_to_bridged_and_back(True, True,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1402_ip6_irb_1(self):
+ """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_bridged_to_routed_and_back(True, True,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1403_ip4_irb_1(self):
+ """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_routed_to_bridged_and_back(True, False,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1404_ip4_irb_1(self):
+ """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_bridged_to_routed_and_back(True, False,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1411_ip6_irb_1(self):
+ """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_routed_to_bridged_and_back(False, True,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1412_ip6_irb_1(self):
+ """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_bridged_to_routed_and_back(False, True,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1413_ip4_irb_1(self):
+ """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_routed_to_bridged_and_back(False, False,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
+ def test_1414_ip4_irb_1(self):
+ """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
+ self.run_test_ip46_bridged_to_routed_and_back(False, False,
+ self.WITHOUT_EH,
+ self.STATEFUL_ICMP)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)