diff options
author | Andrew Yourtchenko <ayourtch@gmail.com> | 2017-04-04 14:10:40 +0000 |
---|---|---|
committer | Damjan Marion <dmarion.lists@gmail.com> | 2017-04-06 15:30:21 +0000 |
commit | d1b05647427c79cfd5322991bbe663fae65f37b5 (patch) | |
tree | c7449b51cd46575c7978d8cac795b15a84f4163a /test/test_acl_plugin.py | |
parent | 0eb2b16f95c0c43302be79a1c4df8b828ac97e37 (diff) |
acl-plugin: make the IPv4/IPv6 non-first fragment handling in line with ACL (VPP-682)
This fixes the previously-implicit "drop all non-first fragments" behavior
to be more in line with security rules: a non-first fragment is treated
for the purposes of matching the ACL as a packet with the port
match succeeding. This allows to change the behavior to permit
the fragmented packets for the default "permit specific rules"
ruleset, but also gives the flexibility to block the non-initial
fragments by inserting into the begining a bogus rule
which would deny the L4 traffic.
Also, add a knob which allows to potentially turn this behavior off
in case of a dire need (and revert to dropping all non-initial fragments),
via a debug CLI.
Change-Id: I546b372b65ff2157d9c68b1d32f9e644f1dd71b4
Signed-off-by: Andrew Yourtchenko <ayourtch@gmail.com>
(cherry picked from commit 9fc0c26c6b28fd6c8b8142ea52f52eafa7e8c7ac)
Diffstat (limited to 'test/test_acl_plugin.py')
-rw-r--r-- | test/test_acl_plugin.py | 49 |
1 files changed, 43 insertions, 6 deletions
diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 2bbebe85b1b..b051d457824 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -9,6 +9,7 @@ from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp @@ -229,7 +230,7 @@ class TestACLplugin(VppTestCase): return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0): + proto=-1, ports=0, fragments=False): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -263,8 +264,14 @@ class TestACLplugin(VppTestCase): p = Ether(dst=dst_host.mac, src=src_host.mac) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) + if fragments: + p /= IPv6ExtHdrFragment(offset=64, m=1) else: - p /= IP(src=src_host.ip4, dst=dst_host.ip4) + if fragments: + p /= IP(src=src_host.ip4, dst=dst_host.ip4, + flags=1, frag=64) + else: + p /= IP(src=src_host.ip4, dst=dst_host.ip4) if traffic_type == self.ICMP: if pkt_info.ip: p /= ICMPv6EchoRequest(type=self.icmp6_type, @@ -381,14 +388,16 @@ class TestACLplugin(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0): + def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, + frags=False): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports) + traffic_type, ip_type, proto, ports, + frags) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -408,13 +417,14 @@ class TestACLplugin(VppTestCase): self.verify_capture(dst_if, capture, traffic_type, ip_type) def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0): + ports=0, frags=False): # Test self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports) + traffic_type, ip_type, proto, ports, + frags) if len(pkts) > 0: i.add_stream(pkts) @@ -1011,5 +1021,32 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0020") + def test_0021_udp_deny_port_verify_fragment_deny(self): + """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked + """ + self.logger.info("ACLP_TEST_START_0021") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, port, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP], port, True) + + self.logger.info("ACLP_TEST_FINISH_0021") + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) |