From a43c93f8554ad7418e31be3791b3fb71232f60ac Mon Sep 17 00:00:00 2001 From: Dave Wallace Date: Thu, 22 Aug 2019 00:32:29 +0000 Subject: tests: move plugin tests to src/plugins/*/test - Relocate plugin tests for 'make test' into src/plugins/*/test so that plugin test cases are co-located with the plugin source code. Type: refactor Signed-off-by: Dave Wallace Change-Id: I503e6a43528e14981799b735fa65674155713f67 Signed-off-by: Dave Wallace --- src/plugins/acl/test/test_acl_plugin.py | 1519 +++++++++++++++++++++++++ src/plugins/acl/test/test_acl_plugin_conns.py | 411 +++++++ src/plugins/acl/test/test_acl_plugin_l2l3.py | 871 ++++++++++++++ src/plugins/acl/test/test_acl_plugin_macip.py | 1295 +++++++++++++++++++++ src/plugins/acl/test/test_classify_l2_acl.py | 689 +++++++++++ 5 files changed, 4785 insertions(+) create mode 100644 src/plugins/acl/test/test_acl_plugin.py create mode 100644 src/plugins/acl/test/test_acl_plugin_conns.py create mode 100644 src/plugins/acl/test/test_acl_plugin_l2l3.py create mode 100644 src/plugins/acl/test/test_acl_plugin_macip.py create mode 100644 src/plugins/acl/test/test_classify_l2_acl.py (limited to 'src/plugins/acl/test') diff --git a/src/plugins/acl/test/test_acl_plugin.py b/src/plugins/acl/test/test_acl_plugin.py new file mode 100644 index 00000000000..eca02316bf6 --- /dev/null +++ b/src/plugins/acl/test/test_acl_plugin.py @@ -0,0 +1,1519 @@ +#!/usr/bin/env python +"""ACL plugin Test Case HLD: +""" + +import unittest +import random + +from scapy.packet import Raw +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, TCP, UDP, ICMP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6ExtHdrFragment +from framework import VppTestCase, VppTestRunner +from util import Host, ppp + +from vpp_lo_interface import VppLoInterface + + +class TestACLplugin(VppTestCase): + """ ACL plugin Test Case """ + + # traffic types + IP = 0 + ICMP = 1 + + # IP version + IPRANDOM = -1 + IPV4 = 0 + IPV6 = 1 + + # rule types + DENY = 0 + PERMIT = 1 + + # supported protocols + proto = [[6, 17], [1, 58]] + proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'} + ICMPv4 = 0 + ICMPv6 = 1 + TCP = 0 + UDP = 1 + PROTO_ALL = 0 + + # port ranges + PORTS_ALL = -1 + PORTS_RANGE = 0 + PORTS_RANGE_2 = 1 + udp_sport_from = 10 + udp_sport_to = udp_sport_from + 5 + udp_dport_from = 20000 + udp_dport_to = udp_dport_from + 5000 + tcp_sport_from = 30 + tcp_sport_to = tcp_sport_from + 5 + tcp_dport_from = 40000 + tcp_dport_to = tcp_dport_from + 5000 + + udp_sport_from_2 = 90 + udp_sport_to_2 = udp_sport_from_2 + 5 + udp_dport_from_2 = 30000 + udp_dport_to_2 = udp_dport_from_2 + 5000 + tcp_sport_from_2 = 130 + tcp_sport_to_2 = tcp_sport_from_2 + 5 + tcp_dport_from_2 = 20000 + tcp_dport_to_2 = tcp_dport_from_2 + 5000 + + icmp4_type = 8 # echo request + icmp4_code = 3 + icmp6_type = 128 # echo request + icmp6_code = 3 + + icmp4_type_2 = 8 + icmp4_code_from_2 = 5 + icmp4_code_to_2 = 20 + icmp6_type_2 = 128 + icmp6_code_from_2 = 8 + icmp6_code_to_2 = 42 + + # Test variables + bd_id = 1 + + @classmethod + def setUpClass(cls): + """ + Perform standard class setup (defined by class method setUpClass in + class VppTestCase) before running the test case, set test case related + variables and configure VPP. + """ + super(TestACLplugin, cls).setUpClass() + + try: + # Create 2 pg interfaces + cls.create_pg_interfaces(range(2)) + + # Packet flows mapping pg0 -> pg1, pg2 etc. + cls.flows = dict() + cls.flows[cls.pg0] = [cls.pg1] + + # Packet sizes + cls.pg_if_packet_sizes = [64, 512, 1518, 9018] + + # Create BD with MAC learning and unknown unicast flooding disabled + # and put interfaces to this BD + cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, + learn=1) + for pg_if in cls.pg_interfaces: + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id) + + # Set up all interfaces + for i in cls.pg_interfaces: + i.admin_up() + + # Mapping between packet-generator index and lists of test hosts + cls.hosts_by_pg_idx = dict() + for pg_if in cls.pg_interfaces: + cls.hosts_by_pg_idx[pg_if.sw_if_index] = [] + + # Create list of deleted hosts + cls.deleted_hosts_by_pg_idx = dict() + for pg_if in cls.pg_interfaces: + cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = [] + + # warm-up the mac address tables + # self.warmup_test() + count = 16 + start = 0 + n_int = len(cls.pg_interfaces) + macs_per_if = count / n_int + i = -1 + for pg_if in cls.pg_interfaces: + i += 1 + start_nr = macs_per_if * i + start + end_nr = count + start if i == (n_int - 1) \ + else macs_per_if * (i + 1) + start + hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index] + for j in range(start_nr, end_nr): + host = Host( + "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), + "172.17.1%02x.%u" % (pg_if.sw_if_index, j), + "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) + hosts.append(host) + + except Exception: + super(TestACLplugin, cls).tearDownClass() + raise + + @classmethod + def tearDownClass(cls): + super(TestACLplugin, cls).tearDownClass() + + def setUp(self): + super(TestACLplugin, self).setUp() + self.reset_packet_infos() + + def tearDown(self): + """ + Show various debug prints after each test. + """ + super(TestACLplugin, self).tearDown() + + def show_commands_at_teardown(self): + cli = "show vlib graph l2-input-feat-arc" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-input-feat-arc-end" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-output-feat-arc" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show vlib graph l2-output-feat-arc-end" + self.logger.info(self.vapi.ppcli(cli)) + self.logger.info(self.vapi.ppcli("show l2fib verbose")) + self.logger.info(self.vapi.ppcli("show acl-plugin acl")) + self.logger.info(self.vapi.ppcli("show acl-plugin interface")) + self.logger.info(self.vapi.ppcli("show acl-plugin tables")) + self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" + % self.bd_id)) + + def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1, + s_prefix=0, s_ip='\x00\x00\x00\x00', + d_prefix=0, d_ip='\x00\x00\x00\x00'): + if proto == -1: + return + if ports == self.PORTS_ALL: + sport_from = 0 + dport_from = 0 + sport_to = 65535 if proto != 1 and proto != 58 else 255 + dport_to = sport_to + elif ports == self.PORTS_RANGE: + if proto == 1: + sport_from = self.icmp4_type + sport_to = self.icmp4_type + dport_from = self.icmp4_code + dport_to = self.icmp4_code + elif proto == 58: + sport_from = self.icmp6_type + sport_to = self.icmp6_type + dport_from = self.icmp6_code + dport_to = self.icmp6_code + elif proto == self.proto[self.IP][self.TCP]: + sport_from = self.tcp_sport_from + sport_to = self.tcp_sport_to + dport_from = self.tcp_dport_from + dport_to = self.tcp_dport_to + elif proto == self.proto[self.IP][self.UDP]: + sport_from = self.udp_sport_from + sport_to = self.udp_sport_to + dport_from = self.udp_dport_from + dport_to = self.udp_dport_to + elif ports == self.PORTS_RANGE_2: + if proto == 1: + sport_from = self.icmp4_type_2 + sport_to = self.icmp4_type_2 + dport_from = self.icmp4_code_from_2 + dport_to = self.icmp4_code_to_2 + elif proto == 58: + sport_from = self.icmp6_type_2 + sport_to = self.icmp6_type_2 + dport_from = self.icmp6_code_from_2 + dport_to = self.icmp6_code_to_2 + elif proto == self.proto[self.IP][self.TCP]: + sport_from = self.tcp_sport_from_2 + sport_to = self.tcp_sport_to_2 + dport_from = self.tcp_dport_from_2 + dport_to = self.tcp_dport_to_2 + elif proto == self.proto[self.IP][self.UDP]: + sport_from = self.udp_sport_from_2 + sport_to = self.udp_sport_to_2 + dport_from = self.udp_dport_from_2 + dport_to = self.udp_dport_to_2 + else: + sport_from = ports + sport_to = ports + dport_from = ports + dport_to = ports + + rule = ({'is_permit': permit_deny, 'is_ipv6': ip, 'proto': proto, + 'srcport_or_icmptype_first': sport_from, + 'srcport_or_icmptype_last': sport_to, + 'src_ip_prefix_len': s_prefix, + 'src_ip_addr': s_ip, + 'dstport_or_icmpcode_first': dport_from, + 'dstport_or_icmpcode_last': dport_to, + 'dst_ip_prefix_len': d_prefix, + 'dst_ip_addr': d_ip}) + return rule + + def apply_rules(self, rules, tag=b''): + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, + tag=tag) + self.logger.info("Dumped ACL: " + str( + self.vapi.acl_dump(reply.acl_index))) + # Apply a ACL on the interface as inbound + for i in self.pg_interfaces: + self.vapi.acl_interface_set_acl_list(sw_if_index=i.sw_if_index, + n_input=1, + acls=[reply.acl_index]) + return reply.acl_index + + def apply_rules_to(self, rules, tag=b'', sw_if_index=0xFFFFFFFF): + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules, + tag=tag) + self.logger.info("Dumped ACL: " + str( + self.vapi.acl_dump(reply.acl_index))) + # Apply a ACL on the interface as inbound + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=1, + acls=[reply.acl_index]) + return reply.acl_index + + def etype_whitelist(self, whitelist, n_input): + # Apply whitelists on all the interfaces + for i in self.pg_interfaces: + # checkstyle can't read long names. Help them. + fun = self.vapi.acl_interface_set_etype_whitelist + fun(sw_if_index=i.sw_if_index, n_input=n_input, + whitelist=whitelist) + return + + def create_upper_layer(self, packet_index, proto, ports=0): + p = self.proto_map[proto] + if p == 'UDP': + if ports == 0: + return UDP(sport=random.randint(self.udp_sport_from, + self.udp_sport_to), + dport=random.randint(self.udp_dport_from, + self.udp_dport_to)) + else: + return UDP(sport=ports, dport=ports) + elif p == 'TCP': + if ports == 0: + return TCP(sport=random.randint(self.tcp_sport_from, + self.tcp_sport_to), + dport=random.randint(self.tcp_dport_from, + self.tcp_dport_to)) + else: + return TCP(sport=ports, dport=ports) + return '' + + def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, + proto=-1, ports=0, fragments=False, + pkt_raw=True, etype=-1): + """ + Create input packet stream for defined interface using hosts or + deleted_hosts list. + + :param object src_if: Interface to create packet stream for. + :param list packet_sizes: List of required packet sizes. + :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise. + :return: Stream of packets. + """ + pkts = [] + if self.flows.__contains__(src_if): + src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index] + for dst_if in self.flows[src_if]: + dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index] + n_int = len(dst_hosts) * len(src_hosts) + for i in range(0, n_int): + dst_host = dst_hosts[i / len(src_hosts)] + src_host = src_hosts[i % len(src_hosts)] + pkt_info = self.create_packet_info(src_if, dst_if) + if ipv6 == 1: + pkt_info.ip = 1 + elif ipv6 == 0: + pkt_info.ip = 0 + else: + pkt_info.ip = random.choice([0, 1]) + if proto == -1: + pkt_info.proto = random.choice(self.proto[self.IP]) + else: + pkt_info.proto = proto + payload = self.info_to_payload(pkt_info) + p = Ether(dst=dst_host.mac, src=src_host.mac) + if etype > 0: + p = Ether(dst=dst_host.mac, + src=src_host.mac, + type=etype) + if pkt_info.ip: + p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) + if fragments: + p /= IPv6ExtHdrFragment(offset=64, m=1) + else: + if fragments: + p /= IP(src=src_host.ip4, dst=dst_host.ip4, + flags=1, frag=64) + else: + p /= IP(src=src_host.ip4, dst=dst_host.ip4) + if traffic_type == self.ICMP: + if pkt_info.ip: + p /= ICMPv6EchoRequest(type=self.icmp6_type, + code=self.icmp6_code) + else: + p /= ICMP(type=self.icmp4_type, + code=self.icmp4_code) + else: + p /= self.create_upper_layer(i, pkt_info.proto, ports) + if pkt_raw: + p /= Raw(payload) + pkt_info.data = p.copy() + if pkt_raw: + size = random.choice(packet_sizes) + self.extend_packet(p, size) + pkts.append(p) + return pkts + + def verify_capture(self, pg_if, capture, + traffic_type=0, ip_type=0, etype=-1): + """ + Verify captured input packet stream for defined interface. + + :param object pg_if: Interface to verify captured packet stream for. + :param list capture: Captured packet stream. + :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise. + """ + last_info = dict() + for i in self.pg_interfaces: + last_info[i.sw_if_index] = None + dst_sw_if_index = pg_if.sw_if_index + for packet in capture: + if etype > 0: + if packet[Ether].type != etype: + self.logger.error(ppp("Unexpected ethertype in packet:", + packet)) + else: + continue + try: + # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data + if traffic_type == self.ICMP and ip_type == self.IPV6: + payload_info = self.payload_to_info( + packet[ICMPv6EchoRequest], 'data') + payload = packet[ICMPv6EchoRequest] + else: + payload_info = self.payload_to_info(packet[Raw]) + payload = packet[self.proto_map[payload_info.proto]] + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(outside network):", packet)) + raise + + if ip_type != 0: + self.assertEqual(payload_info.ip, ip_type) + if traffic_type == self.ICMP: + try: + if payload_info.ip == 0: + self.assertEqual(payload.type, self.icmp4_type) + self.assertEqual(payload.code, self.icmp4_code) + else: + self.assertEqual(payload.type, self.icmp6_type) + self.assertEqual(payload.code, self.icmp6_code) + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(outside network):", packet)) + raise + else: + try: + ip_version = IPv6 if payload_info.ip == 1 else IP + + ip = packet[ip_version] + packet_index = payload_info.index + + self.assertEqual(payload_info.dst, dst_sw_if_index) + self.logger.debug("Got packet on port %s: src=%u (id=%u)" % + (pg_if.name, payload_info.src, + packet_index)) + next_info = self.get_next_packet_info_for_interface2( + payload_info.src, dst_sw_if_index, + last_info[payload_info.src]) + last_info[payload_info.src] = next_info + self.assertTrue(next_info is not None) + self.assertEqual(packet_index, next_info.index) + saved_packet = next_info.data + # Check standard fields + self.assertEqual(ip.src, saved_packet[ip_version].src) + self.assertEqual(ip.dst, saved_packet[ip_version].dst) + p = self.proto_map[payload_info.proto] + if p == 'TCP': + tcp = packet[TCP] + self.assertEqual(tcp.sport, saved_packet[ + TCP].sport) + self.assertEqual(tcp.dport, saved_packet[ + TCP].dport) + elif p == 'UDP': + udp = packet[UDP] + self.assertEqual(udp.sport, saved_packet[ + UDP].sport) + self.assertEqual(udp.dport, saved_packet[ + UDP].dport) + except: + self.logger.error(ppp("Unexpected or invalid packet:", + packet)) + raise + for i in self.pg_interfaces: + remaining_packet = self.get_next_packet_info_for_interface2( + i, dst_sw_if_index, last_info[i.sw_if_index]) + self.assertTrue( + remaining_packet is None, + "Port %u: Packet expected from source %u didn't arrive" % + (dst_sw_if_index, i.sw_if_index)) + + def run_traffic_no_check(self): + # Test + # Create incoming packet streams for packet-generator interfaces + for i in self.pg_interfaces: + if self.flows.__contains__(i): + pkts = self.create_stream(i, self.pg_if_packet_sizes) + if len(pkts) > 0: + i.add_stream(pkts) + + # Enable packet capture and start packet sending + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, + frags=False, pkt_raw=True, etype=-1): + # Test + # Create incoming packet streams for packet-generator interfaces + pkts_cnt = 0 + for i in self.pg_interfaces: + if self.flows.__contains__(i): + pkts = self.create_stream(i, self.pg_if_packet_sizes, + traffic_type, ip_type, proto, ports, + frags, pkt_raw, etype) + if len(pkts) > 0: + i.add_stream(pkts) + pkts_cnt += len(pkts) + + # Enable packet capture and start packet sendingself.IPV + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.logger.info("sent packets count: %d" % pkts_cnt) + + # Verify + # Verify outgoing packet streams per packet-generator interface + for src_if in self.pg_interfaces: + if self.flows.__contains__(src_if): + for dst_if in self.flows[src_if]: + capture = dst_if.get_capture(pkts_cnt) + self.logger.info("Verifying capture on interface %s" % + dst_if.name) + self.verify_capture(dst_if, capture, + traffic_type, ip_type, etype) + + def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, + ports=0, frags=False, etype=-1): + # Test + pkts_cnt = 0 + self.reset_packet_infos() + for i in self.pg_interfaces: + if self.flows.__contains__(i): + pkts = self.create_stream(i, self.pg_if_packet_sizes, + traffic_type, ip_type, proto, ports, + frags, True, etype) + if len(pkts) > 0: + i.add_stream(pkts) + pkts_cnt += len(pkts) + + # Enable packet capture and start packet sending + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.logger.info("sent packets count: %d" % pkts_cnt) + + # Verify + # Verify outgoing packet streams per packet-generator interface + for src_if in self.pg_interfaces: + if self.flows.__contains__(src_if): + for dst_if in self.flows[src_if]: + self.logger.info("Verifying capture on interface %s" % + dst_if.name) + capture = dst_if.get_capture(0) + self.assertEqual(len(capture), 0) + + def test_0000_warmup_test(self): + """ ACL plugin version check; learn MACs + """ + reply = self.vapi.papi.acl_plugin_get_version() + self.assertEqual(reply.major, 1) + self.logger.info("Working with ACL plugin version: %d.%d" % ( + reply.major, reply.minor)) + # minor version changes are non breaking + # self.assertEqual(reply.minor, 0) + + def test_0001_acl_create(self): + """ ACL create/delete test + """ + + self.logger.info("ACLP_TEST_START_0001") + # Add an ACL + r = [{'is_permit': 1, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 1234, + 'srcport_or_icmptype_last': 1235, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 1234, + 'dstport_or_icmpcode_last': 1234, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}] + # Test 1: add a new ACL + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r, + tag=b"permit 1234") + self.assertEqual(reply.retval, 0) + # The very first ACL gets #0 + self.assertEqual(reply.acl_index, 0) + first_acl = reply.acl_index + rr = self.vapi.acl_dump(reply.acl_index) + self.logger.info("Dumped ACL: " + str(rr)) + self.assertEqual(len(rr), 1) + # We should have the same number of ACL entries as we had asked + self.assertEqual(len(rr[0].r), len(r)) + # The rules should be the same. But because the submitted and returned + # are different types, we need to iterate over rules and keys to get + # to basic values. + for i_rule in range(0, len(r) - 1): + for rule_key in r[i_rule]: + self.assertEqual(rr[0].r[i_rule][rule_key], + r[i_rule][rule_key]) + + # Add a deny-1234 ACL + r_deny = [{'is_permit': 0, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 1234, + 'srcport_or_icmptype_last': 1235, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 1234, + 'dstport_or_icmpcode_last': 1234, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}, + {'is_permit': 1, 'is_ipv6': 0, 'proto': 17, + 'srcport_or_icmptype_first': 0, + 'srcport_or_icmptype_last': 0, + 'src_ip_prefix_len': 0, + 'src_ip_addr': b'\x00\x00\x00\x00', + 'dstport_or_icmpcode_first': 0, + 'dstport_or_icmpcode_last': 0, + 'dst_ip_addr': b'\x00\x00\x00\x00', + 'dst_ip_prefix_len': 0}] + + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_deny, + tag=b"deny 1234;permit all") + self.assertEqual(reply.retval, 0) + # The second ACL gets #1 + self.assertEqual(reply.acl_index, 1) + second_acl = reply.acl_index + + # Test 2: try to modify a nonexistent ACL + reply = self.vapi.acl_add_replace(acl_index=432, r=r, + tag=b"FFFF:FFFF", expected_retval=-6) + self.assertEqual(reply.retval, -6) + # The ACL number should pass through + self.assertEqual(reply.acl_index, 432) + # apply an ACL on an interface inbound, try to delete ACL, must fail + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[first_acl]) + reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=-142) + # Unapply an ACL and then try to delete it - must be ok + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[]) + reply = self.vapi.acl_del(acl_index=first_acl, expected_retval=0) + + # apply an ACL on an interface outbound, try to delete ACL, must fail + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[second_acl]) + reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=-143) + # Unapply the ACL and then try to delete it - must be ok + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=0, + acls=[]) + reply = self.vapi.acl_del(acl_index=second_acl, expected_retval=0) + + # try to apply a nonexistent ACL - must fail + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[first_acl], + expected_retval=-6) + + self.logger.info("ACLP_TEST_FINISH_0001") + + def test_0002_acl_permit_apply(self): + """ permit ACL apply test + """ + self.logger.info("ACLP_TEST_START_0002") + + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, + 0, self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, + 0, self.proto[self.IP][self.TCP])) + + # Apply rules + acl_idx = self.apply_rules(rules, b"permit per-flow") + + # enable counters + reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, -1) + + matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + self.logger.info("stat segment counters: %s" % repr(matches)) + cli = "show acl-plugin acl" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show acl-plugin tables" + self.logger.info(self.vapi.ppcli(cli)) + + total_hits = matches[0][0]['packets'] + matches[0][1]['packets'] + self.assertEqual(total_hits, 64) + + # disable counters + reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0) + + self.logger.info("ACLP_TEST_FINISH_0002") + + def test_0003_acl_deny_apply(self): + """ deny ACL apply test + """ + self.logger.info("ACLP_TEST_START_0003") + # Add a deny-flows ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, + self.PORTS_ALL, self.proto[self.IP][self.UDP])) + # Permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + acl_idx = self.apply_rules(rules, b"deny per-flow;permit all") + + # enable counters + reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1) + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPV4, + self.proto[self.IP][self.UDP]) + + matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + self.logger.info("stat segment counters: %s" % repr(matches)) + cli = "show acl-plugin acl" + self.logger.info(self.vapi.ppcli(cli)) + cli = "show acl-plugin tables" + self.logger.info(self.vapi.ppcli(cli)) + self.assertEqual(matches[0][0]['packets'], 64) + # disable counters + reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0) + self.logger.info("ACLP_TEST_FINISH_0003") + # self.assertEqual(, 0) + + def test_0004_vpp624_permit_icmpv4(self): + """ VPP_624 permit ICMPv4 + """ + self.logger.info("ACLP_TEST_START_0004") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.ICMP][self.ICMPv4])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit icmpv4") + + # Traffic should still pass + self.run_verify_test(self.ICMP, self.IPV4, + self.proto[self.ICMP][self.ICMPv4]) + + self.logger.info("ACLP_TEST_FINISH_0004") + + def test_0005_vpp624_permit_icmpv6(self): + """ VPP_624 permit ICMPv6 + """ + self.logger.info("ACLP_TEST_START_0005") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.ICMP][self.ICMPv6])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit icmpv6") + + # Traffic should still pass + self.run_verify_test(self.ICMP, self.IPV6, + self.proto[self.ICMP][self.ICMPv6]) + + self.logger.info("ACLP_TEST_FINISH_0005") + + def test_0006_vpp624_deny_icmpv4(self): + """ VPP_624 deny ICMPv4 + """ + self.logger.info("ACLP_TEST_START_0006") + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.ICMP][self.ICMPv4])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny icmpv4") + + # Traffic should not pass + self.run_verify_negat_test(self.ICMP, self.IPV4, 0) + + self.logger.info("ACLP_TEST_FINISH_0006") + + def test_0007_vpp624_deny_icmpv6(self): + """ VPP_624 deny ICMPv6 + """ + self.logger.info("ACLP_TEST_START_0007") + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.ICMP][self.ICMPv6])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny icmpv6") + + # Traffic should not pass + self.run_verify_negat_test(self.ICMP, self.IPV6, 0) + + self.logger.info("ACLP_TEST_FINISH_0007") + + def test_0008_tcp_permit_v4(self): + """ permit TCPv4 + """ + self.logger.info("ACLP_TEST_START_0008") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0008") + + def test_0009_tcp_permit_v6(self): + """ permit TCPv6 + """ + self.logger.info("ACLP_TEST_START_0009") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ip6 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0008") + + def test_0010_udp_permit_v4(self): + """ permit UDPv4 + """ + self.logger.info("ACLP_TEST_START_0010") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv udp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0010") + + def test_0011_udp_permit_v6(self): + """ permit UDPv6 + """ + self.logger.info("ACLP_TEST_START_0011") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ip6 udp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0011") + + def test_0012_tcp_deny(self): + """ deny TCPv4/v6 + """ + self.logger.info("ACLP_TEST_START_0012") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny ip4/ip6 tcp") + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0012") + + def test_0013_udp_deny(self): + """ deny UDPv4/v6 + """ + self.logger.info("ACLP_TEST_START_0013") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny ip4/ip6 udp") + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0013") + + def test_0014_acl_dump(self): + """ verify add/dump acls + """ + self.logger.info("ACLP_TEST_START_0014") + + r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]], + [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]], + [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]], + [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]], + [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]], + [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]], + [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]], + [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]], + [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]], + [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]], + [self.IPV4, self.DENY, self.PORTS_ALL, 0], + [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]], + [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]], + [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]], + [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]], + [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]], + [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]], + [self.IPV6, self.DENY, self.PORTS_ALL, 0] + ] + + # Add and verify new ACLs + rules = [] + for i in range(len(r)): + rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3])) + + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=rules) + result = self.vapi.acl_dump(reply.acl_index) + + i = 0 + for drules in result: + for dr in drules.r: + self.assertEqual(dr.is_ipv6, r[i][0]) + self.assertEqual(dr.is_permit, r[i][1]) + self.assertEqual(dr.proto, r[i][3]) + + if r[i][2] > 0: + self.assertEqual(dr.srcport_or_icmptype_first, r[i][2]) + else: + if r[i][2] < 0: + self.assertEqual(dr.srcport_or_icmptype_first, 0) + self.assertEqual(dr.srcport_or_icmptype_last, 65535) + else: + if dr.proto == self.proto[self.IP][self.TCP]: + self.assertGreater(dr.srcport_or_icmptype_first, + self.tcp_sport_from-1) + self.assertLess(dr.srcport_or_icmptype_first, + self.tcp_sport_to+1) + self.assertGreater(dr.dstport_or_icmpcode_last, + self.tcp_dport_from-1) + self.assertLess(dr.dstport_or_icmpcode_last, + self.tcp_dport_to+1) + elif dr.proto == self.proto[self.IP][self.UDP]: + self.assertGreater(dr.srcport_or_icmptype_first, + self.udp_sport_from-1) + self.assertLess(dr.srcport_or_icmptype_first, + self.udp_sport_to+1) + self.assertGreater(dr.dstport_or_icmpcode_last, + self.udp_dport_from-1) + self.assertLess(dr.dstport_or_icmpcode_last, + self.udp_dport_to+1) + i += 1 + + self.logger.info("ACLP_TEST_FINISH_0014") + + def test_0015_tcp_permit_port_v4(self): + """ permit single TCPv4 + """ + self.logger.info("ACLP_TEST_START_0015") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, port, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ip4 tcp %d" % port) + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, + self.proto[self.IP][self.TCP], port) + + self.logger.info("ACLP_TEST_FINISH_0015") + + def test_0016_udp_permit_port_v4(self): + """ permit single UDPv4 + """ + self.logger.info("ACLP_TEST_START_0016") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ip4 tcp %d" % port) + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, + self.proto[self.IP][self.UDP], port) + + self.logger.info("ACLP_TEST_FINISH_0016") + + def test_0017_tcp_permit_port_v6(self): + """ permit single TCPv6 + """ + self.logger.info("ACLP_TEST_START_0017") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.PERMIT, port, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ip4 tcp %d" % port) + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, + self.proto[self.IP][self.TCP], port) + + self.logger.info("ACLP_TEST_FINISH_0017") + + def test_0018_udp_permit_port_v6(self): + """ permit single UPPv6 + """ + self.logger.info("ACLP_TEST_START_0018") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.PERMIT, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ip4 tcp %d" % port) + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, + self.proto[self.IP][self.UDP], port) + + self.logger.info("ACLP_TEST_FINISH_0018") + + def test_0019_udp_deny_port(self): + """ deny single TCPv4/v6 + """ + self.logger.info("ACLP_TEST_START_0019") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, port, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.DENY, port, + self.proto[self.IP][self.TCP])) + # Permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.TCP], port) + + self.logger.info("ACLP_TEST_FINISH_0019") + + def test_0020_udp_deny_port(self): + """ deny single UDPv4/v6 + """ + self.logger.info("ACLP_TEST_START_0020") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, port, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, port, + self.proto[self.IP][self.UDP])) + # Permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP], port) + + self.logger.info("ACLP_TEST_FINISH_0020") + + def test_0021_udp_deny_port_verify_fragment_deny(self): + """ deny single UDPv4/v6, permit ip any, verify non-initial fragment + blocked + """ + self.logger.info("ACLP_TEST_START_0021") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, port, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny ip4/ip6 udp %d" % port) + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP], port, True) + + self.logger.info("ACLP_TEST_FINISH_0021") + + def test_0022_zero_length_udp_ipv4(self): + """ VPP-687 zero length udp ipv4 packet""" + self.logger.info("ACLP_TEST_START_0022") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append( + self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit empty udp ip4 %d" % port) + + # Traffic should still pass + # Create incoming packet streams for packet-generator interfaces + pkts_cnt = 0 + pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, + self.IP, self.IPV4, + self.proto[self.IP][self.UDP], port, + False, False) + if len(pkts) > 0: + self.pg0.add_stream(pkts) + pkts_cnt += len(pkts) + + # Enable packet capture and start packet sendingself.IPV + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + self.pg1.get_capture(pkts_cnt) + + self.logger.info("ACLP_TEST_FINISH_0022") + + def test_0023_zero_length_udp_ipv6(self): + """ VPP-687 zero length udp ipv6 packet""" + self.logger.info("ACLP_TEST_START_0023") + + port = random.randint(0, 65535) + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.PERMIT, port, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit empty udp ip6 %d" % port) + + # Traffic should still pass + # Create incoming packet streams for packet-generator interfaces + pkts_cnt = 0 + pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes, + self.IP, self.IPV6, + self.proto[self.IP][self.UDP], port, + False, False) + if len(pkts) > 0: + self.pg0.add_stream(pkts) + pkts_cnt += len(pkts) + + # Enable packet capture and start packet sendingself.IPV + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # Verify outgoing packet streams per packet-generator interface + self.pg1.get_capture(pkts_cnt) + + self.logger.info("ACLP_TEST_FINISH_0023") + + def test_0108_tcp_permit_v4(self): + """ permit TCPv4 + non-match range + """ + self.logger.info("ACLP_TEST_START_0108") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0108") + + def test_0109_tcp_permit_v6(self): + """ permit TCPv6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0109") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ip6 tcp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0109") + + def test_0110_udp_permit_v4(self): + """ permit UDPv4 + non-match range + """ + self.logger.info("ACLP_TEST_START_0110") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 udp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0110") + + def test_0111_udp_permit_v6(self): + """ permit UDPv6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0111") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ip6 udp") + + # Traffic should still pass + self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0111") + + def test_0112_tcp_deny(self): + """ deny TCPv4/v6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0112") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny ip4/ip6 tcp") + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.TCP]) + + self.logger.info("ACLP_TEST_FINISH_0112") + + def test_0113_udp_deny(self): + """ deny UDPv4/v6 + non-match range + """ + self.logger.info("ACLP_TEST_START_0113") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_RANGE_2, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE, + self.proto[self.IP][self.UDP])) + # permit ip any any in the end + rules.append(self.create_rule(self.IPV4, self.PERMIT, + self.PORTS_ALL, 0)) + rules.append(self.create_rule(self.IPV6, self.PERMIT, + self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"deny ip4/ip6 udp") + + # Traffic should not pass + self.run_verify_negat_test(self.IP, self.IPRANDOM, + self.proto[self.IP][self.UDP]) + + self.logger.info("ACLP_TEST_FINISH_0113") + + def test_0300_tcp_permit_v4_etype_aaaa(self): + """ permit TCPv4, send 0xAAAA etype + """ + self.logger.info("ACLP_TEST_START_0300") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # Traffic should still pass also for an odd ethertype + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + self.logger.info("ACLP_TEST_FINISH_0300") + + def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked + """ + self.logger.info("ACLP_TEST_START_0305") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The oddball ethertype should be blocked + self.run_verify_negat_test(self.IP, self.IPV4, + self.proto[self.IP][self.TCP], + 0, False, 0xaaaa) + + # remove the whitelist + self.etype_whitelist([], 0) + + self.logger.info("ACLP_TEST_FINISH_0305") + + def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass + """ + self.logger.info("ACLP_TEST_START_0306") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + + # The whitelisted traffic, should pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0x0bbb) + + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0) + + self.logger.info("ACLP_TEST_FINISH_0306") + + def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self): + """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass + """ + self.logger.info("ACLP_TEST_START_0307") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # Apply rules + self.apply_rules(rules, b"permit ipv4 tcp") + + # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked + self.etype_whitelist([0xbbb], 1) + # remove the whitelist, the previously blocked 0xAAAA should pass now + self.etype_whitelist([], 0) + + # The whitelisted traffic, should pass + self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP], + 0, False, True, 0xaaaa) + + self.logger.info("ACLP_TEST_FINISH_0306") + + def test_0315_del_intf(self): + """ apply an acl and delete the interface + """ + self.logger.info("ACLP_TEST_START_0315") + + # Add an ACL + rules = [] + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2, + self.proto[self.IP][self.TCP])) + rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE, + self.proto[self.IP][self.TCP])) + # deny ip any any in the end + rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0)) + + # create an interface + intf = [] + intf.append(VppLoInterface(self)) + + # Apply rules + self.apply_rules_to(rules, b"permit ipv4 tcp", intf[0].sw_if_index) + + # Remove the interface + intf[0].remove_vpp_config() + + self.logger.info("ACLP_TEST_FINISH_0315") + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) diff --git a/src/plugins/acl/test/test_acl_plugin_conns.py b/src/plugins/acl/test/test_acl_plugin_conns.py new file mode 100644 index 00000000000..58c44e68262 --- /dev/null +++ b/src/plugins/acl/test/test_acl_plugin_conns.py @@ -0,0 +1,411 @@ +#!/usr/bin/env python +""" ACL plugin extended stateful tests """ + +import unittest +from framework import VppTestCase, VppTestRunner, running_extended_tests +from scapy.layers.l2 import Ether +from scapy.packet import Raw +from scapy.layers.inet import IP, UDP, TCP +from scapy.packet import Packet +from socket import inet_pton, AF_INET, AF_INET6 +from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest +from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting +from scapy.layers.inet6 import IPv6ExtHdrFragment +from pprint import pprint +from random import randint +from util import L4_Conn + + +def to_acl_rule(self, is_permit, wildcard_sport=False): + p = self + rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET + rule_prefix_len = 128 if p.haslayer(IPv6) else 32 + rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP + rule_l4_sport = p.sport + rule_l4_dport = p.dport + if p.haslayer(IPv6): + rule_l4_proto = p[IPv6].nh + else: + rule_l4_proto = p[IP].proto + + if wildcard_sport: + rule_l4_sport_first = 0 + rule_l4_sport_last = 65535 + else: + rule_l4_sport_first = rule_l4_sport + rule_l4_sport_last = rule_l4_sport + + new_rule = { + 'is_permit': is_permit, + 'is_ipv6': p.haslayer(IPv6), + 'src_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].src), + 'src_ip_prefix_len': rule_prefix_len, + 'dst_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].dst), + 'dst_ip_prefix_len': rule_prefix_len, + 'srcport_or_icmptype_first': rule_l4_sport_first, + 'srcport_or_icmptype_last': rule_l4_sport_last, + 'dstport_or_icmpcode_first': rule_l4_dport, + 'dstport_or_icmpcode_last': rule_l4_dport, + 'proto': rule_l4_proto, + } + return new_rule + +Packet.to_acl_rule = to_acl_rule + + +class IterateWithSleep(): + def __init__(self, testcase, n_iters, description, sleep_sec): + self.curr = 0 + self.testcase = testcase + self.n_iters = n_iters + self.sleep_sec = sleep_sec + self.description = description + + def __iter__(self): + for x in range(0, self.n_iters): + yield x + self.testcase.sleep(self.sleep_sec) + + +class Conn(L4_Conn): + def apply_acls(self, reflect_side, acl_side): + pkts = [] + pkts.append(self.pkt(0)) + pkts.append(self.pkt(1)) + pkt = pkts[reflect_side] + + r = [] + r.append(pkt.to_acl_rule(2, wildcard_sport=True)) + r.append(self.wildcard_rule(0)) + res = self.testcase.vapi.acl_add_replace(0xffffffff, r) + self.testcase.assert_equal(res.retval, 0, "error adding ACL") + reflect_acl_index = res.acl_index + + r = [] + r.append(self.wildcard_rule(0)) + res = self.testcase.vapi.acl_add_replace(0xffffffff, r) + self.testcase.assert_equal(res.retval, 0, "error adding deny ACL") + deny_acl_index = res.acl_index + + if reflect_side == acl_side: + self.testcase.vapi.acl_interface_set_acl_list( + self.ifs[acl_side].sw_if_index, 1, + [reflect_acl_index, + deny_acl_index]) + self.testcase.vapi.acl_interface_set_acl_list( + self.ifs[1-acl_side].sw_if_index, 0, []) + else: + self.testcase.vapi.acl_interface_set_acl_list( + self.ifs[acl_side].sw_if_index, 1, + [deny_acl_index, + reflect_acl_index]) + self.testcase.vapi.acl_interface_set_acl_list( + self.ifs[1-acl_side].sw_if_index, 0, []) + + def wildcard_rule(self, is_permit): + any_addr = ["0.0.0.0", "::"] + rule_family = self.address_family + is_ip6 = 1 if rule_family == AF_INET6 else 0 + new_rule = { + 'is_permit': is_permit, + 'is_ipv6': is_ip6, + 'src_ip_addr': inet_pton(rule_family, any_addr[is_ip6]), + 'src_ip_prefix_len': 0, + 'dst_ip_addr': inet_pton(rule_family, any_addr[is_ip6]), + 'dst_ip_prefix_len': 0, + 'srcport_or_icmptype_first': 0, + 'srcport_or_icmptype_last': 65535, + 'dstport_or_icmpcode_first': 0, + 'dstport_or_icmpcode_last': 65535, + 'proto': 0, + } + return new_rule + + +@unittest.skipUnless(running_extended_tests, "part of extended tests") +class ACLPluginConnTestCase(VppTestCase): + """ ACL plugin connection-oriented extended testcases """ + + @classmethod + def setUpClass(cls): + super(ACLPluginConnTestCase, cls).setUpClass() + # create pg0 and pg1 + cls.create_pg_interfaces(range(2)) + cmd = "set acl-plugin session table event-trace 1" + cls.logger.info(cls.vapi.cli(cmd)) + for i in cls.pg_interfaces: + i.admin_up() + i.config_ip4() + i.config_ip6() + i.resolve_arp() + i.resolve_ndp() + + @classmethod + def tearDownClass(cls): + super(ACLPluginConnTestCase, cls).tearDownClass() + + def tearDown(self): + """Run standard test teardown and log various show commands + """ + super(ACLPluginConnTestCase, self).tearDown() + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.cli("show ip arp")) + self.logger.info(self.vapi.cli("show ip6 neighbors")) + self.logger.info(self.vapi.cli("show acl-plugin sessions")) + self.logger.info(self.vapi.cli("show acl-plugin acl")) + self.logger.info(self.vapi.cli("show acl-plugin interface")) + self.logger.info(self.vapi.cli("show acl-plugin tables")) + self.logger.info(self.vapi.cli("show event-logger all")) + + def run_basic_conn_test(self, af, acl_side): + """ Basic conn timeout test """ + conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242) + conn1.apply_acls(0, acl_side) + conn1.send_through(0) + # the return packets should pass + conn1.send_through(1) + # send some packets on conn1, ensure it doesn't go away + for i in IterateWithSleep(self, 20, "Keep conn active", 0.3): + conn1.send_through(1) + # allow the conn to time out + for i in IterateWithSleep(self, 30, "Wait for timeout", 0.1): + pass + # now try to send a packet on the reflected side + try: + p2 = conn1.send_through(1).command() + except: + # If we asserted while waiting, it's good. + # the conn should have timed out. + p2 = None + self.assert_equal(p2, None, "packet on long-idle conn") + + def run_active_conn_test(self, af, acl_side): + """ Idle connection behind active connection test """ + base = 10000 + 1000*acl_side + conn1 = Conn(self, self.pg0, self.pg1, af, UDP, base + 1, 2323) + conn2 = Conn(self, self.pg0, self.pg1, af, UDP, base + 2, 2323) + conn3 = Conn(self, self.pg0, self.pg1, af, UDP, base + 3, 2323) + conn1.apply_acls(0, acl_side) + conn1.send(0) + conn1.recv(1) + # create and check that the conn2/3 work + self.sleep(0.1) + conn2.send_pingpong(0) + self.sleep(0.1) + conn3.send_pingpong(0) + # send some packets on conn1, keep conn2/3 idle + for i in IterateWithSleep(self, 20, "Keep conn active", 0.2): + conn1.send_through(1) + try: + p2 = conn2.send_through(1).command() + except: + # If we asserted while waiting, it's good. + # the conn should have timed out. + p2 = None + # We should have not received the packet on a long-idle + # connection, because it should have timed out + # If it didn't - it is a problem + self.assert_equal(p2, None, "packet on long-idle conn") + + def run_clear_conn_test(self, af, acl_side): + """ Clear the connections via CLI """ + conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242) + conn1.apply_acls(0, acl_side) + conn1.send_through(0) + # the return packets should pass + conn1.send_through(1) + # send some packets on conn1, ensure it doesn't go away + for i in IterateWithSleep(self, 20, "Keep conn active", 0.3): + conn1.send_through(1) + # clear all connections + self.vapi.ppcli("clear acl-plugin sessions") + # now try to send a packet on the reflected side + try: + p2 = conn1.send_through(1).command() + except: + # If we asserted while waiting, it's good. + # the conn should have timed out. + p2 = None + self.assert_equal(p2, None, "packet on supposedly deleted conn") + + def run_tcp_transient_setup_conn_test(self, af, acl_side): + conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53001, 5151) + conn1.apply_acls(0, acl_side) + conn1.send_through(0, 'S') + # the return packets should pass + conn1.send_through(1, 'SA') + # allow the conn to time out + for i in IterateWithSleep(self, 30, "Wait for timeout", 0.1): + pass + # ensure conn times out + try: + p2 = conn1.send_through(1).command() + except: + # If we asserted while waiting, it's good. + # the conn should have timed out. + p2 = None + self.assert_equal(p2, None, "packet on supposedly deleted conn") + + def run_tcp_established_conn_test(self, af, acl_side): + conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53002, 5052) + conn1.apply_acls(0, acl_side) + conn1.send_through(0, 'S') + # the return packets should pass + conn1.send_through(1, 'SA') + # complete the threeway handshake + # (NB: sequence numbers not tracked, so not set!) + conn1.send_through(0, 'A') + # allow the conn to time out if it's in embryonic timer + for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1): + pass + # Try to send the packet from the "forbidden" side - it must pass + conn1.send_through(1, 'A') + # ensure conn times out for real + for i in IterateWithSleep(self, 130, "Wait for timeout", 0.1): + pass + try: + p2 = conn1.send_through(1).command() + except: + # If we asserted while waiting, it's good. + # the conn should have timed out. + p2 = None + self.assert_equal(p2, None, "packet on supposedly deleted conn") + + def run_tcp_transient_teardown_conn_test(self, af, acl_side): + conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53002, 5052) + conn1.apply_acls(0, acl_side) + conn1.send_through(0, 'S') + # the return packets should pass + conn1.send_through(1, 'SA') + # complete the threeway handshake + # (NB: sequence numbers not tracked, so not set!) + conn1.send_through(0, 'A') + # allow the conn to time out if it's in embryonic timer + for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1): + pass + # Try to send the packet from the "forbidden" side - it must pass + conn1.send_through(1, 'A') + # Send the FIN to bounce the session out of established + conn1.send_through(1, 'FA') + # If conn landed on transient timer it will time out here + for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1): + pass + # Now it should have timed out already + try: + p2 = conn1.send_through(1).command() + except: + # If we asserted while waiting, it's good. + # the conn should have timed out. + p2 = None + self.assert_equal(p2, None, "packet on supposedly deleted conn") + + def test_0000_conn_prepare_test(self): + """ Prepare the settings """ + self.vapi.ppcli("set acl-plugin session timeout udp idle 1") + + def test_0001_basic_conn_test(self): + """ IPv4: Basic conn timeout test reflect on ingress """ + self.run_basic_conn_test(AF_INET, 0) + + def test_0002_basic_conn_test(self): + """ IPv4: Basic conn timeout test reflect on egress """ + self.run_basic_conn_test(AF_INET, 1) + + def test_0005_clear_conn_test(self): + """ IPv4: reflect egress, clear conn """ + self.run_clear_conn_test(AF_INET, 1) + + def test_0006_clear_conn_test(self): + """ IPv4: reflect ingress, clear conn """ + self.run_clear_conn_test(AF_INET, 0) + + def test_0011_active_conn_test(self): + """ IPv4: Idle conn behind active conn, reflect on ingress """ + self.run_active_conn_test(AF_INET, 0) + + def test_0012_active_conn_test(self): + """ IPv4: Idle conn behind active conn, reflect on egress """ + self.run_active_conn_test(AF_INET, 1) + + def test_1001_basic_conn_test(self): + """ IPv6: Basic conn timeout test reflect on ingress """ + self.run_basic_conn_test(AF_INET6, 0) + + def test_1002_basic_conn_test(self): + """ IPv6: Basic conn timeout test reflect on egress """ + self.run_basic_conn_test(AF_INET6, 1) + + def test_1005_clear_conn_test(self): + """ IPv6: reflect egress, clear conn """ + self.run_clear_conn_test(AF_INET6, 1) + + def test_1006_clear_conn_test(self): + """ IPv6: reflect ingress, clear conn """ + self.run_clear_conn_test(AF_INET6, 0) + + def test_1011_active_conn_test(self): + """ IPv6: Idle conn behind active conn, reflect on ingress """ + self.run_active_conn_test(AF_INET6, 0) + + def test_1012_active_conn_test(self): + """ IPv6: Idle conn behind active conn, reflect on egress """ + self.run_active_conn_test(AF_INET6, 1) + + def test_2000_prepare_for_tcp_test(self): + """ Prepare for TCP session tests """ + # ensure the session hangs on if it gets treated as UDP + self.vapi.ppcli("set acl-plugin session timeout udp idle 200") + # let the TCP connection time out at 5 seconds + self.vapi.ppcli("set acl-plugin session timeout tcp idle 10") + self.vapi.ppcli("set acl-plugin session timeout tcp transient 1") + + def test_2001_tcp_transient_conn_test(self): + """ IPv4: transient TCP session (incomplete 3WHS), ref. on ingress """ + self.run_tcp_transient_setup_conn_test(AF_INET, 0) + + def test_2002_tcp_transient_conn_test(self): + """ IPv4: transient TCP session (incomplete 3WHS), ref. on egress """ + self.run_tcp_transient_setup_conn_test(AF_INET, 1) + + def test_2003_tcp_transient_conn_test(self): + """ IPv4: established TCP session (complete 3WHS), ref. on ingress """ + self.run_tcp_established_conn_test(AF_INET, 0) + + def test_2004_tcp_transient_conn_test(self): + """ IPv4: established TCP session (complete 3WHS), ref. on egress """ + self.run_tcp_established_conn_test(AF_INET, 1) + + def test_2005_tcp_transient_teardown_conn_test(self): + """ IPv4: transient TCP session (3WHS,ACK,FINACK), ref. on ingress """ + self.run_tcp_transient_teardown_conn_test(AF_INET, 0) + + def test_2006_tcp_transient_teardown_conn_test(self): + """ IPv4: transient TCP session (3WHS,ACK,FINACK), ref. on egress """ + self.run_tcp_transient_teardown_conn_test(AF_INET, 1) + + def test_3001_tcp_transient_conn_test(self): + """ IPv6: transient TCP session (incomplete 3WHS), ref. on ingress """ + self.run_tcp_transient_setup_conn_test(AF_INET6, 0) + + def test_3002_tcp_transient_conn_test(self): + """ IPv6: transient TCP session (incomplete 3WHS), ref. on egress """ + self.run_tcp_transient_setup_conn_test(AF_INET6, 1) + + def test_3003_tcp_transient_conn_test(self): + """ IPv6: established TCP session (complete 3WHS), ref. on ingress """ + self.run_tcp_established_conn_test(AF_INET6, 0) + + def test_3004_tcp_transient_conn_test(self): + """ IPv6: established TCP session (complete 3WHS), ref. on egress """ + self.run_tcp_established_conn_test(AF_INET6, 1) + + def test_3005_tcp_transient_teardown_conn_test(self): + """ IPv6: transient TCP session (3WHS,ACK,FINACK), ref. on ingress """ + self.run_tcp_transient_teardown_conn_test(AF_INET6, 0) + + def test_3006_tcp_transient_teardown_conn_test(self): + """ IPv6: transient TCP session (3WHS,ACK,FINACK), ref. on egress """ + self.run_tcp_transient_teardown_conn_test(AF_INET6, 1) diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py new file mode 100644 index 00000000000..31b4058fc69 --- /dev/null +++ b/src/plugins/acl/test/test_acl_plugin_l2l3.py @@ -0,0 +1,871 @@ +#!/usr/bin/env python +"""ACL IRB Test Case HLD: + +**config** + - L2 MAC learning enabled in l2bd + - 2 routed interfaces untagged, bvi (Bridge Virtual Interface) + - 2 bridged interfaces in l2bd with bvi + +**test** + - sending ip4 eth pkts between routed interfaces + - 2 routed interfaces + - 2 bridged interfaces + + - 64B, 512B, 1518B, 9200B (ether_size) + + - burst of pkts per interface + - 257pkts per burst + - routed pkts hitting different FIB entries + - bridged pkts hitting different MAC entries + +**verify** + - all packets received correctly + +""" + +import unittest +from socket import inet_pton, AF_INET, AF_INET6 +from random import choice, shuffle +from pprint import pprint + +import scapy.compat +from scapy.packet import Raw +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, UDP, ICMP, TCP +from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest +from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting +from scapy.layers.inet6 import IPv6ExtHdrFragment + +from framework import VppTestCase, VppTestRunner +from vpp_l2 import L2_PORT_TYPE +import time + + +class TestACLpluginL2L3(VppTestCase): + """TestACLpluginL2L3 Test Case""" + + @classmethod + def setUpClass(cls): + """ + #. Create BD with MAC learning enabled and put interfaces to this BD. + #. Configure IPv4 addresses on loopback interface and routed interface. + #. Configure MAC address binding to IPv4 neighbors on loop0. + #. Configure MAC address on pg2. + #. Loopback BVI interface has remote hosts, one half of hosts are + behind pg0 second behind pg1. + """ + super(TestACLpluginL2L3, cls).setUpClass() + + cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # packet sizes + cls.bd_id = 10 + cls.remote_hosts_count = 250 + + # create 3 pg interfaces, 1 loopback interface + cls.create_pg_interfaces(range(3)) + cls.create_loopback_interfaces(1) + + cls.interfaces = list(cls.pg_interfaces) + cls.interfaces.extend(cls.lo_interfaces) + + for i in cls.interfaces: + i.admin_up() + + # Create BD with MAC learning enabled and put interfaces to this BD + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id, + port_type=L2_PORT_TYPE.BVI) + cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index, + bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index, + bd_id=cls.bd_id) + + # Configure IPv4 addresses on loopback interface and routed interface + cls.loop0.config_ip4() + cls.loop0.config_ip6() + cls.pg2.config_ip4() + cls.pg2.config_ip6() + + # Configure MAC address binding to IPv4 neighbors on loop0 + cls.loop0.generate_remote_hosts(cls.remote_hosts_count) + cls.loop0.configure_ipv4_neighbors() + cls.loop0.configure_ipv6_neighbors() + # configure MAC address on pg2 + cls.pg2.resolve_arp() + cls.pg2.resolve_ndp() + + cls.WITHOUT_EH = False + cls.WITH_EH = True + cls.STATELESS_ICMP = False + cls.STATEFUL_ICMP = True + + # Loopback BVI interface has remote hosts, one half of hosts are behind + # pg0 second behind pg1 + half = cls.remote_hosts_count // 2 + cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half] + cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:] + reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1) + + @classmethod + def tearDownClass(cls): + reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0) + super(TestACLpluginL2L3, cls).tearDownClass() + + def tearDown(self): + """Run standard test teardown and log ``show l2patch``, + ``show l2fib verbose``,``show bridge-domain detail``, + ``show ip arp``. + """ + super(TestACLpluginL2L3, self).tearDown() + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.cli("show l2patch")) + self.logger.info(self.vapi.cli("show classify tables")) + self.logger.info(self.vapi.cli("show l2fib verbose")) + self.logger.info(self.vapi.cli("show bridge-domain %s detail" % + self.bd_id)) + self.logger.info(self.vapi.cli("show ip arp")) + self.logger.info(self.vapi.cli("show ip6 neighbors")) + cmd = "show acl-plugin sessions verbose 1" + self.logger.info(self.vapi.cli(cmd)) + self.logger.info(self.vapi.cli("show acl-plugin acl")) + self.logger.info(self.vapi.cli("show acl-plugin interface")) + self.logger.info(self.vapi.cli("show acl-plugin tables")) + + def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes, + is_ip6, expect_blocked, expect_established, + add_extension_header, icmp_stateful=False): + pkts = [] + rules = [] + permit_rules = [] + permit_and_reflect_rules = [] + total_packet_count = 8 + for i in range(0, total_packet_count): + modulo = (i//2) % 2 + icmp_type_delta = i % 2 + icmp_code = i + is_udp_packet = (modulo == 0) + if is_udp_packet and icmp_stateful: + continue + is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and + not is_udp_packet) + is_reflected_icmp = is_reflectable_icmp and expect_established + can_reflect_this_packet = is_udp_packet or is_reflectable_icmp + is_permit = i % 2 + remote_dst_index = i % len(dst_ip_if.remote_hosts) + remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index] + if is_permit == 1: + info = self.create_packet_info(src_ip_if, dst_ip_if) + payload = self.info_to_payload(info) + else: + to_be_blocked = False + if (expect_blocked and not expect_established): + to_be_blocked = True + if (not can_reflect_this_packet): + to_be_blocked = True + if to_be_blocked: + payload = "to be blocked" + else: + info = self.create_packet_info(src_ip_if, dst_ip_if) + payload = self.info_to_payload(info) + if reverse: + dst_mac = 'de:ad:00:00:00:00' + src_mac = remote_dst_host._mac + dst_ip6 = src_ip_if.remote_ip6 + src_ip6 = remote_dst_host.ip6 + dst_ip4 = src_ip_if.remote_ip4 + src_ip4 = remote_dst_host.ip4 + dst_l4 = 1234 + i + src_l4 = 4321 + i + else: + dst_mac = src_ip_if.local_mac + src_mac = src_ip_if.remote_mac + src_ip6 = src_ip_if.remote_ip6 + dst_ip6 = remote_dst_host.ip6 + src_ip4 = src_ip_if.remote_ip4 + dst_ip4 = remote_dst_host.ip4 + src_l4 = 1234 + i + dst_l4 = 4321 + i + if is_reflected_icmp: + icmp_type_delta = 1 + + # default ULP should be something we do not use in tests + ulp_l4 = TCP(sport=src_l4, dport=dst_l4) + # potentially a chain of protocols leading to ULP + ulp = ulp_l4 + + if is_udp_packet: + if is_ip6: + ulp_l4 = UDP(sport=src_l4, dport=dst_l4) + if add_extension_header: + # prepend some extension headers + ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() / + IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4) + # uncomment below to test invalid ones + # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4 + else: + ulp = ulp_l4 + p = (Ether(dst=dst_mac, src=src_mac) / + IPv6(src=src_ip6, dst=dst_ip6) / + ulp / + Raw(payload)) + else: + ulp_l4 = UDP(sport=src_l4, dport=dst_l4) + # IPv4 does not allow extension headers, + # but we rather make it a first fragment + flags = 1 if add_extension_header else 0 + ulp = ulp_l4 + p = (Ether(dst=dst_mac, src=src_mac) / + IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) / + ulp / + Raw(payload)) + elif modulo == 1: + if is_ip6: + ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta, + code=icmp_code) + ulp = ulp_l4 + p = (Ether(dst=dst_mac, src=src_mac) / + IPv6(src=src_ip6, dst=dst_ip6) / + ulp / + Raw(payload)) + else: + ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code) + ulp = ulp_l4 + p = (Ether(dst=dst_mac, src=src_mac) / + IP(src=src_ip4, dst=dst_ip4) / + ulp / + Raw(payload)) + + if i % 2 == 1: + info.data = p.copy() + size = packet_sizes[(i // 2) % len(packet_sizes)] + self.extend_packet(p, size) + pkts.append(p) + + rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET + rule_prefix_len = 128 if p.haslayer(IPv6) else 32 + rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP + + if p.haslayer(UDP): + rule_l4_sport = p[UDP].sport + rule_l4_dport = p[UDP].dport + else: + if p.haslayer(ICMP): + rule_l4_sport = p[ICMP].type + rule_l4_dport = p[ICMP].code + else: + rule_l4_sport = p[ICMPv6Unknown].type + rule_l4_dport = p[ICMPv6Unknown].code + if p.haslayer(IPv6): + rule_l4_proto = ulp_l4.overload_fields[IPv6]['nh'] + else: + rule_l4_proto = p[IP].proto + + new_rule = { + 'is_permit': is_permit, + 'is_ipv6': p.haslayer(IPv6), + 'src_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].src), + 'src_ip_prefix_len': rule_prefix_len, + 'dst_ip_addr': inet_pton(rule_family, + p[rule_l3_layer].dst), + 'dst_ip_prefix_len': rule_prefix_len, + 'srcport_or_icmptype_first': rule_l4_sport, + 'srcport_or_icmptype_last': rule_l4_sport, + 'dstport_or_icmpcode_first': rule_l4_dport, + 'dstport_or_icmpcode_last': rule_l4_dport, + 'proto': rule_l4_proto, + } + rules.append(new_rule) + new_rule_permit = new_rule.copy() + new_rule_permit['is_permit'] = 1 + permit_rules.append(new_rule_permit) + + new_rule_permit_and_reflect = new_rule.copy() + if can_reflect_this_packet: + new_rule_permit_and_reflect['is_permit'] = 2 + else: + new_rule_permit_and_reflect['is_permit'] = is_permit + + permit_and_reflect_rules.append(new_rule_permit_and_reflect) + self.logger.info("create_stream pkt#%d: %s" % (i, payload)) + + return {'stream': pkts, + 'rules': rules, + 'permit_rules': permit_rules, + 'permit_and_reflect_rules': permit_and_reflect_rules} + + def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse): + last_info = dict() + for i in self.interfaces: + last_info[i.sw_if_index] = None + + dst_ip_sw_if_index = dst_ip_if.sw_if_index + + for packet in capture: + l3 = IP if packet.haslayer(IP) else IPv6 + ip = packet[l3] + if packet.haslayer(UDP): + l4 = UDP + else: + if packet.haslayer(ICMP): + l4 = ICMP + else: + l4 = ICMPv6Unknown + + # Scapy IPv6 stuff is too smart for its own good. + # So we do this and coerce the ICMP into unknown type + if packet.haslayer(UDP): + data = scapy.compat.raw(packet[UDP][Raw]) + else: + if l3 == IP: + data = scapy.compat.raw(ICMP( + scapy.compat.raw(packet[l3].payload))[Raw]) + else: + data = scapy.compat.raw(ICMPv6Unknown( + scapy.compat.raw(packet[l3].payload)).msgbody) + udp_or_icmp = packet[l3].payload + data_obj = Raw(data) + # FIXME: make framework believe we are on object + payload_info = self.payload_to_info(data_obj) + packet_index = payload_info.index + + self.assertEqual(payload_info.dst, dst_ip_sw_if_index) + + next_info = self.get_next_packet_info_for_interface2( + payload_info.src, dst_ip_sw_if_index, + last_info[payload_info.src]) + last_info[payload_info.src] = next_info + self.assertTrue(next_info is not None) + self.assertEqual(packet_index, next_info.index) + saved_packet = next_info.data + self.assertTrue(next_info is not None) + + # MAC: src, dst + if not reverse: + self.assertEqual(packet.src, dst_ip_if.local_mac) + host = dst_ip_if.host_by_mac(packet.dst) + + # IP: src, dst + # self.assertEqual(ip.src, src_ip_if.remote_ip4) + if saved_packet is not None: + self.assertEqual(ip.src, saved_packet[l3].src) + self.assertEqual(ip.dst, saved_packet[l3].dst) + if l4 == UDP: + self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport) + self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport) + # self.assertEqual(ip.dst, host.ip4) + + # UDP: + + def applied_acl_shuffle(self, sw_if_index): + # first collect what ACLs are applied and what they look like + r = self.vapi.acl_interface_list_dump(sw_if_index=sw_if_index) + orig_applied_acls = r[0] + + # we will collect these just to save and generate additional rulesets + orig_acls = [] + for acl_num in orig_applied_acls.acls: + rr = self.vapi.acl_dump(acl_num) + orig_acls.append(rr[0]) + + # now create a list of all the rules in all ACLs + all_rules = [] + for old_acl in orig_acls: + for rule in old_acl.r: + all_rules.append(dict(rule._asdict())) + + # Add a few ACLs made from shuffled rules + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::2], + tag=b"shuffle 1. acl") + shuffle_acl_1 = reply.acl_index + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::3], + tag=b"shuffle 2. acl") + shuffle_acl_2 = reply.acl_index + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=all_rules[::2], + tag=b"shuffle 3. acl") + shuffle_acl_3 = reply.acl_index + + # apply the shuffle ACLs in front + input_acls = [shuffle_acl_1, shuffle_acl_2] + output_acls = [shuffle_acl_1, shuffle_acl_2] + + # add the currently applied ACLs + n_input = orig_applied_acls.n_input + input_acls.extend(orig_applied_acls.acls[:n_input]) + output_acls.extend(orig_applied_acls.acls[n_input:]) + + # and the trailing shuffle ACL(s) + input_acls.extend([shuffle_acl_3]) + output_acls.extend([shuffle_acl_3]) + + # set the interface ACL list to the result + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=len(input_acls), + acls=input_acls + output_acls) + # change the ACLs a few times + for i in range(1, 10): + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_1, + r=all_rules[::1+(i % 2)], + tag=b"shuffle 1. acl") + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, + r=all_rules[::1+(i % 3)], + tag=b"shuffle 2. acl") + shuffle(all_rules) + reply = self.vapi.acl_add_replace(acl_index=shuffle_acl_2, + r=all_rules[::1+(i % 5)], + tag=b"shuffle 3. acl") + + # restore to how it was before and clean up + self.vapi.acl_interface_set_acl_list(sw_if_index=sw_if_index, + n_input=orig_applied_acls.n_input, + acls=orig_applied_acls.acls) + reply = self.vapi.acl_del(acl_index=shuffle_acl_1) + reply = self.vapi.acl_del(acl_index=shuffle_acl_2) + reply = self.vapi.acl_del(acl_index=shuffle_acl_3) + + def create_acls_for_a_stream(self, stream_dict, + test_l2_action, is_reflect): + r = stream_dict['rules'] + r_permit = stream_dict['permit_rules'] + r_permit_reflect = stream_dict['permit_and_reflect_rules'] + r_action = r_permit_reflect if is_reflect else r + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_action, + tag=b"act. acl") + action_acl_index = reply.acl_index + reply = self.vapi.acl_add_replace(acl_index=4294967295, r=r_permit, + tag=b"perm. acl") + permit_acl_index = reply.acl_index + return {'L2': action_acl_index if test_l2_action else permit_acl_index, + 'L3': permit_acl_index if test_l2_action else action_acl_index, + 'permit': permit_acl_index, 'action': action_acl_index} + + def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny, + is_ip6, is_reflect, add_eh): + """ Apply the ACLs + """ + self.reset_packet_infos() + stream_dict = self.create_stream( + self.pg2, self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, is_ip6, + not is_reflect, False, add_eh) + stream = stream_dict['stream'] + acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny, + is_reflect) + n_input_l3 = 0 if bridged_to_routed else 1 + n_input_l2 = 1 if bridged_to_routed else 0 + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, + n_input=n_input_l3, + acls=[acl_idx['L3']]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx['L2']]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=n_input_l2, + acls=[acl_idx['L2']]) + self.applied_acl_shuffle(self.pg0.sw_if_index) + self.applied_acl_shuffle(self.pg2.sw_if_index) + return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']} + + def apply_acl_ip46_both_directions_reflect(self, + primary_is_bridged_to_routed, + reflect_on_l2, is_ip6, add_eh, + stateful_icmp): + primary_is_routed_to_bridged = not primary_is_bridged_to_routed + self.reset_packet_infos() + stream_dict_fwd = self.create_stream(self.pg2, self.loop0, + primary_is_bridged_to_routed, + self.pg_if_packet_sizes, is_ip6, + False, False, add_eh, + stateful_icmp) + acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd, + reflect_on_l2, True) + + stream_dict_rev = self.create_stream(self.pg2, self.loop0, + not primary_is_bridged_to_routed, + self.pg_if_packet_sizes, is_ip6, + True, True, add_eh, stateful_icmp) + # We want the primary action to be "deny" rather than reflect + acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev, + reflect_on_l2, False) + + if primary_is_bridged_to_routed: + inbound_l2_acl = acl_idx_fwd['L2'] + else: + inbound_l2_acl = acl_idx_rev['L2'] + + if primary_is_routed_to_bridged: + outbound_l2_acl = acl_idx_fwd['L2'] + else: + outbound_l2_acl = acl_idx_rev['L2'] + + if primary_is_routed_to_bridged: + inbound_l3_acl = acl_idx_fwd['L3'] + else: + inbound_l3_acl = acl_idx_rev['L3'] + + if primary_is_bridged_to_routed: + outbound_l3_acl = acl_idx_fwd['L3'] + else: + outbound_l3_acl = acl_idx_rev['L3'] + + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg2.sw_if_index, + n_input=1, + acls=[inbound_l3_acl, + outbound_l3_acl]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg0.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, + outbound_l2_acl]) + self.vapi.acl_interface_set_acl_list(sw_if_index=self.pg1.sw_if_index, + n_input=1, + acls=[inbound_l2_acl, + outbound_l2_acl]) + self.applied_acl_shuffle(self.pg0.sw_if_index) + self.applied_acl_shuffle(self.pg2.sw_if_index) + + def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, + is_reflect, add_eh): + return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6, + is_reflect, add_eh) + + def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, + is_reflect, add_eh): + return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6, + is_reflect, add_eh) + + def verify_acl_packet_count(self, acl_idx, packet_count): + matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx) + self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches))) + total_count = 0 + for p in matches[0]: + total_count = total_count + p['packets'] + self.assertEqual(total_count, packet_count) + + def run_traffic_ip46_x_to_y(self, bridged_to_routed, + test_l2_deny, is_ip6, + is_reflect, is_established, add_eh, + stateful_icmp=False): + self.reset_packet_infos() + stream_dict = self.create_stream(self.pg2, self.loop0, + bridged_to_routed, + self.pg_if_packet_sizes, is_ip6, + not is_reflect, is_established, + add_eh, stateful_icmp) + stream = stream_dict['stream'] + + tx_if = self.pg0 if bridged_to_routed else self.pg2 + rx_if = self.pg2 if bridged_to_routed else self.pg0 + + tx_if.add_stream(stream) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index) + rcvd1 = rx_if.get_capture(packet_count) + self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed) + return len(stream) + + def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6, + is_reflect, is_established, add_eh, + stateful_icmp=False): + return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6, + is_reflect, is_established, add_eh, + stateful_icmp) + + def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6, + is_reflect, is_established, add_eh, + stateful_icmp=False): + return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6, + is_reflect, is_established, add_eh, + stateful_icmp) + + def run_test_ip46_routed_to_bridged(self, test_l2_deny, + is_ip6, is_reflect, add_eh): + acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny, + is_ip6, is_reflect, + add_eh) + pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6, + is_reflect, False, + add_eh) + self.verify_acl_packet_count(acls['L3'], pkts) + + def run_test_ip46_bridged_to_routed(self, test_l2_deny, + is_ip6, is_reflect, add_eh): + acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny, + is_ip6, is_reflect, + add_eh) + pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6, + is_reflect, False, + add_eh) + self.verify_acl_packet_count(acls['L2'], pkts) + + def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action, + is_ip6, add_eh, + stateful_icmp=False): + self.apply_acl_ip46_both_directions_reflect(False, test_l2_action, + is_ip6, add_eh, + stateful_icmp) + self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6, + True, False, add_eh, + stateful_icmp) + self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6, + False, True, add_eh, + stateful_icmp) + + def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action, + is_ip6, add_eh, + stateful_icmp=False): + self.apply_acl_ip46_both_directions_reflect(True, test_l2_action, + is_ip6, add_eh, + stateful_icmp) + self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6, + True, False, add_eh, + stateful_icmp) + self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6, + False, True, add_eh, + stateful_icmp) + + def test_0000_ip6_irb_1(self): + """ ACL plugin prepare""" + if not self.vpp_dead: + cmd = "set acl-plugin session timeout udp idle 2000" + self.logger.info(self.vapi.ppcli(cmd)) + # uncomment to not skip past the routing header + # and watch the EH tests fail + # self.logger.info(self.vapi.ppcli( + # "set acl-plugin skip-ipv6-extension-header 43 0")) + # uncomment to test the session limit (stateful tests will fail) + # self.logger.info(self.vapi.ppcli( + # "set acl-plugin session table max-entries 1")) + # new datapath is the default, but just in case + # self.logger.info(self.vapi.ppcli( + # "set acl-plugin l2-datapath new")) + # If you want to see some tests fail, uncomment the next line + # self.logger.info(self.vapi.ppcli( + # "set acl-plugin l2-datapath old")) + + def test_0001_ip6_irb_1(self): + """ ACL IPv6 routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, True, False, + self.WITHOUT_EH) + + def test_0002_ip6_irb_1(self): + """ ACL IPv6 routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, True, False, + self.WITHOUT_EH) + + def test_0003_ip4_irb_1(self): + """ ACL IPv4 routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, False, False, + self.WITHOUT_EH) + + def test_0004_ip4_irb_1(self): + """ ACL IPv4 routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, False, False, + self.WITHOUT_EH) + + def test_0005_ip6_irb_1(self): + """ ACL IPv6 bridged -> routed, L2 ACL deny """ + self.run_test_ip46_bridged_to_routed(True, True, False, + self.WITHOUT_EH) + + def test_0006_ip6_irb_1(self): + """ ACL IPv6 bridged -> routed, L3 ACL deny """ + self.run_test_ip46_bridged_to_routed(False, True, False, + self.WITHOUT_EH) + + def test_0007_ip6_irb_1(self): + """ ACL IPv4 bridged -> routed, L2 ACL deny """ + self.run_test_ip46_bridged_to_routed(True, False, False, + self.WITHOUT_EH) + + def test_0008_ip6_irb_1(self): + """ ACL IPv4 bridged -> routed, L3 ACL deny """ + self.run_test_ip46_bridged_to_routed(False, False, False, + self.WITHOUT_EH) + + # Stateful ACL tests + def test_0101_ip6_irb_1(self): + """ ACL IPv6 routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, True, + self.WITHOUT_EH) + + def test_0102_ip6_irb_1(self): + """ ACL IPv6 bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, True, + self.WITHOUT_EH) + + def test_0103_ip6_irb_1(self): + """ ACL IPv4 routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, False, + self.WITHOUT_EH) + + def test_0104_ip6_irb_1(self): + """ ACL IPv4 bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, False, + self.WITHOUT_EH) + + def test_0111_ip6_irb_1(self): + """ ACL IPv6 routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, True, + self.WITHOUT_EH) + + def test_0112_ip6_irb_1(self): + """ ACL IPv6 bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, True, + self.WITHOUT_EH) + + def test_0113_ip6_irb_1(self): + """ ACL IPv4 routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, False, + self.WITHOUT_EH) + + def test_0114_ip6_irb_1(self): + """ ACL IPv4 bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, False, + self.WITHOUT_EH) + + # A block of tests with extension headers + + def test_1001_ip6_irb_1(self): + """ ACL IPv6+EH routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, True, False, + self.WITH_EH) + + def test_1002_ip6_irb_1(self): + """ ACL IPv6+EH routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, True, False, + self.WITH_EH) + + def test_1005_ip6_irb_1(self): + """ ACL IPv6+EH bridged -> routed, L2 ACL deny """ + self.run_test_ip46_bridged_to_routed(True, True, False, + self.WITH_EH) + + def test_1006_ip6_irb_1(self): + """ ACL IPv6+EH bridged -> routed, L3 ACL deny """ + self.run_test_ip46_bridged_to_routed(False, True, False, + self.WITH_EH) + + def test_1101_ip6_irb_1(self): + """ ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, True, + self.WITH_EH) + + def test_1102_ip6_irb_1(self): + """ ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, True, + self.WITH_EH) + + def test_1111_ip6_irb_1(self): + """ ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, True, + self.WITH_EH) + + def test_1112_ip6_irb_1(self): + """ ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, True, + self.WITH_EH) + + # IPv4 with "MF" bit set + + def test_1201_ip6_irb_1(self): + """ ACL IPv4+MF routed -> bridged, L2 ACL deny""" + self.run_test_ip46_routed_to_bridged(True, False, False, + self.WITH_EH) + + def test_1202_ip6_irb_1(self): + """ ACL IPv4+MF routed -> bridged, L3 ACL deny""" + self.run_test_ip46_routed_to_bridged(False, False, False, + self.WITH_EH) + + def test_1205_ip6_irb_1(self): + """ ACL IPv4+MF bridged -> routed, L2 ACL deny """ + self.run_test_ip46_bridged_to_routed(True, False, False, + self.WITH_EH) + + def test_1206_ip6_irb_1(self): + """ ACL IPv4+MF bridged -> routed, L3 ACL deny """ + self.run_test_ip46_bridged_to_routed(False, False, False, + self.WITH_EH) + + def test_1301_ip6_irb_1(self): + """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, False, + self.WITH_EH) + + def test_1302_ip6_irb_1(self): + """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, False, + self.WITH_EH) + + def test_1311_ip6_irb_1(self): + """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, False, + self.WITH_EH) + + def test_1312_ip6_irb_1(self): + """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, False, + self.WITH_EH) + # Stateful ACL tests with stateful ICMP + + def test_1401_ip6_irb_1(self): + """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, True, + self.WITHOUT_EH, + self.STATEFUL_ICMP) + + def test_1402_ip6_irb_1(self): + """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, True, + self.WITHOUT_EH, + self.STATEFUL_ICMP) + + def test_1403_ip4_irb_1(self): + """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back(True, False, + self.WITHOUT_EH, + self.STATEFUL_ICMP) + + def test_1404_ip4_irb_1(self): + """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back(True, False, + self.WITHOUT_EH, + self.STATEFUL_ICMP) + + def test_1411_ip6_irb_1(self): + """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, True, + self.WITHOUT_EH, + self.STATEFUL_ICMP) + + def test_1412_ip6_irb_1(self): + """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, True, + self.WITHOUT_EH, + self.STATEFUL_ICMP) + + def test_1413_ip4_irb_1(self): + """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_routed_to_bridged_and_back(False, False, + self.WITHOUT_EH, + self.STATEFUL_ICMP) + + def test_1414_ip4_irb_1(self): + """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect""" + self.run_test_ip46_bridged_to_routed_and_back(False, False, + self.WITHOUT_EH, + self.STATEFUL_ICMP) + + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) diff --git a/src/plugins/acl/test/test_acl_plugin_macip.py b/src/plugins/acl/test/test_acl_plugin_macip.py new file mode 100644 index 00000000000..41735251792 --- /dev/null +++ b/src/plugins/acl/test/test_acl_plugin_macip.py @@ -0,0 +1,1295 @@ +#!/usr/bin/env python +from __future__ import print_function +"""ACL plugin - MACIP tests +""" +import binascii +import ipaddress +import random +from socket import inet_ntop, inet_pton, AF_INET, AF_INET6 +from struct import pack, unpack +import re +import unittest + +import scapy.compat +from scapy.packet import Raw +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, UDP +from scapy.layers.inet6 import IPv6 + +from framework import VppTestCase, VppTestRunner, running_extended_tests +from vpp_lo_interface import VppLoInterface +from vpp_l2 import L2_PORT_TYPE +from vpp_sub_interface import L2_VTR_OP, VppSubInterface, VppDot1QSubint, \ + VppDot1ADSubint + + +class MethodHolder(VppTestCase): + DEBUG = False + + BRIDGED = True + ROUTED = False + + IS_IP4 = False + IS_IP6 = True + + DOT1AD = "dot1ad" + DOT1Q = "dot1q" + PERMIT_TAGS = True + DENY_TAGS = False + + # rule types + DENY = 0 + PERMIT = 1 + + # ACL types + EXACT_IP = 1 + SUBNET_IP = 2 + WILD_IP = 3 + + EXACT_MAC = 1 + WILD_MAC = 2 + OUI_MAC = 3 + + ACLS = [] + + @classmethod + def setUpClass(cls): + """ + Perform standard class setup (defined by class method setUpClass in + class VppTestCase) before running the test case, set test case related + variables and configure VPP. + """ + super(MethodHolder, cls).setUpClass() + + cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # packet sizes + cls.bd_id = 111 + cls.remote_hosts_count = 200 + + try: + # create 4 pg interfaces, 1 loopback interface + cls.create_pg_interfaces(range(4)) + cls.create_loopback_interfaces(1) + + # create 2 subinterfaces + cls.subifs = [ + VppDot1QSubint(cls, cls.pg1, 10), + VppDot1ADSubint(cls, cls.pg2, 20, 300, 400), + VppDot1QSubint(cls, cls.pg3, 30), + VppDot1ADSubint(cls, cls.pg3, 40, 600, 700)] + + cls.subifs[0].set_vtr(L2_VTR_OP.L2_POP_1, + inner=10, push1q=1) + cls.subifs[1].set_vtr(L2_VTR_OP.L2_POP_2, + outer=300, inner=400, push1q=1) + cls.subifs[2].set_vtr(L2_VTR_OP.L2_POP_1, + inner=30, push1q=1) + cls.subifs[3].set_vtr(L2_VTR_OP.L2_POP_2, + outer=600, inner=700, push1q=1) + + cls.interfaces = list(cls.pg_interfaces) + cls.interfaces.extend(cls.lo_interfaces) + cls.interfaces.extend(cls.subifs) + + for i in cls.interfaces: + i.admin_up() + + # Create BD with MAC learning enabled and put interfaces to this BD + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id, + port_type=L2_PORT_TYPE.BVI) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.subifs[0].sw_if_index, bd_id=cls.bd_id) + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=cls.subifs[1].sw_if_index, bd_id=cls.bd_id) + + # Configure IPv4/6 addresses on loop interface and routed interface + cls.loop0.config_ip4() + cls.loop0.config_ip6() + cls.pg2.config_ip4() + cls.pg2.config_ip6() + cls.pg3.config_ip4() + cls.pg3.config_ip6() + + # Configure MAC address binding to IPv4 neighbors on loop0 + cls.loop0.generate_remote_hosts(cls.remote_hosts_count) + # Modify host mac addresses to have different OUI parts + for i in range(2, cls.remote_hosts_count + 2): + mac = cls.loop0.remote_hosts[i-2]._mac.split(':') + mac[2] = format(int(mac[2], 16) + i, "02x") + cls.loop0.remote_hosts[i - 2]._mac = ":".join(mac) + + cls.loop0.configure_ipv4_neighbors() + cls.loop0.configure_ipv6_neighbors() + + # configure MAC address on pg3 + cls.pg3.resolve_arp() + cls.pg3.resolve_ndp() + + # configure MAC address on subifs + for i in cls.subifs: + i.config_ip4() + i.resolve_arp() + i.config_ip6() + + # configure MAC address on pg2 + cls.pg2.resolve_arp() + cls.pg2.resolve_ndp() + + # Loopback BVI interface has remote hosts + # one half of hosts are behind pg0 second behind pg1,pg2,pg3 subifs + cls.pg0.remote_hosts = cls.loop0.remote_hosts[:100] + cls.subifs[0].remote_hosts = cls.loop0.remote_hosts[100:125] + cls.subifs[1].remote_hosts = cls.loop0.remote_hosts[125:150] + cls.subifs[2].remote_hosts = cls.loop0.remote_hosts[150:175] + cls.subifs[3].remote_hosts = cls.loop0.remote_hosts[175:] + + except Exception: + super(MethodHolder, cls).tearDownClass() + raise + + @classmethod + def tearDownClass(cls): + super(MethodHolder, cls).tearDownClass() + + def setUp(self): + super(MethodHolder, self).setUp() + self.reset_packet_infos() + del self.ACLS[:] + + def tearDown(self): + super(MethodHolder, self).tearDown() + self.delete_acls() + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.ppcli("show interface address")) + self.logger.info(self.vapi.ppcli("show hardware")) + self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl")) + self.logger.info(self.vapi.ppcli("sh acl-plugin macip interface")) + self.logger.info(self.vapi.ppcli("sh classify tables verbose")) + self.logger.info(self.vapi.ppcli("sh acl-plugin acl")) + self.logger.info(self.vapi.ppcli("sh acl-plugin interface")) + self.logger.info(self.vapi.ppcli("sh acl-plugin tables")) + # print(self.vapi.ppcli("show interface address")) + # print(self.vapi.ppcli("show hardware")) + # print(self.vapi.ppcli("sh acl-plugin macip interface")) + # print(self.vapi.ppcli("sh acl-plugin macip acl")) + + def macip_acl_dump_debug(self): + acls = self.vapi.macip_acl_dump() + if self.DEBUG: + for acl in acls: + print("ACL #"+str(acl.acl_index)) + for r in acl.r: + rule = "ACTION" + if r.is_permit == 1: + rule = "PERMIT" + elif r.is_permit == 0: + rule = "DENY " + print(" IP6" if r.is_ipv6 else " IP4", + rule, + binascii.hexlify(r.src_mac), + binascii.hexlify(r.src_mac_mask), + unpack('<16B', r.src_ip_addr), + r.src_ip_prefix_len) + return acls + + def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP, + acl_count=1, rules_count=None): + acls = [] + if rules_count is None: + rules_count = [1] + src_mac = int("220000dead00", 16) + for acl in range(2, (acl_count+1) * 2): + rules = [] + host = random.choice(self.loop0.remote_hosts) + is_ip6 = acl % 2 + ip4 = host.ip4.split('.') + ip6 = list(unpack('<16B', inet_pton(AF_INET6, host.ip6))) + + if ip_type == self.EXACT_IP: + prefix_len4 = 32 + prefix_len6 = 128 + elif ip_type == self.WILD_IP: + ip4 = [0, 0, 0, 0] + ip6 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + prefix_len4 = 0 + prefix_len6 = 0 + rules_count[(acl / 2) - 1] = 1 + else: + prefix_len4 = 24 + prefix_len6 = 64 + + if mac_type == self.EXACT_MAC: + mask = "ff:ff:ff:ff:ff:ff" + elif mac_type == self.WILD_MAC: + mask = "00:00:00:00:00:00" + elif mac_type == self.OUI_MAC: + mask = "ff:ff:ff:00:00:00" + else: + mask = "ff:ff:ff:ff:ff:00" + + ip = ip6 if is_ip6 else ip4 + ip_len = prefix_len6 if is_ip6 else prefix_len4 + + for i in range(0, rules_count[(acl / 2) - 1]): + src_mac += 16777217 + if mac_type == self.WILD_MAC: + mac = "00:00:00:00:00:00" + elif mac_type == self.OUI_MAC: + mac = ':'.join(re.findall('..', '{:02x}'.format( + src_mac))[:3])+":00:00:00" + else: + mac = ':'.join(re.findall( + '..', '{:02x}'.format(src_mac))) + + if ip_type == self.EXACT_IP: + ip4[3] = random.randint(100, 200) + ip6[15] = random.randint(100, 200) + elif ip_type == self.SUBNET_IP: + ip4[2] = random.randint(100, 200) + ip4[3] = 0 + ip6[8] = random.randint(100, 200) + ip6[15] = 0 + ip_pack = b'' + for j in range(0, len(ip)): + ip_pack += pack(' 0: + continue + + if is_permit: + macip_rule = ({ + 'is_permit': is_permit, + 'is_ipv6': is_ip6, + 'src_ip_addr': ip_rule, + 'src_ip_prefix_len': prefix_len, + 'src_mac': binascii.unhexlify(mac_rule.replace(':', '')), + 'src_mac_mask': binascii.unhexlify( + mac_mask.replace(':', ''))}) + macip_rules.append(macip_rule) + + # deny all other packets + if not (mac_type == self.WILD_MAC and ip_type == self.WILD_IP): + macip_rule = ({'is_permit': 0, + 'is_ipv6': is_ip6, + 'src_ip_addr': "", + 'src_ip_prefix_len': 0, + 'src_mac': "", + 'src_mac_mask': ""}) + macip_rules.append(macip_rule) + + acl_rule = {'is_permit': 0, + 'is_ipv6': is_ip6} + acl_rules.append(acl_rule) + return {'stream': packets, + 'macip_rules': macip_rules, + 'acl_rules': acl_rules} + + def verify_capture(self, stream, capture, is_ip6): + """ + :param stream: + :param capture: + :param is_ip6: + :return: + """ + # p_l3 = IPv6 if is_ip6 else IP + # if self.DEBUG: + # for p in stream: + # print(p[Ether].src, p[Ether].dst, p[p_l3].src, p[p_l3].dst) + # + # acls = self.macip_acl_dump_debug() + + # TODO : verify + # for acl in acls: + # for r in acl.r: + # print(binascii.hexlify(r.src_mac), \ + # binascii.hexlify(r.src_mac_mask),\ + # unpack('<16B', r.src_ip_addr), \ + # r.src_ip_prefix_len) + # + # for p in capture: + # print(p[Ether].src, p[Ether].dst, p[p_l3].src, p[p_l3].dst + # data = p[Raw].load.split(':',1)[1]) + # print(p[p_l3].src, data) + + def run_traffic(self, mac_type, ip_type, traffic, is_ip6, packets, + do_not_expected_capture=False, tags=None, + apply_rules=True, isMACIP=True, permit_tags=PERMIT_TAGS, + try_replace=False): + self.reset_packet_infos() + + if tags is None: + tx_if = self.pg0 if traffic == self.BRIDGED else self.pg3 + rx_if = self.pg3 if traffic == self.BRIDGED else self.pg0 + src_if = self.pg3 + dst_if = self.loop0 + else: + if tags == self.DOT1Q: + if traffic == self.BRIDGED: + tx_if = self.subifs[0] + rx_if = self.pg0 + src_if = self.subifs[0] + dst_if = self.loop0 + else: + tx_if = self.subifs[2] + rx_if = self.pg0 + src_if = self.subifs[2] + dst_if = self.loop0 + elif tags == self.DOT1AD: + if traffic == self.BRIDGED: + tx_if = self.subifs[1] + rx_if = self.pg0 + src_if = self.subifs[1] + dst_if = self.loop0 + else: + tx_if = self.subifs[3] + rx_if = self.pg0 + src_if = self.subifs[3] + dst_if = self.loop0 + else: + return + + test_dict = self.create_stream(mac_type, ip_type, packets, + src_if, dst_if, + traffic, is_ip6, + tags=permit_tags) + + if apply_rules: + if isMACIP: + reply = self.vapi.macip_acl_add(test_dict['macip_rules']) + else: + reply = self.vapi.acl_add_replace(acl_index=4294967295, + r=test_dict['acl_rules']) + self.assertEqual(reply.retval, 0) + acl_index = reply.acl_index + + if isMACIP: + self.vapi.macip_acl_interface_add_del( + sw_if_index=tx_if.sw_if_index, + acl_index=acl_index) + reply = self.vapi.macip_acl_interface_get() + self.assertEqual(reply.acls[tx_if.sw_if_index], acl_index) + self.ACLS.append(reply.acls[tx_if.sw_if_index]) + else: + self.vapi.acl_interface_add_del( + sw_if_index=tx_if.sw_if_index, acl_index=acl_index) + else: + self.vapi.macip_acl_interface_add_del( + sw_if_index=tx_if.sw_if_index, + acl_index=0) + if try_replace: + if isMACIP: + reply = self.vapi.macip_acl_add_replace( + test_dict['macip_rules'], + acl_index) + else: + reply = self.vapi.acl_add_replace(acl_index=acl_index, + r=test_dict['acl_rules']) + self.assertEqual(reply.retval, 0) + + if not isinstance(src_if, VppSubInterface): + tx_if.add_stream(test_dict['stream']) + else: + tx_if.parent.add_stream(test_dict['stream']) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + if do_not_expected_capture: + rx_if.get_capture(0) + else: + if traffic == self.BRIDGED and mac_type == self.WILD_MAC and \ + ip_type == self.WILD_IP: + capture = rx_if.get_capture(packets) + else: + capture = rx_if.get_capture( + self.get_packet_count_for_if_idx(dst_if.sw_if_index)) + self.verify_capture(test_dict['stream'], capture, is_ip6) + if not isMACIP: + self.vapi.acl_interface_add_del(sw_if_index=tx_if.sw_if_index, + acl_index=acl_index, is_add=0) + self.vapi.acl_del(acl_index) + + def run_test_acls(self, mac_type, ip_type, acl_count, + rules_count, traffic=None, ip=None): + self.apply_macip_rules(self.create_rules(mac_type, ip_type, acl_count, + rules_count)) + self.verify_macip_acls(acl_count, rules_count) + + if traffic is not None: + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, traffic, ip, 9) + + +class TestMACIP_IP4(MethodHolder): + """MACIP with IP4 traffic""" + + @classmethod + def setUpClass(cls): + super(TestMACIP_IP4, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestMACIP_IP4, cls).tearDownClass() + + def test_acl_bridged_ip4_exactMAC_exactIP(self): + """ IP4 MACIP exactMAC|exactIP ACL bridged traffic + """ + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, + self.BRIDGED, self.IS_IP4, 9) + + def test_acl_bridged_ip4_exactMAC_subnetIP(self): + """ IP4 MACIP exactMAC|subnetIP ACL bridged traffic + """ + + self.run_traffic(self.EXACT_MAC, self.SUBNET_IP, + self.BRIDGED, self.IS_IP4, 9) + + def test_acl_bridged_ip4_exactMAC_wildIP(self): + """ IP4 MACIP exactMAC|wildIP ACL bridged traffic + """ + + self.run_traffic(self.EXACT_MAC, self.WILD_IP, + self.BRIDGED, self.IS_IP4, 9) + + def test_acl_bridged_ip4_ouiMAC_exactIP(self): + """ IP4 MACIP ouiMAC|exactIP ACL bridged traffic + """ + + self.run_traffic(self.OUI_MAC, self.EXACT_IP, + self.BRIDGED, self.IS_IP4, 3) + + def test_acl_bridged_ip4_ouiMAC_subnetIP(self): + """ IP4 MACIP ouiMAC|subnetIP ACL bridged traffic + """ + + self.run_traffic(self.OUI_MAC, self.SUBNET_IP, + self.BRIDGED, self.IS_IP4, 9) + + def test_acl_bridged_ip4_ouiMAC_wildIP(self): + """ IP4 MACIP ouiMAC|wildIP ACL bridged traffic + """ + + self.run_traffic(self.OUI_MAC, self.WILD_IP, + self.BRIDGED, self.IS_IP4, 9) + + def test_ac_bridgedl_ip4_wildMAC_exactIP(self): + """ IP4 MACIP wildcardMAC|exactIP ACL bridged traffic + """ + + self.run_traffic(self.WILD_MAC, self.EXACT_IP, + self.BRIDGED, self.IS_IP4, 9) + + def test_acl_bridged_ip4_wildMAC_subnetIP(self): + """ IP4 MACIP wildcardMAC|subnetIP ACL bridged traffic + """ + + self.run_traffic(self.WILD_MAC, self.SUBNET_IP, + self.BRIDGED, self.IS_IP4, 9) + + def test_acl_bridged_ip4_wildMAC_wildIP(self): + """ IP4 MACIP wildcardMAC|wildIP ACL bridged traffic + """ + + self.run_traffic(self.WILD_MAC, self.WILD_IP, + self.BRIDGED, self.IS_IP4, 9) + + def test_acl_routed_ip4_exactMAC_exactIP(self): + """ IP4 MACIP exactMAC|exactIP ACL routed traffic + """ + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_routed_ip4_exactMAC_subnetIP(self): + """ IP4 MACIP exactMAC|subnetIP ACL routed traffic + """ + self.run_traffic(self.EXACT_MAC, self.SUBNET_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_routed_ip4_exactMAC_wildIP(self): + """ IP4 MACIP exactMAC|wildIP ACL routed traffic + """ + self.run_traffic(self.EXACT_MAC, self.WILD_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_routed_ip4_ouiMAC_exactIP(self): + """ IP4 MACIP ouiMAC|exactIP ACL routed traffic + """ + + self.run_traffic(self.OUI_MAC, self.EXACT_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_routed_ip4_ouiMAC_subnetIP(self): + """ IP4 MACIP ouiMAC|subnetIP ACL routed traffic + """ + + self.run_traffic(self.OUI_MAC, self.SUBNET_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_routed_ip4_ouiMAC_wildIP(self): + """ IP4 MACIP ouiMAC|wildIP ACL routed traffic + """ + + self.run_traffic(self.OUI_MAC, self.WILD_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_routed_ip4_wildMAC_exactIP(self): + """ IP4 MACIP wildcardMAC|exactIP ACL routed traffic + """ + + self.run_traffic(self.WILD_MAC, self.EXACT_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_routed_ip4_wildMAC_subnetIP(self): + """ IP4 MACIP wildcardMAC|subnetIP ACL routed traffic + """ + + self.run_traffic(self.WILD_MAC, self.SUBNET_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_routed_ip4_wildMAC_wildIP(self): + """ IP4 MACIP wildcardMAC|wildIP ACL + """ + + self.run_traffic(self.WILD_MAC, self.WILD_IP, + self.ROUTED, self.IS_IP4, 9) + + def test_acl_replace_traffic_ip4(self): + """ MACIP replace ACL with IP4 traffic + """ + self.run_traffic(self.OUI_MAC, self.SUBNET_IP, + self.BRIDGED, self.IS_IP4, 9, try_replace=True) + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, + self.BRIDGED, self.IS_IP4, 9, try_replace=True) + + +class TestMACIP_IP6(MethodHolder): + """MACIP with IP6 traffic""" + + @classmethod + def setUpClass(cls): + super(TestMACIP_IP6, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestMACIP_IP6, cls).tearDownClass() + + def test_acl_bridged_ip6_exactMAC_exactIP(self): + """ IP6 MACIP exactMAC|exactIP ACL bridged traffic + """ + + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_bridged_ip6_exactMAC_subnetIP(self): + """ IP6 MACIP exactMAC|subnetIP ACL bridged traffic + """ + + self.run_traffic(self.EXACT_MAC, self.SUBNET_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_bridged_ip6_exactMAC_wildIP(self): + """ IP6 MACIP exactMAC|wildIP ACL bridged traffic + """ + + self.run_traffic(self.EXACT_MAC, self.WILD_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_bridged_ip6_ouiMAC_exactIP(self): + """ IP6 MACIP oui_MAC|exactIP ACL bridged traffic + """ + + self.run_traffic(self.OUI_MAC, self.EXACT_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_bridged_ip6_ouiMAC_subnetIP(self): + """ IP6 MACIP ouiMAC|subnetIP ACL bridged traffic + """ + + self.run_traffic(self.OUI_MAC, self.SUBNET_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_bridged_ip6_ouiMAC_wildIP(self): + """ IP6 MACIP ouiMAC|wildIP ACL bridged traffic + """ + + self.run_traffic(self.OUI_MAC, self.WILD_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_bridged_ip6_wildMAC_exactIP(self): + """ IP6 MACIP wildcardMAC|exactIP ACL bridged traffic + """ + + self.run_traffic(self.WILD_MAC, self.EXACT_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_bridged_ip6_wildMAC_subnetIP(self): + """ IP6 MACIP wildcardMAC|subnetIP ACL bridged traffic + """ + + self.run_traffic(self.WILD_MAC, self.SUBNET_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_bridged_ip6_wildMAC_wildIP(self): + """ IP6 MACIP wildcardMAC|wildIP ACL bridged traffic + """ + + self.run_traffic(self.WILD_MAC, self.WILD_IP, + self.BRIDGED, self.IS_IP6, 9) + + def test_acl_routed_ip6_exactMAC_exactIP(self): + """ IP6 MACIP exactMAC|exactIP ACL routed traffic + """ + + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_routed_ip6_exactMAC_subnetIP(self): + """ IP6 MACIP exactMAC|subnetIP ACL routed traffic + """ + + self.run_traffic(self.EXACT_MAC, self.SUBNET_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_routed_ip6_exactMAC_wildIP(self): + """ IP6 MACIP exactMAC|wildIP ACL routed traffic + """ + + self.run_traffic(self.EXACT_MAC, self.WILD_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_routed_ip6_ouiMAC_exactIP(self): + """ IP6 MACIP ouiMAC|exactIP ACL routed traffic + """ + + self.run_traffic(self.OUI_MAC, self.EXACT_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_routed_ip6_ouiMAC_subnetIP(self): + """ IP6 MACIP ouiMAC|subnetIP ACL routed traffic + """ + + self.run_traffic(self.OUI_MAC, self.SUBNET_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_routed_ip6_ouiMAC_wildIP(self): + """ IP6 MACIP ouiMAC|wildIP ACL routed traffic + """ + + self.run_traffic(self.OUI_MAC, self.WILD_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_routed_ip6_wildMAC_exactIP(self): + """ IP6 MACIP wildcardMAC|exactIP ACL routed traffic + """ + + self.run_traffic(self.WILD_MAC, self.EXACT_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_routed_ip6_wildMAC_subnetIP(self): + """ IP6 MACIP wildcardMAC|subnetIP ACL routed traffic + """ + + self.run_traffic(self.WILD_MAC, self.SUBNET_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_routed_ip6_wildMAC_wildIP(self): + """ IP6 MACIP wildcardMAC|wildIP ACL + """ + + self.run_traffic(self.WILD_MAC, self.WILD_IP, + self.ROUTED, self.IS_IP6, 9) + + def test_acl_replace_traffic_ip6(self): + """ MACIP replace ACL with IP6 traffic + """ + self.run_traffic(self.OUI_MAC, self.SUBNET_IP, + self.BRIDGED, self.IS_IP6, 9, try_replace=True) + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, + self.BRIDGED, self.IS_IP6, 9, try_replace=True) + + +class TestMACIP(MethodHolder): + """MACIP Tests""" + + @classmethod + def setUpClass(cls): + super(TestMACIP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestMACIP, cls).tearDownClass() + + def test_acl_1_2(self): + """ MACIP ACL with 2 entries + """ + + self.run_test_acls(self.EXACT_MAC, self.WILD_IP, 1, [2]) + + def test_acl_1_5(self): + """ MACIP ACL with 5 entries + """ + + self.run_test_acls(self.EXACT_MAC, self.SUBNET_IP, 1, [5]) + + def test_acl_1_10(self): + """ MACIP ACL with 10 entries + """ + + self.run_test_acls(self.EXACT_MAC, self.EXACT_IP, 1, [10]) + + def test_acl_1_20(self): + """ MACIP ACL with 20 entries + """ + + self.run_test_acls(self.OUI_MAC, self.WILD_IP, 1, [20]) + + def test_acl_1_50(self): + """ MACIP ACL with 50 entries + """ + + self.run_test_acls(self.OUI_MAC, self.SUBNET_IP, 1, [50]) + + def test_acl_1_100(self): + """ MACIP ACL with 100 entries + """ + + self.run_test_acls(self.OUI_MAC, self.EXACT_IP, 1, [100]) + + def test_acl_2_X(self): + """ MACIP 2 ACLs each with 100+ entries + """ + + self.run_test_acls(self.OUI_MAC, self.SUBNET_IP, 2, [100, 200]) + + def test_acl_10_X(self): + """ MACIP 10 ACLs each with 100+ entries + """ + + self.run_test_acls(self.EXACT_MAC, self.EXACT_IP, 10, + [100, 120, 140, 160, 180, 200, 210, 220, 230, 240]) + + def test_acl_10_X_traffic_ip4(self): + """ MACIP 10 ACLs each with 100+ entries with IP4 traffic + """ + + self.run_test_acls(self.EXACT_MAC, self.EXACT_IP, 10, + [100, 120, 140, 160, 180, 200, 210, 220, 230, 240], + self.BRIDGED, self.IS_IP4) + + def test_acl_10_X_traffic_ip6(self): + """ MACIP 10 ACLs each with 100+ entries with IP6 traffic + """ + + self.run_test_acls(self.EXACT_MAC, self.EXACT_IP, 10, + [100, 120, 140, 160, 180, 200, 210, 220, 230, 240], + self.BRIDGED, self.IS_IP6) + + def test_acl_replace(self): + """ MACIP replace ACL + """ + + r1 = self.create_rules(acl_count=3, rules_count=[2, 2, 2]) + r2 = self.create_rules(mac_type=self.OUI_MAC, ip_type=self.SUBNET_IP) + self.apply_macip_rules(r1) + + acls_before = self.macip_acl_dump_debug() + + # replace acls #2, #3 with new + reply = self.vapi.macip_acl_add_replace(r2[0], 2) + self.assertEqual(reply.retval, 0) + self.assertEqual(reply.acl_index, 2) + reply = self.vapi.macip_acl_add_replace(r2[1], 3) + self.assertEqual(reply.retval, 0) + self.assertEqual(reply.acl_index, 3) + + acls_after = self.macip_acl_dump_debug() + + # verify changes + self.assertEqual(len(acls_before), len(acls_after)) + for acl1, acl2 in zip( + acls_before[:2]+acls_before[4:], + acls_after[:2]+acls_after[4:]): + self.assertEqual(len(acl1), len(acl2)) + + self.assertEqual(len(acl1.r), len(acl2.r)) + for r1, r2 in zip(acl1.r, acl2.r): + self.assertEqual(len(acl1.r), len(acl2.r)) + self.assertEqual(acl1.r, acl2.r) + for acl1, acl2 in zip( + acls_before[2:4], + acls_after[2:4]): + self.assertEqual(len(acl1), len(acl2)) + + self.assertNotEqual(len(acl1.r), len(acl2.r)) + for r1, r2 in zip(acl1.r, acl2.r): + self.assertNotEqual(len(acl1.r), len(acl2.r)) + self.assertNotEqual(acl1.r, acl2.r) + + def test_delete_intf(self): + """ MACIP ACL delete intf with acl + """ + + intf_count = len(self.interfaces)+1 + intf = [] + self.apply_macip_rules(self.create_rules(acl_count=3, + rules_count=[3, 5, 4])) + + intf.append(VppLoInterface(self)) + intf.append(VppLoInterface(self)) + + sw_if_index0 = intf[0].sw_if_index + self.vapi.macip_acl_interface_add_del(sw_if_index0, 1) + + reply = self.vapi.macip_acl_interface_get() + self.assertEqual(reply.count, intf_count+1) + self.assertEqual(reply.acls[sw_if_index0], 1) + + sw_if_index1 = intf[1].sw_if_index + self.vapi.macip_acl_interface_add_del(sw_if_index1, 0) + + reply = self.vapi.macip_acl_interface_get() + self.assertEqual(reply.count, intf_count+2) + self.assertEqual(reply.acls[sw_if_index1], 0) + + intf[0].remove_vpp_config() + reply = self.vapi.macip_acl_interface_get() + self.assertEqual(reply.count, intf_count+2) + self.assertEqual(reply.acls[sw_if_index0], 4294967295) + self.assertEqual(reply.acls[sw_if_index1], 0) + + intf.append(VppLoInterface(self)) + intf.append(VppLoInterface(self)) + sw_if_index2 = intf[2].sw_if_index + sw_if_index3 = intf[3].sw_if_index + self.vapi.macip_acl_interface_add_del(sw_if_index2, 1) + self.vapi.macip_acl_interface_add_del(sw_if_index3, 1) + + reply = self.vapi.macip_acl_interface_get() + self.assertEqual(reply.count, intf_count+3) + self.assertEqual(reply.acls[sw_if_index1], 0) + self.assertEqual(reply.acls[sw_if_index2], 1) + self.assertEqual(reply.acls[sw_if_index3], 1) + self.logger.info("MACIP ACL on multiple interfaces:") + self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl")) + self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 1234")) + self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 1")) + self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 0")) + self.logger.info(self.vapi.ppcli("sh acl-plugin macip interface")) + + intf[2].remove_vpp_config() + intf[1].remove_vpp_config() + + reply = self.vapi.macip_acl_interface_get() + self.assertEqual(reply.count, intf_count+3) + self.assertEqual(reply.acls[sw_if_index0], 4294967295) + self.assertEqual(reply.acls[sw_if_index1], 4294967295) + self.assertEqual(reply.acls[sw_if_index2], 4294967295) + self.assertEqual(reply.acls[sw_if_index3], 1) + + intf[3].remove_vpp_config() + reply = self.vapi.macip_acl_interface_get() + + self.assertEqual(len([x for x in reply.acls if x != 4294967295]), 0) + + +class TestACL_dot1q_bridged(MethodHolder): + """ACL on dot1q bridged subinterfaces Tests""" + + @classmethod + def setUpClass(cls): + super(TestACL_dot1q_bridged, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestACL_dot1q_bridged, cls).tearDownClass() + + def test_acl_bridged_ip4_subif_dot1q(self): + """ IP4 ACL SubIf Dot1Q bridged traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED, + self.IS_IP4, 9, tags=self.DOT1Q, isMACIP=False) + + def test_acl_bridged_ip6_subif_dot1q(self): + """ IP6 ACL SubIf Dot1Q bridged traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED, + self.IS_IP6, 9, tags=self.DOT1Q, isMACIP=False) + + +class TestACL_dot1ad_bridged(MethodHolder): + """ACL on dot1ad bridged subinterfaces Tests""" + + @classmethod + def setUpClass(cls): + super(TestACL_dot1ad_bridged, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestACL_dot1ad_bridged, cls).tearDownClass() + + def test_acl_bridged_ip4_subif_dot1ad(self): + """ IP4 ACL SubIf Dot1AD bridged traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED, + self.IS_IP4, 9, tags=self.DOT1AD, isMACIP=False) + + def test_acl_bridged_ip6_subif_dot1ad(self): + """ IP6 ACL SubIf Dot1AD bridged traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED, + self.IS_IP6, 9, tags=self.DOT1AD, isMACIP=False) + + +class TestACL_dot1q_routed(MethodHolder): + """ACL on dot1q routed subinterfaces Tests""" + + @classmethod + def setUpClass(cls): + super(TestACL_dot1q_routed, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestACL_dot1q_routed, cls).tearDownClass() + + def test_acl_routed_ip4_subif_dot1q(self): + """ IP4 ACL SubIf Dot1Q routed traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED, + self.IS_IP4, 9, tags=self.DOT1Q, isMACIP=False) + + def test_acl_routed_ip6_subif_dot1q(self): + """ IP6 ACL SubIf Dot1Q routed traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED, + self.IS_IP6, 9, tags=self.DOT1Q, isMACIP=False) + + def test_acl_routed_ip4_subif_dot1q_deny_by_tags(self): + """ IP4 ACL SubIf wrong tags Dot1Q routed traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED, + self.IS_IP4, 9, True, tags=self.DOT1Q, isMACIP=False, + permit_tags=self.DENY_TAGS) + + def test_acl_routed_ip6_subif_dot1q_deny_by_tags(self): + """ IP6 ACL SubIf wrong tags Dot1Q routed traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED, + self.IS_IP6, 9, True, tags=self.DOT1Q, isMACIP=False, + permit_tags=self.DENY_TAGS) + + +class TestACL_dot1ad_routed(MethodHolder): + """ACL on dot1ad routed subinterfaces Tests""" + + @classmethod + def setUpClass(cls): + super(TestACL_dot1ad_routed, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestACL_dot1ad_routed, cls).tearDownClass() + + def test_acl_routed_ip6_subif_dot1ad(self): + """ IP6 ACL SubIf Dot1AD routed traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED, + self.IS_IP6, 9, tags=self.DOT1AD, isMACIP=False) + + def test_acl_routed_ip4_subif_dot1ad(self): + """ IP4 ACL SubIf Dot1AD routed traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED, + self.IS_IP4, 9, tags=self.DOT1AD, isMACIP=False) + + def test_acl_routed_ip6_subif_dot1ad_deny_by_tags(self): + """ IP6 ACL SubIf wrong tags Dot1AD routed traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED, + self.IS_IP6, 9, True, tags=self.DOT1AD, isMACIP=False, + permit_tags=self.DENY_TAGS) + + def test_acl_routed_ip4_subif_dot1ad_deny_by_tags(self): + """ IP4 ACL SubIf wrong tags Dot1AD routed traffic""" + self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED, + self.IS_IP4, 9, True, tags=self.DOT1AD, isMACIP=False, + permit_tags=self.DENY_TAGS) + + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) diff --git a/src/plugins/acl/test/test_classify_l2_acl.py b/src/plugins/acl/test/test_classify_l2_acl.py new file mode 100644 index 00000000000..8ba7181aef1 --- /dev/null +++ b/src/plugins/acl/test/test_classify_l2_acl.py @@ -0,0 +1,689 @@ +#!/usr/bin/env python +""" Classifier-based L2 ACL Test Case HLD: +""" + +import unittest +import random +import binascii +import socket + + +from scapy.packet import Raw +from scapy.data import ETH_P_IP +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, TCP, UDP, ICMP +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6ExtHdrFragment +from framework import VppTestCase, VppTestRunner +from util import Host, ppp + + +class TestClassifyAcl(VppTestCase): + """ Classifier-based L2 input and output ACL Test Case """ + + # traffic types + IP = 0 + ICMP = 1 + + # IP version + IPRANDOM = -1 + IPV4 = 0 + IPV6 = 1 + + # rule types + DENY = 0 + PERMIT = 1 + + # supported protocols + proto = [[6, 17], [1, 58]] + proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'} + ICMPv4 = 0 + ICMPv6 = 1 + TCP = 0 + UDP = 1 + PROTO_ALL = 0 + + # port ranges + PORTS_ALL = -1 + PORTS_RANGE = 0 + PORTS_RANGE_2 = 1 + udp_sport_from = 10 + udp_sport_to = udp_sport_from + 5 + udp_dport_from = 20000 + udp_dport_to = udp_dport_from + 5000 + tcp_sport_from = 30 + tcp_sport_to = tcp_sport_from + 5 + tcp_dport_from = 40000 + tcp_dport_to = tcp_dport_from + 5000 + + udp_sport_from_2 = 90 + udp_sport_to_2 = udp_sport_from_2 + 5 + udp_dport_from_2 = 30000 + udp_dport_to_2 = udp_dport_from_2 + 5000 + tcp_sport_from_2 = 130 + tcp_sport_to_2 = tcp_sport_from_2 + 5 + tcp_dport_from_2 = 20000 + tcp_dport_to_2 = tcp_dport_from_2 + 5000 + + icmp4_type = 8 # echo request + icmp4_code = 3 + icmp6_type = 128 # echo request + icmp6_code = 3 + + icmp4_type_2 = 8 + icmp4_code_from_2 = 5 + icmp4_code_to_2 = 20 + icmp6_type_2 = 128 + icmp6_code_from_2 = 8 + icmp6_code_to_2 = 42 + + # Test variables + bd_id = 1 + + @classmethod + def setUpClass(cls): + """ + Perform standard class setup (defined by class method setUpClass in + class VppTestCase) before running the test case, set test case related + variables and configure VPP. + """ + super(TestClassifyAcl, cls).setUpClass() + + try: + # Create 2 pg interfaces + cls.create_pg_interfaces(range(2)) + + # Packet flows mapping pg0 -> pg1, pg2 etc. + cls.flows = dict() + cls.flows[cls.pg0] = [cls.pg1] + + # Packet sizes + cls.pg_if_packet_sizes = [64, 512, 1518, 9018] + + # Create BD with MAC learning and unknown unicast flooding disabled + # and put interfaces to this BD + cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1, + learn=1) + for pg_if in cls.pg_interfaces: + cls.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id) + + # Set up all interfaces + for i in cls.pg_interfaces: + i.admin_up() + + # Mapping between packet-generator index and lists of test hosts + cls.hosts_by_pg_idx = dict() + for pg_if in cls.pg_interfaces: + cls.hosts_by_pg_idx[pg_if.sw_if_index] = [] + + # Create list of deleted hosts + cls.deleted_hosts_by_pg_idx = dict() + for pg_if in cls.pg_interfaces: + cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = [] + + # warm-up the mac address tables + # self.warmup_test() + + # Holder of the active classify table key + cls.acl_active_table = '' + + except Exception: + super(TestClassifyAcl, cls).tearDownClass() + raise + + @classmethod + def tearDownClass(cls): + super(TestClassifyAcl, cls).tearDownClass() + + def setUp(self): + super(TestClassifyAcl, self).setUp() + + self.acl_tbl_idx = {} + self.reset_packet_infos() + + def tearDown(self): + """ + Show various debug prints after each test. + """ + if not self.vpp_dead: + if self.acl_active_table == 'mac_inout': + self.output_acl_set_interface( + self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0) + self.input_acl_set_interface( + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) + self.acl_active_table = '' + elif self.acl_active_table == 'mac_out': + self.output_acl_set_interface( + self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0) + self.acl_active_table = '' + elif self.acl_active_table == 'mac_in': + self.input_acl_set_interface( + self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0) + self.acl_active_table = '' + + super(TestClassifyAcl, self).tearDown() + + def show_commands_at_teardown(self): + self.logger.info(self.vapi.ppcli("show inacl type l2")) + self.logger.info(self.vapi.ppcli("show outacl type l2")) + self.logger.info(self.vapi.ppcli("show classify tables verbose")) + self.logger.info(self.vapi.ppcli("show bridge-domain %s detail" + % self.bd_id)) + + @staticmethod + def build_mac_mask(dst_mac='', src_mac='', ether_type=''): + """Build MAC ACL mask data with hexstring format + + :param str dst_mac: source MAC address <0-ffffffffffff> + :param str src_mac: destination MAC address <0-ffffffffffff> + :param str ether_type: ethernet type <0-ffff> + """ + + return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( + dst_mac, src_mac, ether_type)).rstrip('0') + + @staticmethod + def build_mac_match(dst_mac='', src_mac='', ether_type=''): + """Build MAC ACL match data with hexstring format + + :param str dst_mac: source MAC address + :param str src_mac: destination MAC address + :param str ether_type: ethernet type <0-ffff> + """ + if dst_mac: + dst_mac = dst_mac.replace(':', '') + if src_mac: + src_mac = src_mac.replace(':', '') + + return ('{!s:0>12}{!s:0>12}{!s:0>4}'.format( + dst_mac, src_mac, ether_type)).rstrip('0') + + def create_classify_table(self, key, mask, data_offset=0, is_add=1): + """Create Classify Table + + :param str key: key for classify table (ex, ACL name). + :param str mask: mask value for interested traffic. + :param int match_n_vectors: + :param int is_add: option to configure classify table. + - create(1) or delete(0) + """ + r = self.vapi.classify_add_del_table( + is_add, + binascii.unhexlify(mask), + match_n_vectors=(len(mask) - 1) // 32 + 1, + miss_next_index=0, + current_data_flag=1, + current_data_offset=data_offset) + self.assertIsNotNone(r, 'No response msg for add_del_table') + self.acl_tbl_idx[key] = r.new_table_index + + def create_classify_session(self, intf, table_index, match, + hit_next_index=0xffffffff, is_add=1): + """Create Classify Session + + :param VppInterface intf: Interface to apply classify session. + :param int table_index: table index to identify classify table. + :param str match: matched value for interested traffic. + :param int pbr_action: enable/disable PBR feature. + :param int vrfid: VRF id. + :param int is_add: option to configure classify session. + - create(1) or delete(0) + """ + r = self.vapi.classify_add_del_session( + is_add, + table_index, + binascii.unhexlify(match), + hit_next_index=hit_next_index) + self.assertIsNotNone(r, 'No response msg for add_del_session') + + def input_acl_set_interface(self, intf, table_index, is_add=1): + """Configure Input ACL interface + + :param VppInterface intf: Interface to apply Input ACL feature. + :param int table_index: table index to identify classify table. + :param int is_add: option to configure classify session. + - enable(1) or disable(0) + """ + r = self.vapi.input_acl_set_interface( + is_add, + intf.sw_if_index, + l2_table_index=table_index) + self.assertIsNotNone(r, 'No response msg for acl_set_interface') + + def output_acl_set_interface(self, intf, table_index, is_add=1): + """Configure Output ACL interface + + :param VppInterface intf: Interface to apply Output ACL feature. + :param int table_index: table index to identify classify table. + :param int is_add: option to configure classify session. + - enable(1) or disable(0) + """ + r = self.vapi.output_acl_set_interface( + is_add, + intf.sw_if_index, + l2_table_index=table_index) + self.assertIsNotNone(r, 'No response msg for acl_set_interface') + + def create_hosts(self, count, start=0): + """ + Create required number of host MAC addresses and distribute them among + interfaces. Create host IPv4 address for every host MAC address. + + :param int count: Number of hosts to create MAC/IPv4 addresses for. + :param int start: Number to start numbering from. + """ + n_int = len(self.pg_interfaces) + macs_per_if = count / n_int + i = -1 + for pg_if in self.pg_interfaces: + i += 1 + start_nr = macs_per_if * i + start + end_nr = count + start if i == (n_int - 1) \ + else macs_per_if * (i + 1) + start + hosts = self.hosts_by_pg_idx[pg_if.sw_if_index] + for j in range(start_nr, end_nr): + host = Host( + "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j), + "172.17.1%02x.%u" % (pg_if.sw_if_index, j), + "2017:dead:%02x::%u" % (pg_if.sw_if_index, j)) + hosts.append(host) + + def create_upper_layer(self, packet_index, proto, ports=0): + p = self.proto_map[proto] + if p == 'UDP': + if ports == 0: + return UDP(sport=random.randint(self.udp_sport_from, + self.udp_sport_to), + dport=random.randint(self.udp_dport_from, + self.udp_dport_to)) + else: + return UDP(sport=ports, dport=ports) + elif p == 'TCP': + if ports == 0: + return TCP(sport=random.randint(self.tcp_sport_from, + self.tcp_sport_to), + dport=random.randint(self.tcp_dport_from, + self.tcp_dport_to)) + else: + return TCP(sport=ports, dport=ports) + return '' + + def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0, + proto=-1, ports=0, fragments=False, + pkt_raw=True, etype=-1): + """ + Create input packet stream for defined interface using hosts or + deleted_hosts list. + + :param object src_if: Interface to create packet stream for. + :param list packet_sizes: List of required packet sizes. + :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise. + :return: Stream of packets. + """ + pkts = [] + if self.flows.__contains__(src_if): + src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index] + for dst_if in self.flows[src_if]: + dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index] + n_int = len(dst_hosts) * len(src_hosts) + for i in range(0, n_int): + dst_host = dst_hosts[i / len(src_hosts)] + src_host = src_hosts[i % len(src_hosts)] + pkt_info = self.create_packet_info(src_if, dst_if) + if ipv6 == 1: + pkt_info.ip = 1 + elif ipv6 == 0: + pkt_info.ip = 0 + else: + pkt_info.ip = random.choice([0, 1]) + if proto == -1: + pkt_info.proto = random.choice(self.proto[self.IP]) + else: + pkt_info.proto = proto + payload = self.info_to_payload(pkt_info) + p = Ether(dst=dst_host.mac, src=src_host.mac) + if etype > 0: + p = Ether(dst=dst_host.mac, + src=src_host.mac, + type=etype) + if pkt_info.ip: + p /= IPv6(dst=dst_host.ip6, src=src_host.ip6) + if fragments: + p /= IPv6ExtHdrFragment(offset=64, m=1) + else: + if fragments: + p /= IP(src=src_host.ip4, dst=dst_host.ip4, + flags=1, frag=64) + else: + p /= IP(src=src_host.ip4, dst=dst_host.ip4) + if traffic_type == self.ICMP: + if pkt_info.ip: + p /= ICMPv6EchoRequest(type=self.icmp6_type, + code=self.icmp6_code) + else: + p /= ICMP(type=self.icmp4_type, + code=self.icmp4_code) + else: + p /= self.create_upper_layer(i, pkt_info.proto, ports) + if pkt_raw: + p /= Raw(payload) + pkt_info.data = p.copy() + if pkt_raw: + size = random.choice(packet_sizes) + self.extend_packet(p, size) + pkts.append(p) + return pkts + + def verify_capture(self, pg_if, capture, + traffic_type=0, ip_type=0, etype=-1): + """ + Verify captured input packet stream for defined interface. + + :param object pg_if: Interface to verify captured packet stream for. + :param list capture: Captured packet stream. + :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise. + """ + last_info = dict() + for i in self.pg_interfaces: + last_info[i.sw_if_index] = None + dst_sw_if_index = pg_if.sw_if_index + for packet in capture: + if etype > 0: + if packet[Ether].type != etype: + self.logger.error(ppp("Unexpected ethertype in packet:", + packet)) + else: + continue + try: + # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data + if traffic_type == self.ICMP and ip_type == self.IPV6: + payload_info = self.payload_to_info( + packet[ICMPv6EchoRequest].data) + payload = packet[ICMPv6EchoRequest] + else: + payload_info = self.payload_to_info(packet[Raw]) + payload = packet[self.proto_map[payload_info.proto]] + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(outside network):", packet)) + raise + + if ip_type != 0: + self.assertEqual(payload_info.ip, ip_type) + if traffic_type == self.ICMP: + try: + if payload_info.ip == 0: + self.assertEqual(payload.type, self.icmp4_type) + self.assertEqual(payload.code, self.icmp4_code) + else: + self.assertEqual(payload.type, self.icmp6_type) + self.assertEqual(payload.code, self.icmp6_code) + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(outside network):", packet)) + raise + else: + try: + ip_version = IPv6 if payload_info.ip == 1 else IP + + ip = packet[ip_version] + packet_index = payload_info.index + + self.assertEqual(payload_info.dst, dst_sw_if_index) + self.logger.debug("Got packet on port %s: src=%u (id=%u)" % + (pg_if.name, payload_info.src, + packet_index)) + next_info = self.get_next_packet_info_for_interface2( + payload_info.src, dst_sw_if_index, + last_info[payload_info.src]) + last_info[payload_info.src] = next_info + self.assertTrue(next_info is not None) + self.assertEqual(packet_index, next_info.index) + saved_packet = next_info.data + # Check standard fields + self.assertEqual(ip.src, saved_packet[ip_version].src) + self.assertEqual(ip.dst, saved_packet[ip_version].dst) + p = self.proto_map[payload_info.proto] + if p == 'TCP': + tcp = packet[TCP] + self.assertEqual(tcp.sport, saved_packet[ + TCP].sport) + self.assertEqual(tcp.dport, saved_packet[ + TCP].dport) + elif p == 'UDP': + udp = packet[UDP] + self.assertEqual(udp.sport, saved_packet[ + UDP].sport) + self.assertEqual(udp.dport, saved_packet[ + UDP].dport) + except: + self.logger.error(ppp("Unexpected or invalid packet:", + packet)) + raise + for i in self.pg_interfaces: + remaining_packet = self.get_next_packet_info_for_interface2( + i, dst_sw_if_index, last_info[i.sw_if_index]) + self.assertTrue( + remaining_packet is None, + "Port %u: Packet expected from source %u didn't arrive" % + (dst_sw_if_index, i.sw_if_index)) + + def run_traffic_no_check(self): + # Test + # Create incoming packet streams for packet-generator interfaces + for i in self.pg_interfaces: + if self.flows.__contains__(i): + pkts = self.create_stream(i, self.pg_if_packet_sizes) + if len(pkts) > 0: + i.add_stream(pkts) + + # Enable packet capture and start packet sending + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0, + frags=False, pkt_raw=True, etype=-1): + # Test + # Create incoming packet streams for packet-generator interfaces + pkts_cnt = 0 + for i in self.pg_interfaces: + if self.flows.__contains__(i): + pkts = self.create_stream(i, self.pg_if_packet_sizes, + traffic_type, ip_type, proto, ports, + frags, pkt_raw, etype) + if len(pkts) > 0: + i.add_stream(pkts) + pkts_cnt += len(pkts) + + # Enable packet capture and start packet sendingself.IPV + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # Verify + # Verify outgoing packet streams per packet-generator interface + for src_if in self.pg_interfaces: + if self.flows.__contains__(src_if): + for dst_if in self.flows[src_if]: + capture = dst_if.get_capture(pkts_cnt) + self.logger.info("Verifying capture on interface %s" % + dst_if.name) + self.verify_capture(dst_if, capture, + traffic_type, ip_type, etype) + + def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1, + ports=0, frags=False, etype=-1): + # Test + self.reset_packet_infos() + for i in self.pg_interfaces: + if self.flows.__contains__(i): + pkts = self.create_stream(i, self.pg_if_packet_sizes, + traffic_type, ip_type, proto, ports, + frags, True, etype) + if len(pkts) > 0: + i.add_stream(pkts) + + # Enable packet capture and start packet sending + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + + # Verify + # Verify outgoing packet streams per packet-generator interface + for src_if in self.pg_interfaces: + if self.flows.__contains__(src_if): + for dst_if in self.flows[src_if]: + self.logger.info("Verifying capture on interface %s" % + dst_if.name) + capture = dst_if.get_capture(0) + self.assertEqual(len(capture), 0) + + def build_classify_table(self, src_mac='', dst_mac='', ether_type='', + etype='', key='mac', hit_next_index=0xffffffff): + # Basic ACL testing + a_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac, + ether_type=ether_type) + self.create_classify_table(key, a_mask) + for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]: + s_mac = host.mac if src_mac else '' + if dst_mac: + for dst_if in self.flows[self.pg0]: + for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]: + self.create_classify_session( + self.pg0, self.acl_tbl_idx.get(key), + self.build_mac_match(src_mac=s_mac, + dst_mac=dst_host.mac, + ether_type=etype), + hit_next_index=hit_next_index) + else: + self.create_classify_session( + self.pg0, self.acl_tbl_idx.get(key), + self.build_mac_match(src_mac=s_mac, dst_mac='', + ether_type=etype), + hit_next_index=hit_next_index) + + def test_0000_warmup_test(self): + """ Learn the MAC addresses + """ + self.create_hosts(2) + self.run_traffic_no_check() + + def test_0010_inacl_permit_src_mac(self): + """ Input L2 ACL test - permit source MAC + + Test scenario for basic IP ACL with source IP + - Create IPv4 stream for pg0 -> pg1 interface. + - Create ACL with source MAC address. + - Send and verify received packets on pg1 interface. + """ + key = 'mac_in' + self.build_classify_table(src_mac='ffffffffffff', key=key) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + self.run_verify_test(self.IP, self.IPV4, -1) + + def test_0011_inacl_permit_dst_mac(self): + """ Input L2 ACL test - permit destination MAC + + Test scenario for basic IP ACL with source IP + - Create IPv4 stream for pg0 -> pg1 interface. + - Create ACL with destination MAC address. + - Send and verify received packets on pg1 interface. + """ + key = 'mac_in' + self.build_classify_table(dst_mac='ffffffffffff', key=key) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + self.run_verify_test(self.IP, self.IPV4, -1) + + def test_0012_inacl_permit_src_dst_mac(self): + """ Input L2 ACL test - permit source and destination MAC + + Test scenario for basic IP ACL with source IP + - Create IPv4 stream for pg0 -> pg1 interface. + - Create ACL with source and destination MAC addresses. + - Send and verify received packets on pg1 interface. + """ + key = 'mac_in' + self.build_classify_table( + src_mac='ffffffffffff', dst_mac='ffffffffffff', key=key) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + self.run_verify_test(self.IP, self.IPV4, -1) + + def test_0013_inacl_permit_ether_type(self): + """ Input L2 ACL test - permit ether_type + + Test scenario for basic IP ACL with source IP + - Create IPv4 stream for pg0 -> pg1 interface. + - Create ACL with destination MAC address. + - Send and verify received packets on pg1 interface. + """ + key = 'mac_in' + self.build_classify_table( + ether_type='ffff', etype=hex(ETH_P_IP)[2:], key=key) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + self.run_verify_test(self.IP, self.IPV4, -1) + + def test_0015_inacl_deny(self): + """ Input L2 ACL test - deny + + Test scenario for basic IP ACL with source IP + - Create IPv4 stream for pg0 -> pg1 interface. + + - Create ACL with source MAC address. + - Send and verify no received packets on pg1 interface. + """ + key = 'mac_in' + self.build_classify_table( + src_mac='ffffffffffff', hit_next_index=0, key=key) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + self.run_verify_negat_test(self.IP, self.IPV4, -1) + + def test_0020_outacl_permit(self): + """ Output L2 ACL test - permit + + Test scenario for basic IP ACL with source IP + - Create IPv4 stream for pg0 -> pg1 interface. + - Create ACL with source MAC address. + - Send and verify received packets on pg1 interface. + """ + key = 'mac_out' + self.build_classify_table(src_mac='ffffffffffff', key=key) + self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + self.run_verify_test(self.IP, self.IPV4, -1) + + def test_0025_outacl_deny(self): + """ Output L2 ACL test - deny + + Test scenario for basic IP ACL with source IP + - Create IPv4 stream for pg0 -> pg1 interface. + - Create ACL with source MAC address. + - Send and verify no received packets on pg1 interface. + """ + key = 'mac_out' + self.build_classify_table( + src_mac='ffffffffffff', hit_next_index=0, key=key) + self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + self.run_verify_negat_test(self.IP, self.IPV4, -1) + + def test_0030_inoutacl_permit(self): + """ Input+Output L2 ACL test - permit + + Test scenario for basic IP ACL with source IP + - Create IPv4 stream for pg0 -> pg1 interface. + - Create ACLs with source MAC address. + - Send and verify received packets on pg1 interface. + """ + key = 'mac_inout' + self.build_classify_table(src_mac='ffffffffffff', key=key) + self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key)) + self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key)) + self.acl_active_table = key + self.run_verify_test(self.IP, self.IPV4, -1) + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg