aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_acl_plugin.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_acl_plugin.py')
-rw-r--r--test/test_acl_plugin.py49
1 files changed, 43 insertions, 6 deletions
diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py
index 2bbebe85..b051d457 100644
--- a/test/test_acl_plugin.py
+++ b/test/test_acl_plugin.py
@@ -9,6 +9,7 @@ from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
+from scapy.layers.inet6 import IPv6ExtHdrFragment
from framework import VppTestCase, VppTestRunner
from util import Host, ppp
@@ -229,7 +230,7 @@ class TestACLplugin(VppTestCase):
return ''
def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
- proto=-1, ports=0):
+ proto=-1, ports=0, fragments=False):
"""
Create input packet stream for defined interface using hosts or
deleted_hosts list.
@@ -263,8 +264,14 @@ class TestACLplugin(VppTestCase):
p = Ether(dst=dst_host.mac, src=src_host.mac)
if pkt_info.ip:
p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
+ if fragments:
+ p /= IPv6ExtHdrFragment(offset=64, m=1)
else:
- p /= IP(src=src_host.ip4, dst=dst_host.ip4)
+ if fragments:
+ p /= IP(src=src_host.ip4, dst=dst_host.ip4,
+ flags=1, frag=64)
+ else:
+ p /= IP(src=src_host.ip4, dst=dst_host.ip4)
if traffic_type == self.ICMP:
if pkt_info.ip:
p /= ICMPv6EchoRequest(type=self.icmp6_type,
@@ -381,14 +388,16 @@ class TestACLplugin(VppTestCase):
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0):
+ def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
+ frags=False):
# Test
# Create incoming packet streams for packet-generator interfaces
pkts_cnt = 0
for i in self.pg_interfaces:
if self.flows.__contains__(i):
pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports)
+ traffic_type, ip_type, proto, ports,
+ frags)
if len(pkts) > 0:
i.add_stream(pkts)
pkts_cnt += len(pkts)
@@ -408,13 +417,14 @@ class TestACLplugin(VppTestCase):
self.verify_capture(dst_if, capture, traffic_type, ip_type)
def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
- ports=0):
+ ports=0, frags=False):
# Test
self.reset_packet_infos()
for i in self.pg_interfaces:
if self.flows.__contains__(i):
pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports)
+ traffic_type, ip_type, proto, ports,
+ frags)
if len(pkts) > 0:
i.add_stream(pkts)
@@ -1011,5 +1021,32 @@ class TestACLplugin(VppTestCase):
self.logger.info("ACLP_TEST_FINISH_0020")
+ def test_0021_udp_deny_port_verify_fragment_deny(self):
+ """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked
+ """
+ self.logger.info("ACLP_TEST_START_0021")
+
+ port = random.randint(0, 65535)
+ # Add an ACL
+ rules = []
+ rules.append(self.create_rule(self.IPV4, self.DENY, port,
+ self.proto[self.IP][self.UDP]))
+ rules.append(self.create_rule(self.IPV6, self.DENY, port,
+ self.proto[self.IP][self.UDP]))
+ # deny ip any any in the end
+ rules.append(self.create_rule(self.IPV4, self.PERMIT,
+ self.PORTS_ALL, 0))
+ rules.append(self.create_rule(self.IPV6, self.PERMIT,
+ self.PORTS_ALL, 0))
+
+ # Apply rules
+ self.apply_rules(rules, "deny ip4/ip6 udp "+str(port))
+
+ # Traffic should not pass
+ self.run_verify_negat_test(self.IP, self.IPRANDOM,
+ self.proto[self.IP][self.UDP], port, True)
+
+ self.logger.info("ACLP_TEST_FINISH_0021")
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)