diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_acl_plugin.py | 49 | ||||
-rw-r--r-- | test/test_acl_plugin_l2l3.py | 51 |
2 files changed, 91 insertions, 9 deletions
diff --git a/test/test_acl_plugin.py b/test/test_acl_plugin.py index 2bbebe85b1b..b051d457824 100644 --- a/test/test_acl_plugin.py +++ b/test/test_acl_plugin.py @@ -9,6 +9,7 @@ from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner from util import Host, ppp @@ -229,7 +230,7 @@ class TestACLplugin(VppTestCase): return '' def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, - proto=-1, ports=0): + proto=-1, ports=0, fragments=False): """ Create input packet stream for defined interface using hosts or deleted_hosts list. @@ -263,8 +264,14 @@ class TestACLplugin(VppTestCase): p = Ether(dst=dst_host.mac, src=src_host.mac) if pkt_info.ip: p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) + if fragments: + p /= IPv6ExtHdrFragment(offset=64, m=1) else: - p /= IP(src=src_host.ip4, dst=dst_host.ip4) + if fragments: + p /= IP(src=src_host.ip4, dst=dst_host.ip4, + flags=1, frag=64) + else: + p /= IP(src=src_host.ip4, dst=dst_host.ip4) if traffic_type == self.ICMP: if pkt_info.ip: p /= ICMPv6EchoRequest(type=self.icmp6_type, @@ -381,14 +388,16 @@ class TestACLplugin(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() - def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0): + def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, + frags=False): # Test # Create incoming packet streams for packet-generator interfaces pkts_cnt = 0 for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports) + traffic_type, ip_type, proto, ports, + frags) if len(pkts) > 0: i.add_stream(pkts) pkts_cnt += len(pkts) @@ -408,13 +417,14 @@ class TestACLplugin(VppTestCase): self.verify_capture(dst_if, capture, traffic_type, ip_type) def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, - ports=0): + ports=0, frags=False): # Test self.reset_packet_infos() for i in self.pg_interfaces: if self.flows.__contains__(i): pkts = self.create_stream(i, self.pg_if_packet_sizes, - traffic_type, ip_type, proto, ports) + traffic_type, ip_type, proto, ports, + frags) if len(pkts) > 0: i.add_stream(pkts) @@ -1011,5 +1021,32 @@ class TestACLplugin(VppTestCase): self.logger.info("ACLP_TEST_FINISH_0020") + def test_0021_udp_deny_port_verify_fragment_deny(self): + """ deny single UDPv4/v6, permit ip any, verify non-initial fragment blocked + """ + self.logger.info("ACLP_TEST_START_0021") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, port, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, "deny ip4/ip6 udp "+str(port)) + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP], port, True) + + self.logger.info("ACLP_TEST_FINISH_0021") + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) diff --git a/test/test_acl_plugin_l2l3.py b/test/test_acl_plugin_l2l3.py index 346825fceec..32abf184bb9 100644 --- a/test/test_acl_plugin_l2l3.py +++ b/test/test_acl_plugin_l2l3.py @@ -33,6 +33,7 @@ from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, ICMP, TCP from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting +from scapy.layers.inet6 import IPv6ExtHdrFragment from framework import VppTestCase, VppTestRunner import time @@ -203,7 +204,7 @@ class TestIpIrb(VppTestCase): if add_extension_header: # prepend some extension headers ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() / - IPv6ExtHdrRouting() / ulp_l4) + IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4) # uncomment below to test invalid ones # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4 else: @@ -214,10 +215,12 @@ class TestIpIrb(VppTestCase): Raw(payload)) else: ulp_l4 = UDP(sport=src_l4, dport=dst_l4) - # IPv4 does not allow extension headers + # IPv4 does not allow extension headers, + # but we rather make it a first fragment + flags = 1 if add_extension_header else 0 ulp = ulp_l4 p = (Ether(dst=dst_mac, src=src_mac) / - IP(src=src_ip4, dst=dst_ip4) / + IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) / ulp / Raw(payload)) elif modulo == 1: @@ -670,6 +673,48 @@ class TestIpIrb(VppTestCase): self.run_test_ip46_bridged_to_routed_and_back(False, True, self.WITH_EH) + # IPv4 with "MF" bit set + + def test_1201_ip6_irb_1(self): + """ ACL IPv4+MF routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, False, False, + self.WITH_EH) + + def test_1202_ip6_irb_1(self): + """ ACL IPv4+MF routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, False, False, + self.WITH_EH) + + def test_1205_ip6_irb_1(self): + """ ACL IPv4+MF bridged -> routed, L2 ACL deny """ + self.run_test_ip46_bridged_to_routed(True, False, False, + self.WITH_EH) + + def test_1206_ip6_irb_1(self): + """ ACL IPv4+MF bridged -> routed, L3 ACL deny """ + self.run_test_ip46_bridged_to_routed(False, False, False, + self.WITH_EH) + + def test_1301_ip6_irb_1(self): + """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, False, + self.WITH_EH) + + def test_1302_ip6_irb_1(self): + """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, False, + self.WITH_EH) + + def test_1311_ip6_irb_1(self): + """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, False, + self.WITH_EH) + + def test_1312_ip6_irb_1(self): + """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, False, + self.WITH_EH) + # Old datapath group def test_8900_ip6_irb_1(self): """ ACL plugin set old L2 datapath""" |