aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/acl
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/acl')
-rw-r--r--src/plugins/acl/test/test_acl_plugin.py1438
-rw-r--r--src/plugins/acl/test/test_acl_plugin_conns.py405
-rw-r--r--src/plugins/acl/test/test_acl_plugin_l2l3.py864
-rw-r--r--src/plugins/acl/test/test_acl_plugin_macip.py1278
-rw-r--r--src/plugins/acl/test/test_classify_l2_acl.py608
-rw-r--r--src/plugins/acl/test/vpp_acl.py476
6 files changed, 0 insertions, 5069 deletions
diff --git a/src/plugins/acl/test/test_acl_plugin.py b/src/plugins/acl/test/test_acl_plugin.py
deleted file mode 100644
index 53d96215949..00000000000
--- a/src/plugins/acl/test/test_acl_plugin.py
+++ /dev/null
@@ -1,1438 +0,0 @@
-#!/usr/bin/env python3
-"""ACL plugin Test Case HLD:
-"""
-
-import unittest
-import random
-
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, TCP, UDP, ICMP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6ExtHdrFragment
-from framework import VppTestCase, VppTestRunner
-from framework import tag_fixme_vpp_workers
-from util import Host, ppp
-from ipaddress import IPv4Network, IPv6Network
-
-from vpp_lo_interface import VppLoInterface
-from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist
-from vpp_ip import INVALID_INDEX
-
-
-@tag_fixme_vpp_workers
-class TestACLplugin(VppTestCase):
- """ ACL plugin Test Case """
-
- # traffic types
- IP = 0
- ICMP = 1
-
- # IP version
- IPRANDOM = -1
- IPV4 = 0
- IPV6 = 1
-
- # rule types
- DENY = 0
- PERMIT = 1
-
- # supported protocols
- proto = [[6, 17], [1, 58]]
- proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
- ICMPv4 = 0
- ICMPv6 = 1
- TCP = 0
- UDP = 1
- PROTO_ALL = 0
-
- # port ranges
- PORTS_ALL = -1
- PORTS_RANGE = 0
- PORTS_RANGE_2 = 1
- udp_sport_from = 10
- udp_sport_to = udp_sport_from + 5
- udp_dport_from = 20000
- udp_dport_to = udp_dport_from + 5000
- tcp_sport_from = 30
- tcp_sport_to = tcp_sport_from + 5
- tcp_dport_from = 40000
- tcp_dport_to = tcp_dport_from + 5000
-
- udp_sport_from_2 = 90
- udp_sport_to_2 = udp_sport_from_2 + 5
- udp_dport_from_2 = 30000
- udp_dport_to_2 = udp_dport_from_2 + 5000
- tcp_sport_from_2 = 130
- tcp_sport_to_2 = tcp_sport_from_2 + 5
- tcp_dport_from_2 = 20000
- tcp_dport_to_2 = tcp_dport_from_2 + 5000
-
- icmp4_type = 8 # echo request
- icmp4_code = 3
- icmp6_type = 128 # echo request
- icmp6_code = 3
-
- icmp4_type_2 = 8
- icmp4_code_from_2 = 5
- icmp4_code_to_2 = 20
- icmp6_type_2 = 128
- icmp6_code_from_2 = 8
- icmp6_code_to_2 = 42
-
- # Test variables
- bd_id = 1
-
- @classmethod
- def setUpClass(cls):
- """
- Perform standard class setup (defined by class method setUpClass in
- class VppTestCase) before running the test case, set test case related
- variables and configure VPP.
- """
- super(TestACLplugin, cls).setUpClass()
-
- try:
- # Create 2 pg interfaces
- cls.create_pg_interfaces(range(2))
-
- # Packet flows mapping pg0 -> pg1, pg2 etc.
- cls.flows = dict()
- cls.flows[cls.pg0] = [cls.pg1]
-
- # Packet sizes
- cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
-
- # Create BD with MAC learning and unknown unicast flooding disabled
- # and put interfaces to this BD
- cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
- learn=1)
- for pg_if in cls.pg_interfaces:
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
-
- # Set up all interfaces
- for i in cls.pg_interfaces:
- i.admin_up()
-
- # Mapping between packet-generator index and lists of test hosts
- cls.hosts_by_pg_idx = dict()
- for pg_if in cls.pg_interfaces:
- cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
-
- # Create list of deleted hosts
- cls.deleted_hosts_by_pg_idx = dict()
- for pg_if in cls.pg_interfaces:
- cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
-
- # warm-up the mac address tables
- # self.warmup_test()
- count = 16
- start = 0
- n_int = len(cls.pg_interfaces)
- macs_per_if = count // n_int
- i = -1
- for pg_if in cls.pg_interfaces:
- i += 1
- start_nr = macs_per_if * i + start
- end_nr = count + start if i == (n_int - 1) \
- else macs_per_if * (i + 1) + start
- hosts = cls.hosts_by_pg_idx[pg_if.sw_if_index]
- for j in range(int(start_nr), int(end_nr)):
- host = Host(
- "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
- "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
- "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
- hosts.append(host)
-
- except Exception:
- super(TestACLplugin, cls).tearDownClass()
- raise
-
- @classmethod
- def tearDownClass(cls):
- super(TestACLplugin, cls).tearDownClass()
-
- def setUp(self):
- super(TestACLplugin, self).setUp()
- self.reset_packet_infos()
-
- def tearDown(self):
- """
- Show various debug prints after each test.
- """
- super(TestACLplugin, self).tearDown()
-
- def show_commands_at_teardown(self):
- cli = "show vlib graph l2-input-feat-arc"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-input-feat-arc-end"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-output-feat-arc"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show vlib graph l2-output-feat-arc-end"
- self.logger.info(self.vapi.ppcli(cli))
- self.logger.info(self.vapi.ppcli("show l2fib verbose"))
- self.logger.info(self.vapi.ppcli("show acl-plugin acl"))
- self.logger.info(self.vapi.ppcli("show acl-plugin interface"))
- self.logger.info(self.vapi.ppcli("show acl-plugin tables"))
- self.logger.info(self.vapi.ppcli("show bridge-domain %s detail"
- % self.bd_id))
-
- def create_rule(self, ip=0, permit_deny=0, ports=PORTS_ALL, proto=-1,
- s_prefix=0, s_ip=0,
- d_prefix=0, d_ip=0):
- if ip:
- src_prefix = IPv6Network((s_ip, s_prefix))
- dst_prefix = IPv6Network((d_ip, d_prefix))
- else:
- src_prefix = IPv4Network((s_ip, s_prefix))
- dst_prefix = IPv4Network((d_ip, d_prefix))
- return AclRule(is_permit=permit_deny, ports=ports, proto=proto,
- src_prefix=src_prefix, dst_prefix=dst_prefix)
-
- def apply_rules(self, rules, tag=None):
- acl = VppAcl(self, rules, tag=tag)
- acl.add_vpp_config()
- self.logger.info("Dumped ACL: " + str(acl.dump()))
- # Apply a ACL on the interface as inbound
- for i in self.pg_interfaces:
- acl_if = VppAclInterface(
- self, sw_if_index=i.sw_if_index, n_input=1, acls=[acl])
- acl_if.add_vpp_config()
- return acl.acl_index
-
- def apply_rules_to(self, rules, tag=None, sw_if_index=INVALID_INDEX):
- acl = VppAcl(self, rules, tag=tag)
- acl.add_vpp_config()
- self.logger.info("Dumped ACL: " + str(acl.dump()))
- # Apply a ACL on the interface as inbound
- acl_if = VppAclInterface(self, sw_if_index=sw_if_index, n_input=1,
- acls=[acl])
- return acl.acl_index
-
- def etype_whitelist(self, whitelist, n_input, add=True):
- # Apply whitelists on all the interfaces
- if add:
- self._wl = []
- for i in self.pg_interfaces:
- self._wl.append(VppEtypeWhitelist(
- self, sw_if_index=i.sw_if_index, whitelist=whitelist,
- n_input=n_input).add_vpp_config())
- else:
- if hasattr(self, "_wl"):
- for wl in self._wl:
- wl.remove_vpp_config()
-
- def create_upper_layer(self, packet_index, proto, ports=0):
- p = self.proto_map[proto]
- if p == 'UDP':
- if ports == 0:
- return UDP(sport=random.randint(self.udp_sport_from,
- self.udp_sport_to),
- dport=random.randint(self.udp_dport_from,
- self.udp_dport_to))
- else:
- return UDP(sport=ports, dport=ports)
- elif p == 'TCP':
- if ports == 0:
- return TCP(sport=random.randint(self.tcp_sport_from,
- self.tcp_sport_to),
- dport=random.randint(self.tcp_dport_from,
- self.tcp_dport_to))
- else:
- return TCP(sport=ports, dport=ports)
- return ''
-
- def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
- proto=-1, ports=0, fragments=False,
- pkt_raw=True, etype=-1):
- """
- Create input packet stream for defined interface using hosts or
- deleted_hosts list.
-
- :param object src_if: Interface to create packet stream for.
- :param list packet_sizes: List of required packet sizes.
- :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
- :return: Stream of packets.
- """
- pkts = []
- if self.flows.__contains__(src_if):
- src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
- for dst_if in self.flows[src_if]:
- dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
- n_int = len(dst_hosts) * len(src_hosts)
- for i in range(0, n_int):
- dst_host = dst_hosts[int(i / len(src_hosts))]
- src_host = src_hosts[i % len(src_hosts)]
- pkt_info = self.create_packet_info(src_if, dst_if)
- if ipv6 == 1:
- pkt_info.ip = 1
- elif ipv6 == 0:
- pkt_info.ip = 0
- else:
- pkt_info.ip = random.choice([0, 1])
- if proto == -1:
- pkt_info.proto = random.choice(self.proto[self.IP])
- else:
- pkt_info.proto = proto
- payload = self.info_to_payload(pkt_info)
- p = Ether(dst=dst_host.mac, src=src_host.mac)
- if etype > 0:
- p = Ether(dst=dst_host.mac,
- src=src_host.mac,
- type=etype)
- if pkt_info.ip:
- p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
- if fragments:
- p /= IPv6ExtHdrFragment(offset=64, m=1)
- else:
- if fragments:
- p /= IP(src=src_host.ip4, dst=dst_host.ip4,
- flags=1, frag=64)
- else:
- p /= IP(src=src_host.ip4, dst=dst_host.ip4)
- if traffic_type == self.ICMP:
- if pkt_info.ip:
- p /= ICMPv6EchoRequest(type=self.icmp6_type,
- code=self.icmp6_code)
- else:
- p /= ICMP(type=self.icmp4_type,
- code=self.icmp4_code)
- else:
- p /= self.create_upper_layer(i, pkt_info.proto, ports)
- if pkt_raw:
- p /= Raw(payload)
- pkt_info.data = p.copy()
- if pkt_raw:
- size = random.choice(packet_sizes)
- self.extend_packet(p, size)
- pkts.append(p)
- return pkts
-
- def verify_capture(self, pg_if, capture,
- traffic_type=0, ip_type=0, etype=-1):
- """
- Verify captured input packet stream for defined interface.
-
- :param object pg_if: Interface to verify captured packet stream for.
- :param list capture: Captured packet stream.
- :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
- """
- last_info = dict()
- for i in self.pg_interfaces:
- last_info[i.sw_if_index] = None
- dst_sw_if_index = pg_if.sw_if_index
- for packet in capture:
- if etype > 0:
- if packet[Ether].type != etype:
- self.logger.error(ppp("Unexpected ethertype in packet:",
- packet))
- else:
- continue
- try:
- # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
- if traffic_type == self.ICMP and ip_type == self.IPV6:
- payload_info = self.payload_to_info(
- packet[ICMPv6EchoRequest], 'data')
- payload = packet[ICMPv6EchoRequest]
- else:
- payload_info = self.payload_to_info(packet[Raw])
- payload = packet[self.proto_map[payload_info.proto]]
- except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
- raise
-
- if ip_type != 0:
- self.assertEqual(payload_info.ip, ip_type)
- if traffic_type == self.ICMP:
- try:
- if payload_info.ip == 0:
- self.assertEqual(payload.type, self.icmp4_type)
- self.assertEqual(payload.code, self.icmp4_code)
- else:
- self.assertEqual(payload.type, self.icmp6_type)
- self.assertEqual(payload.code, self.icmp6_code)
- except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
- raise
- else:
- try:
- ip_version = IPv6 if payload_info.ip == 1 else IP
-
- ip = packet[ip_version]
- packet_index = payload_info.index
-
- self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
- (pg_if.name, payload_info.src,
- packet_index))
- next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
- last_info[payload_info.src] = next_info
- self.assertTrue(next_info is not None)
- self.assertEqual(packet_index, next_info.index)
- saved_packet = next_info.data
- # Check standard fields
- self.assertEqual(ip.src, saved_packet[ip_version].src)
- self.assertEqual(ip.dst, saved_packet[ip_version].dst)
- p = self.proto_map[payload_info.proto]
- if p == 'TCP':
- tcp = packet[TCP]
- self.assertEqual(tcp.sport, saved_packet[
- TCP].sport)
- self.assertEqual(tcp.dport, saved_packet[
- TCP].dport)
- elif p == 'UDP':
- udp = packet[UDP]
- self.assertEqual(udp.sport, saved_packet[
- UDP].sport)
- self.assertEqual(udp.dport, saved_packet[
- UDP].dport)
- except:
- self.logger.error(ppp("Unexpected or invalid packet:",
- packet))
- raise
- for i in self.pg_interfaces:
- remaining_packet = self.get_next_packet_info_for_interface2(
- i, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(
- remaining_packet is None,
- "Port %u: Packet expected from source %u didn't arrive" %
- (dst_sw_if_index, i.sw_if_index))
-
- def run_traffic_no_check(self):
- # Test
- # Create incoming packet streams for packet-generator interfaces
- for i in self.pg_interfaces:
- if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes)
- if len(pkts) > 0:
- i.add_stream(pkts)
-
- # Enable packet capture and start packet sending
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
- frags=False, pkt_raw=True, etype=-1):
- # Test
- # Create incoming packet streams for packet-generator interfaces
- pkts_cnt = 0
- for i in self.pg_interfaces:
- if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports,
- frags, pkt_raw, etype)
- if len(pkts) > 0:
- i.add_stream(pkts)
- pkts_cnt += len(pkts)
-
- # Enable packet capture and start packet sendingself.IPV
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.logger.info("sent packets count: %d" % pkts_cnt)
-
- # Verify
- # Verify outgoing packet streams per packet-generator interface
- for src_if in self.pg_interfaces:
- if self.flows.__contains__(src_if):
- for dst_if in self.flows[src_if]:
- capture = dst_if.get_capture(pkts_cnt)
- self.logger.info("Verifying capture on interface %s" %
- dst_if.name)
- self.verify_capture(dst_if, capture,
- traffic_type, ip_type, etype)
-
- def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
- ports=0, frags=False, etype=-1):
- # Test
- pkts_cnt = 0
- self.reset_packet_infos()
- for i in self.pg_interfaces:
- if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports,
- frags, True, etype)
- if len(pkts) > 0:
- i.add_stream(pkts)
- pkts_cnt += len(pkts)
-
- # Enable packet capture and start packet sending
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- self.logger.info("sent packets count: %d" % pkts_cnt)
-
- # Verify
- # Verify outgoing packet streams per packet-generator interface
- for src_if in self.pg_interfaces:
- if self.flows.__contains__(src_if):
- for dst_if in self.flows[src_if]:
- self.logger.info("Verifying capture on interface %s" %
- dst_if.name)
- capture = dst_if.get_capture(0)
- self.assertEqual(len(capture), 0)
-
- def test_0000_warmup_test(self):
- """ ACL plugin version check; learn MACs
- """
- reply = self.vapi.papi.acl_plugin_get_version()
- self.assertEqual(reply.major, 1)
- self.logger.info("Working with ACL plugin version: %d.%d" % (
- reply.major, reply.minor))
- # minor version changes are non breaking
- # self.assertEqual(reply.minor, 0)
-
- def test_0001_acl_create(self):
- """ ACL create/delete test
- """
-
- self.logger.info("ACLP_TEST_START_0001")
- # Create a permit-1234 ACL
- r = [AclRule(is_permit=1, proto=17, ports=1234, sport_to=1235)]
- # Test 1: add a new ACL
- first_acl = VppAcl(self, rules=r, tag="permit 1234")
- first_acl.add_vpp_config()
- self.assertTrue(first_acl.query_vpp_config())
- # The very first ACL gets #0
- self.assertEqual(first_acl.acl_index, 0)
- rr = first_acl.dump()
- self.logger.info("Dumped ACL: " + str(rr))
- self.assertEqual(len(rr), 1)
- # We should have the same number of ACL entries as we had asked
- self.assertEqual(len(rr[0].r), len(r))
- # The rules should be the same. But because the submitted and returned
- # are different types, we need to iterate over rules and keys to get
- # to basic values.
- for i_rule in range(0, len(r) - 1):
- encoded_rule = r[i_rule].encode()
- for rule_key in encoded_rule:
- self.assertEqual(rr[0].r[i_rule][rule_key],
- encoded_rule[rule_key])
-
- # Create a deny-1234 ACL
- r_deny = [AclRule(is_permit=0, proto=17, ports=1234, sport_to=1235),
- AclRule(is_permit=1, proto=17, ports=0)]
- second_acl = VppAcl(self, rules=r_deny, tag="deny 1234;permit all")
- second_acl.add_vpp_config()
- self.assertTrue(second_acl.query_vpp_config())
- # The second ACL gets #1
- self.assertEqual(second_acl.acl_index, 1)
-
- # Test 2: try to modify a nonexistent ACL
- invalid_acl = VppAcl(self, acl_index=432, rules=r, tag="FFFF:FFFF")
- reply = invalid_acl.add_vpp_config(expect_error=True)
-
- # apply an ACL on an interface inbound, try to delete ACL, must fail
- acl_if_list = VppAclInterface(
- self, sw_if_index=self.pg0.sw_if_index, n_input=1,
- acls=[first_acl])
- acl_if_list.add_vpp_config()
- first_acl.remove_vpp_config(expect_error=True)
- # Unapply an ACL and then try to delete it - must be ok
- acl_if_list.remove_vpp_config()
- first_acl.remove_vpp_config()
-
- # apply an ACL on an interface inbound, try to delete ACL, must fail
- acl_if_list = VppAclInterface(
- self, sw_if_index=self.pg0.sw_if_index, n_input=0,
- acls=[second_acl])
- acl_if_list.add_vpp_config()
- second_acl.remove_vpp_config(expect_error=True)
- # Unapply an ACL and then try to delete it - must be ok
- acl_if_list.remove_vpp_config()
- second_acl.remove_vpp_config()
-
- # try to apply a nonexistent ACL - must fail
- acl_if_list = VppAclInterface(
- self, sw_if_index=self.pg0.sw_if_index, n_input=0,
- acls=[invalid_acl])
- acl_if_list.add_vpp_config(expect_error=True)
-
- self.logger.info("ACLP_TEST_FINISH_0001")
-
- def test_0002_acl_permit_apply(self):
- """ permit ACL apply test
- """
- self.logger.info("ACLP_TEST_START_0002")
-
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- 0, self.proto[self.IP][self.TCP]))
-
- # Apply rules
- acl_idx = self.apply_rules(rules, "permit per-flow")
-
- # enable counters
- reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4, -1)
-
- matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
- self.logger.info("stat segment counters: %s" % repr(matches))
- cli = "show acl-plugin acl"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show acl-plugin tables"
- self.logger.info(self.vapi.ppcli(cli))
-
- total_hits = matches[0][0]['packets'] + matches[0][1]['packets']
- self.assertEqual(total_hits, 64)
-
- # disable counters
- reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
-
- self.logger.info("ACLP_TEST_FINISH_0002")
-
- def test_0003_acl_deny_apply(self):
- """ deny ACL apply test
- """
- self.logger.info("ACLP_TEST_START_0003")
- # Add a deny-flows ACL
- rules = []
- rules.append(self.create_rule(
- self.IPV4, self.DENY, self.PORTS_ALL,
- self.proto[self.IP][self.UDP]))
- # Permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- acl_idx = self.apply_rules(rules, "deny per-flow;permit all")
-
- # enable counters
- reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=1)
-
- # Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPV4,
- self.proto[self.IP][self.UDP])
-
- matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
- self.logger.info("stat segment counters: %s" % repr(matches))
- cli = "show acl-plugin acl"
- self.logger.info(self.vapi.ppcli(cli))
- cli = "show acl-plugin tables"
- self.logger.info(self.vapi.ppcli(cli))
- self.assertEqual(matches[0][0]['packets'], 64)
- # disable counters
- reply = self.vapi.papi.acl_stats_intf_counters_enable(enable=0)
- self.logger.info("ACLP_TEST_FINISH_0003")
- # self.assertEqual(, 0)
-
- def test_0004_vpp624_permit_icmpv4(self):
- """ VPP_624 permit ICMPv4
- """
- self.logger.info("ACLP_TEST_START_0004")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.ICMP][self.ICMPv4]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit icmpv4")
-
- # Traffic should still pass
- self.run_verify_test(self.ICMP, self.IPV4,
- self.proto[self.ICMP][self.ICMPv4])
-
- self.logger.info("ACLP_TEST_FINISH_0004")
-
- def test_0005_vpp624_permit_icmpv6(self):
- """ VPP_624 permit ICMPv6
- """
- self.logger.info("ACLP_TEST_START_0005")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.ICMP][self.ICMPv6]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit icmpv6")
-
- # Traffic should still pass
- self.run_verify_test(self.ICMP, self.IPV6,
- self.proto[self.ICMP][self.ICMPv6])
-
- self.logger.info("ACLP_TEST_FINISH_0005")
-
- def test_0006_vpp624_deny_icmpv4(self):
- """ VPP_624 deny ICMPv4
- """
- self.logger.info("ACLP_TEST_START_0006")
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.ICMP][self.ICMPv4]))
- # permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny icmpv4")
-
- # Traffic should not pass
- self.run_verify_negat_test(self.ICMP, self.IPV4, 0)
-
- self.logger.info("ACLP_TEST_FINISH_0006")
-
- def test_0007_vpp624_deny_icmpv6(self):
- """ VPP_624 deny ICMPv6
- """
- self.logger.info("ACLP_TEST_START_0007")
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.ICMP][self.ICMPv6]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny icmpv6")
-
- # Traffic should not pass
- self.run_verify_negat_test(self.ICMP, self.IPV6, 0)
-
- self.logger.info("ACLP_TEST_FINISH_0007")
-
- def test_0008_tcp_permit_v4(self):
- """ permit TCPv4
- """
- self.logger.info("ACLP_TEST_START_0008")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
-
- self.logger.info("ACLP_TEST_FINISH_0008")
-
- def test_0009_tcp_permit_v6(self):
- """ permit TCPv6
- """
- self.logger.info("ACLP_TEST_START_0009")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ip6 tcp")
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
-
- self.logger.info("ACLP_TEST_FINISH_0008")
-
- def test_0010_udp_permit_v4(self):
- """ permit UDPv4
- """
- self.logger.info("ACLP_TEST_START_0010")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ipv udp")
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
-
- self.logger.info("ACLP_TEST_FINISH_0010")
-
- def test_0011_udp_permit_v6(self):
- """ permit UDPv6
- """
- self.logger.info("ACLP_TEST_START_0011")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ip6 udp")
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
-
- self.logger.info("ACLP_TEST_FINISH_0011")
-
- def test_0012_tcp_deny(self):
- """ deny TCPv4/v6
- """
- self.logger.info("ACLP_TEST_START_0012")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny ip4/ip6 tcp")
-
- # Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.TCP])
-
- self.logger.info("ACLP_TEST_FINISH_0012")
-
- def test_0013_udp_deny(self):
- """ deny UDPv4/v6
- """
- self.logger.info("ACLP_TEST_START_0013")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- # permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp")
-
- # Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.UDP])
-
- self.logger.info("ACLP_TEST_FINISH_0013")
-
- def test_0014_acl_dump(self):
- """ verify add/dump acls
- """
- self.logger.info("ACLP_TEST_START_0014")
-
- r = [[self.IPV4, self.PERMIT, 1234, self.proto[self.IP][self.TCP]],
- [self.IPV4, self.PERMIT, 2345, self.proto[self.IP][self.UDP]],
- [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
- [self.IPV4, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
- [self.IPV4, self.PERMIT, 5, self.proto[self.ICMP][self.ICMPv4]],
- [self.IPV6, self.PERMIT, 4321, self.proto[self.IP][self.TCP]],
- [self.IPV6, self.PERMIT, 5432, self.proto[self.IP][self.UDP]],
- [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.TCP]],
- [self.IPV6, self.PERMIT, 0, self.proto[self.IP][self.UDP]],
- [self.IPV6, self.PERMIT, 6, self.proto[self.ICMP][self.ICMPv6]],
- [self.IPV4, self.DENY, self.PORTS_ALL, 0],
- [self.IPV4, self.DENY, 1234, self.proto[self.IP][self.TCP]],
- [self.IPV4, self.DENY, 2345, self.proto[self.IP][self.UDP]],
- [self.IPV4, self.DENY, 5, self.proto[self.ICMP][self.ICMPv4]],
- [self.IPV6, self.DENY, 4321, self.proto[self.IP][self.TCP]],
- [self.IPV6, self.DENY, 5432, self.proto[self.IP][self.UDP]],
- [self.IPV6, self.DENY, 6, self.proto[self.ICMP][self.ICMPv6]],
- [self.IPV6, self.DENY, self.PORTS_ALL, 0]
- ]
-
- # Add and verify new ACLs
- rules = []
- for i in range(len(r)):
- rules.append(self.create_rule(r[i][0], r[i][1], r[i][2], r[i][3]))
-
- acl = VppAcl(self, rules=rules)
- acl.add_vpp_config()
- result = acl.dump()
-
- i = 0
- for drules in result:
- for dr in drules.r:
- self.assertEqual(dr.is_permit, r[i][1])
- self.assertEqual(dr.proto, r[i][3])
-
- if r[i][2] > 0:
- self.assertEqual(dr.srcport_or_icmptype_first, r[i][2])
- else:
- if r[i][2] < 0:
- self.assertEqual(dr.srcport_or_icmptype_first, 0)
- self.assertEqual(dr.srcport_or_icmptype_last, 65535)
- else:
- if dr.proto == self.proto[self.IP][self.TCP]:
- self.assertGreater(dr.srcport_or_icmptype_first,
- self.tcp_sport_from-1)
- self.assertLess(dr.srcport_or_icmptype_first,
- self.tcp_sport_to+1)
- self.assertGreater(dr.dstport_or_icmpcode_last,
- self.tcp_dport_from-1)
- self.assertLess(dr.dstport_or_icmpcode_last,
- self.tcp_dport_to+1)
- elif dr.proto == self.proto[self.IP][self.UDP]:
- self.assertGreater(dr.srcport_or_icmptype_first,
- self.udp_sport_from-1)
- self.assertLess(dr.srcport_or_icmptype_first,
- self.udp_sport_to+1)
- self.assertGreater(dr.dstport_or_icmpcode_last,
- self.udp_dport_from-1)
- self.assertLess(dr.dstport_or_icmpcode_last,
- self.udp_dport_to+1)
- i += 1
-
- self.logger.info("ACLP_TEST_FINISH_0014")
-
- def test_0015_tcp_permit_port_v4(self):
- """ permit single TCPv4
- """
- self.logger.info("ACLP_TEST_START_0015")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ip4 tcp %d" % port)
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4,
- self.proto[self.IP][self.TCP], port)
-
- self.logger.info("ACLP_TEST_FINISH_0015")
-
- def test_0016_udp_permit_port_v4(self):
- """ permit single UDPv4
- """
- self.logger.info("ACLP_TEST_START_0016")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ip4 tcp %d" % port)
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4,
- self.proto[self.IP][self.UDP], port)
-
- self.logger.info("ACLP_TEST_FINISH_0016")
-
- def test_0017_tcp_permit_port_v6(self):
- """ permit single TCPv6
- """
- self.logger.info("ACLP_TEST_START_0017")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ip4 tcp %d" % port)
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV6,
- self.proto[self.IP][self.TCP], port)
-
- self.logger.info("ACLP_TEST_FINISH_0017")
-
- def test_0018_udp_permit_port_v6(self):
- """ permit single UDPv6
- """
- self.logger.info("ACLP_TEST_START_0018")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ip4 tcp %d" % port)
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV6,
- self.proto[self.IP][self.UDP], port)
-
- self.logger.info("ACLP_TEST_FINISH_0018")
-
- def test_0019_udp_deny_port(self):
- """ deny single TCPv4/v6
- """
- self.logger.info("ACLP_TEST_START_0019")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, port,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, port,
- self.proto[self.IP][self.TCP]))
- # Permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
-
- # Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.TCP], port)
-
- self.logger.info("ACLP_TEST_FINISH_0019")
-
- def test_0020_udp_deny_port(self):
- """ deny single UDPv4/v6
- """
- self.logger.info("ACLP_TEST_START_0020")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, port,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, port,
- self.proto[self.IP][self.UDP]))
- # Permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
-
- # Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.UDP], port)
-
- self.logger.info("ACLP_TEST_FINISH_0020")
-
- def test_0021_udp_deny_port_verify_fragment_deny(self):
- """ deny single UDPv4/v6, permit ip any, verify non-initial fragment
- blocked
- """
- self.logger.info("ACLP_TEST_START_0021")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, port,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, port,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp %d" % port)
-
- # Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.UDP], port, True)
-
- self.logger.info("ACLP_TEST_FINISH_0021")
-
- def test_0022_zero_length_udp_ipv4(self):
- """ VPP-687 zero length udp ipv4 packet"""
- self.logger.info("ACLP_TEST_START_0022")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT, port,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(
- self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit empty udp ip4 %d" % port)
-
- # Traffic should still pass
- # Create incoming packet streams for packet-generator interfaces
- pkts_cnt = 0
- pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
- self.IP, self.IPV4,
- self.proto[self.IP][self.UDP], port,
- False, False)
- if len(pkts) > 0:
- self.pg0.add_stream(pkts)
- pkts_cnt += len(pkts)
-
- # Enable packet capture and start packet sendingself.IPV
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- self.pg1.get_capture(pkts_cnt)
-
- self.logger.info("ACLP_TEST_FINISH_0022")
-
- def test_0023_zero_length_udp_ipv6(self):
- """ VPP-687 zero length udp ipv6 packet"""
- self.logger.info("ACLP_TEST_START_0023")
-
- port = random.randint(16384, 65535)
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.PERMIT, port,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit empty udp ip6 %d" % port)
-
- # Traffic should still pass
- # Create incoming packet streams for packet-generator interfaces
- pkts_cnt = 0
- pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes,
- self.IP, self.IPV6,
- self.proto[self.IP][self.UDP], port,
- False, False)
- if len(pkts) > 0:
- self.pg0.add_stream(pkts)
- pkts_cnt += len(pkts)
-
- # Enable packet capture and start packet sendingself.IPV
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- # Verify outgoing packet streams per packet-generator interface
- self.pg1.get_capture(pkts_cnt)
-
- self.logger.info("ACLP_TEST_FINISH_0023")
-
- def test_0108_tcp_permit_v4(self):
- """ permit TCPv4 + non-match range
- """
- self.logger.info("ACLP_TEST_START_0108")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP])
-
- self.logger.info("ACLP_TEST_FINISH_0108")
-
- def test_0109_tcp_permit_v6(self):
- """ permit TCPv6 + non-match range
- """
- self.logger.info("ACLP_TEST_START_0109")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ip6 tcp")
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.TCP])
-
- self.logger.info("ACLP_TEST_FINISH_0109")
-
- def test_0110_udp_permit_v4(self):
- """ permit UDPv4 + non-match range
- """
- self.logger.info("ACLP_TEST_START_0110")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ipv4 udp")
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.UDP])
-
- self.logger.info("ACLP_TEST_FINISH_0110")
-
- def test_0111_udp_permit_v6(self):
- """ permit UDPv6 + non-match range
- """
- self.logger.info("ACLP_TEST_START_0111")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ip6 udp")
-
- # Traffic should still pass
- self.run_verify_test(self.IP, self.IPV6, self.proto[self.IP][self.UDP])
-
- self.logger.info("ACLP_TEST_FINISH_0111")
-
- def test_0112_tcp_deny(self):
- """ deny TCPv4/v6 + non-match range
- """
- self.logger.info("ACLP_TEST_START_0112")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny ip4/ip6 tcp")
-
- # Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.TCP])
-
- self.logger.info("ACLP_TEST_FINISH_0112")
-
- def test_0113_udp_deny(self):
- """ deny UDPv4/v6 + non-match range
- """
- self.logger.info("ACLP_TEST_START_0113")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_RANGE_2,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_RANGE_2,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- rules.append(self.create_rule(self.IPV6, self.DENY, self.PORTS_RANGE,
- self.proto[self.IP][self.UDP]))
- # permit ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.PERMIT,
- self.PORTS_ALL, 0))
- rules.append(self.create_rule(self.IPV6, self.PERMIT,
- self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "deny ip4/ip6 udp")
-
- # Traffic should not pass
- self.run_verify_negat_test(self.IP, self.IPRANDOM,
- self.proto[self.IP][self.UDP])
-
- self.logger.info("ACLP_TEST_FINISH_0113")
-
- def test_0300_tcp_permit_v4_etype_aaaa(self):
- """ permit TCPv4, send 0xAAAA etype
- """
- self.logger.info("ACLP_TEST_START_0300")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
-
- # Traffic should still pass also for an odd ethertype
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
- 0, False, True, 0xaaaa)
- self.logger.info("ACLP_TEST_FINISH_0300")
-
- def test_0305_tcp_permit_v4_etype_blacklist_aaaa(self):
- """ permit TCPv4, whitelist 0x0BBB ethertype, send 0xAAAA-blocked
- """
- self.logger.info("ACLP_TEST_START_0305")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
- # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
- self.etype_whitelist([0xbbb], 1)
-
- # The oddball ethertype should be blocked
- self.run_verify_negat_test(self.IP, self.IPV4,
- self.proto[self.IP][self.TCP],
- 0, False, 0xaaaa)
-
- # remove the whitelist
- self.etype_whitelist([], 0, add=False)
-
- self.logger.info("ACLP_TEST_FINISH_0305")
-
- def test_0306_tcp_permit_v4_etype_blacklist_aaaa(self):
- """ permit TCPv4, whitelist 0x0BBB ethertype, send 0x0BBB - pass
- """
- self.logger.info("ACLP_TEST_START_0306")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
- # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
- self.etype_whitelist([0xbbb], 1)
-
- # The whitelisted traffic, should pass
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
- 0, False, True, 0x0bbb)
-
- # remove the whitelist, the previously blocked 0xAAAA should pass now
- self.etype_whitelist([], 0, add=False)
-
- self.logger.info("ACLP_TEST_FINISH_0306")
-
- def test_0307_tcp_permit_v4_etype_blacklist_aaaa(self):
- """ permit TCPv4, whitelist 0x0BBB, remove, send 0xAAAA - pass
- """
- self.logger.info("ACLP_TEST_START_0307")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # Apply rules
- self.apply_rules(rules, "permit ipv4 tcp")
-
- # whitelist the 0xbbbb etype - so the 0xaaaa should be blocked
- self.etype_whitelist([0xbbb], 1)
- # remove the whitelist, the previously blocked 0xAAAA should pass now
- self.etype_whitelist([], 0, add=False)
-
- # The whitelisted traffic, should pass
- self.run_verify_test(self.IP, self.IPV4, self.proto[self.IP][self.TCP],
- 0, False, True, 0xaaaa)
-
- self.logger.info("ACLP_TEST_FINISH_0306")
-
- def test_0315_del_intf(self):
- """ apply an acl and delete the interface
- """
- self.logger.info("ACLP_TEST_START_0315")
-
- # Add an ACL
- rules = []
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_RANGE_2,
- self.proto[self.IP][self.TCP]))
- rules.append(self.create_rule(self.IPV4, self.PERMIT, self.PORTS_RANGE,
- self.proto[self.IP][self.TCP]))
- # deny ip any any in the end
- rules.append(self.create_rule(self.IPV4, self.DENY, self.PORTS_ALL, 0))
-
- # create an interface
- intf = []
- intf.append(VppLoInterface(self))
-
- # Apply rules
- self.apply_rules_to(rules, "permit ipv4 tcp", intf[0].sw_if_index)
-
- # Remove the interface
- intf[0].remove_vpp_config()
-
- self.logger.info("ACLP_TEST_FINISH_0315")
-
-
-if __name__ == '__main__':
- unittest.main(testRunner=VppTestRunner)
diff --git a/src/plugins/acl/test/test_acl_plugin_conns.py b/src/plugins/acl/test/test_acl_plugin_conns.py
deleted file mode 100644
index c7941fa150b..00000000000
--- a/src/plugins/acl/test/test_acl_plugin_conns.py
+++ /dev/null
@@ -1,405 +0,0 @@
-#!/usr/bin/env python3
-""" ACL plugin extended stateful tests """
-
-import unittest
-from framework import VppTestCase, VppTestRunner, running_extended_tests
-from scapy.layers.l2 import Ether
-from scapy.packet import Raw
-from scapy.layers.inet import IP, UDP, TCP
-from scapy.packet import Packet
-from socket import inet_pton, AF_INET, AF_INET6
-from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
-from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
-from scapy.layers.inet6 import IPv6ExtHdrFragment
-from pprint import pprint
-from random import randint
-from util import L4_Conn
-from ipaddress import ip_network
-
-from vpp_acl import AclRule, VppAcl, VppAclInterface
-
-
-def to_acl_rule(self, is_permit, wildcard_sport=False):
- p = self
- rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
- rule_prefix_len = 128 if p.haslayer(IPv6) else 32
- rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP
- rule_l4_sport = p.sport
- rule_l4_dport = p.dport
- if p.haslayer(IPv6):
- rule_l4_proto = p[IPv6].nh
- else:
- rule_l4_proto = p[IP].proto
-
- if wildcard_sport:
- rule_l4_sport_first = 0
- rule_l4_sport_last = 65535
- else:
- rule_l4_sport_first = rule_l4_sport
- rule_l4_sport_last = rule_l4_sport
-
- new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
- src_prefix=ip_network(
- (p[rule_l3_layer].src, rule_prefix_len)),
- dst_prefix=ip_network(
- (p[rule_l3_layer].dst, rule_prefix_len)),
- sport_from=rule_l4_sport_first,
- sport_to=rule_l4_sport_last,
- dport_from=rule_l4_dport, dport_to=rule_l4_dport)
-
- return new_rule
-
-
-Packet.to_acl_rule = to_acl_rule
-
-
-class IterateWithSleep():
- def __init__(self, testcase, n_iters, description, sleep_sec):
- self.curr = 0
- self.testcase = testcase
- self.n_iters = n_iters
- self.sleep_sec = sleep_sec
- self.description = description
-
- def __iter__(self):
- for x in range(0, self.n_iters):
- yield x
- self.testcase.sleep(self.sleep_sec)
-
-
-class Conn(L4_Conn):
- def apply_acls(self, reflect_side, acl_side):
- pkts = []
- pkts.append(self.pkt(0))
- pkts.append(self.pkt(1))
- pkt = pkts[reflect_side]
-
- r = []
- r.append(pkt.to_acl_rule(2, wildcard_sport=True))
- r.append(self.wildcard_rule(0))
- reflect_acl = VppAcl(self.testcase, r)
- reflect_acl.add_vpp_config()
-
- r = []
- r.append(self.wildcard_rule(0))
- deny_acl = VppAcl(self.testcase, r)
- deny_acl.add_vpp_config()
-
- if reflect_side == acl_side:
- acl_if0 = VppAclInterface(self.testcase,
- self.ifs[acl_side].sw_if_index,
- [reflect_acl, deny_acl], n_input=1)
- acl_if1 = VppAclInterface(self.testcase,
- self.ifs[1-acl_side].sw_if_index, [],
- n_input=0)
- acl_if0.add_vpp_config()
- acl_if1.add_vpp_config()
- else:
- acl_if0 = VppAclInterface(self.testcase,
- self.ifs[acl_side].sw_if_index,
- [deny_acl, reflect_acl], n_input=1)
- acl_if1 = VppAclInterface(self.testcase,
- self.ifs[1-acl_side].sw_if_index, [],
- n_input=0)
- acl_if0.add_vpp_config()
- acl_if1.add_vpp_config()
-
- def wildcard_rule(self, is_permit):
- any_addr = ["0.0.0.0", "::"]
- rule_family = self.address_family
- is_ip6 = 1 if rule_family == AF_INET6 else 0
- new_rule = AclRule(is_permit=is_permit, proto=0,
- src_prefix=ip_network(
- (any_addr[is_ip6], 0)),
- dst_prefix=ip_network(
- (any_addr[is_ip6], 0)),
- sport_from=0, sport_to=65535, dport_from=0,
- dport_to=65535)
- return new_rule
-
-
-@unittest.skipUnless(running_extended_tests, "part of extended tests")
-class ACLPluginConnTestCase(VppTestCase):
- """ ACL plugin connection-oriented extended testcases """
-
- @classmethod
- def setUpClass(cls):
- super(ACLPluginConnTestCase, cls).setUpClass()
- # create pg0 and pg1
- cls.create_pg_interfaces(range(2))
- cmd = "set acl-plugin session table event-trace 1"
- cls.logger.info(cls.vapi.cli(cmd))
- for i in cls.pg_interfaces:
- i.admin_up()
- i.config_ip4()
- i.config_ip6()
- i.resolve_arp()
- i.resolve_ndp()
-
- @classmethod
- def tearDownClass(cls):
- super(ACLPluginConnTestCase, cls).tearDownClass()
-
- def tearDown(self):
- """Run standard test teardown and log various show commands
- """
- super(ACLPluginConnTestCase, self).tearDown()
-
- def show_commands_at_teardown(self):
- self.logger.info(self.vapi.cli("show ip neighbors"))
- self.logger.info(self.vapi.cli("show ip6 neighbors"))
- self.logger.info(self.vapi.cli("show acl-plugin sessions"))
- self.logger.info(self.vapi.cli("show acl-plugin acl"))
- self.logger.info(self.vapi.cli("show acl-plugin interface"))
- self.logger.info(self.vapi.cli("show acl-plugin tables"))
- self.logger.info(self.vapi.cli("show event-logger all"))
-
- def run_basic_conn_test(self, af, acl_side):
- """ Basic conn timeout test """
- conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
- conn1.apply_acls(0, acl_side)
- conn1.send_through(0)
- # the return packets should pass
- conn1.send_through(1)
- # send some packets on conn1, ensure it doesn't go away
- for i in IterateWithSleep(self, 20, "Keep conn active", 0.3):
- conn1.send_through(1)
- # allow the conn to time out
- for i in IterateWithSleep(self, 30, "Wait for timeout", 0.1):
- pass
- # now try to send a packet on the reflected side
- try:
- p2 = conn1.send_through(1).command()
- except:
- # If we asserted while waiting, it's good.
- # the conn should have timed out.
- p2 = None
- self.assert_equal(p2, None, "packet on long-idle conn")
-
- def run_active_conn_test(self, af, acl_side):
- """ Idle connection behind active connection test """
- base = 10000 + 1000*acl_side
- conn1 = Conn(self, self.pg0, self.pg1, af, UDP, base + 1, 2323)
- conn2 = Conn(self, self.pg0, self.pg1, af, UDP, base + 2, 2323)
- conn3 = Conn(self, self.pg0, self.pg1, af, UDP, base + 3, 2323)
- conn1.apply_acls(0, acl_side)
- conn1.send(0)
- conn1.recv(1)
- # create and check that the conn2/3 work
- self.sleep(0.1)
- conn2.send_pingpong(0)
- self.sleep(0.1)
- conn3.send_pingpong(0)
- # send some packets on conn1, keep conn2/3 idle
- for i in IterateWithSleep(self, 20, "Keep conn active", 0.2):
- conn1.send_through(1)
- try:
- p2 = conn2.send_through(1).command()
- except:
- # If we asserted while waiting, it's good.
- # the conn should have timed out.
- p2 = None
- # We should have not received the packet on a long-idle
- # connection, because it should have timed out
- # If it didn't - it is a problem
- self.assert_equal(p2, None, "packet on long-idle conn")
-
- def run_clear_conn_test(self, af, acl_side):
- """ Clear the connections via CLI """
- conn1 = Conn(self, self.pg0, self.pg1, af, UDP, 42001, 4242)
- conn1.apply_acls(0, acl_side)
- conn1.send_through(0)
- # the return packets should pass
- conn1.send_through(1)
- # send some packets on conn1, ensure it doesn't go away
- for i in IterateWithSleep(self, 20, "Keep conn active", 0.3):
- conn1.send_through(1)
- # clear all connections
- self.vapi.ppcli("clear acl-plugin sessions")
- # now try to send a packet on the reflected side
- try:
- p2 = conn1.send_through(1).command()
- except:
- # If we asserted while waiting, it's good.
- # the conn should have timed out.
- p2 = None
- self.assert_equal(p2, None, "packet on supposedly deleted conn")
-
- def run_tcp_transient_setup_conn_test(self, af, acl_side):
- conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53001, 5151)
- conn1.apply_acls(0, acl_side)
- conn1.send_through(0, 'S')
- # the return packets should pass
- conn1.send_through(1, 'SA')
- # allow the conn to time out
- for i in IterateWithSleep(self, 30, "Wait for timeout", 0.1):
- pass
- # ensure conn times out
- try:
- p2 = conn1.send_through(1).command()
- except:
- # If we asserted while waiting, it's good.
- # the conn should have timed out.
- p2 = None
- self.assert_equal(p2, None, "packet on supposedly deleted conn")
-
- def run_tcp_established_conn_test(self, af, acl_side):
- conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53002, 5052)
- conn1.apply_acls(0, acl_side)
- conn1.send_through(0, 'S')
- # the return packets should pass
- conn1.send_through(1, 'SA')
- # complete the threeway handshake
- # (NB: sequence numbers not tracked, so not set!)
- conn1.send_through(0, 'A')
- # allow the conn to time out if it's in embryonic timer
- for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1):
- pass
- # Try to send the packet from the "forbidden" side - it must pass
- conn1.send_through(1, 'A')
- # ensure conn times out for real
- for i in IterateWithSleep(self, 130, "Wait for timeout", 0.1):
- pass
- try:
- p2 = conn1.send_through(1).command()
- except:
- # If we asserted while waiting, it's good.
- # the conn should have timed out.
- p2 = None
- self.assert_equal(p2, None, "packet on supposedly deleted conn")
-
- def run_tcp_transient_teardown_conn_test(self, af, acl_side):
- conn1 = Conn(self, self.pg0, self.pg1, af, TCP, 53002, 5052)
- conn1.apply_acls(0, acl_side)
- conn1.send_through(0, 'S')
- # the return packets should pass
- conn1.send_through(1, 'SA')
- # complete the threeway handshake
- # (NB: sequence numbers not tracked, so not set!)
- conn1.send_through(0, 'A')
- # allow the conn to time out if it's in embryonic timer
- for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1):
- pass
- # Try to send the packet from the "forbidden" side - it must pass
- conn1.send_through(1, 'A')
- # Send the FIN to bounce the session out of established
- conn1.send_through(1, 'FA')
- # If conn landed on transient timer it will time out here
- for i in IterateWithSleep(self, 30, "Wait for transient timeout", 0.1):
- pass
- # Now it should have timed out already
- try:
- p2 = conn1.send_through(1).command()
- except:
- # If we asserted while waiting, it's good.
- # the conn should have timed out.
- p2 = None
- self.assert_equal(p2, None, "packet on supposedly deleted conn")
-
- def test_0000_conn_prepare_test(self):
- """ Prepare the settings """
- self.vapi.ppcli("set acl-plugin session timeout udp idle 1")
-
- def test_0001_basic_conn_test(self):
- """ IPv4: Basic conn timeout test reflect on ingress """
- self.run_basic_conn_test(AF_INET, 0)
-
- def test_0002_basic_conn_test(self):
- """ IPv4: Basic conn timeout test reflect on egress """
- self.run_basic_conn_test(AF_INET, 1)
-
- def test_0005_clear_conn_test(self):
- """ IPv4: reflect egress, clear conn """
- self.run_clear_conn_test(AF_INET, 1)
-
- def test_0006_clear_conn_test(self):
- """ IPv4: reflect ingress, clear conn """
- self.run_clear_conn_test(AF_INET, 0)
-
- def test_0011_active_conn_test(self):
- """ IPv4: Idle conn behind active conn, reflect on ingress """
- self.run_active_conn_test(AF_INET, 0)
-
- def test_0012_active_conn_test(self):
- """ IPv4: Idle conn behind active conn, reflect on egress """
- self.run_active_conn_test(AF_INET, 1)
-
- def test_1001_basic_conn_test(self):
- """ IPv6: Basic conn timeout test reflect on ingress """
- self.run_basic_conn_test(AF_INET6, 0)
-
- def test_1002_basic_conn_test(self):
- """ IPv6: Basic conn timeout test reflect on egress """
- self.run_basic_conn_test(AF_INET6, 1)
-
- def test_1005_clear_conn_test(self):
- """ IPv6: reflect egress, clear conn """
- self.run_clear_conn_test(AF_INET6, 1)
-
- def test_1006_clear_conn_test(self):
- """ IPv6: reflect ingress, clear conn """
- self.run_clear_conn_test(AF_INET6, 0)
-
- def test_1011_active_conn_test(self):
- """ IPv6: Idle conn behind active conn, reflect on ingress """
- self.run_active_conn_test(AF_INET6, 0)
-
- def test_1012_active_conn_test(self):
- """ IPv6: Idle conn behind active conn, reflect on egress """
- self.run_active_conn_test(AF_INET6, 1)
-
- def test_2000_prepare_for_tcp_test(self):
- """ Prepare for TCP session tests """
- # ensure the session hangs on if it gets treated as UDP
- self.vapi.ppcli("set acl-plugin session timeout udp idle 200")
- # let the TCP connection time out at 5 seconds
- self.vapi.ppcli("set acl-plugin session timeout tcp idle 10")
- self.vapi.ppcli("set acl-plugin session timeout tcp transient 1")
-
- def test_2001_tcp_transient_conn_test(self):
- """ IPv4: transient TCP session (incomplete 3WHS), ref. on ingress """
- self.run_tcp_transient_setup_conn_test(AF_INET, 0)
-
- def test_2002_tcp_transient_conn_test(self):
- """ IPv4: transient TCP session (incomplete 3WHS), ref. on egress """
- self.run_tcp_transient_setup_conn_test(AF_INET, 1)
-
- def test_2003_tcp_transient_conn_test(self):
- """ IPv4: established TCP session (complete 3WHS), ref. on ingress """
- self.run_tcp_established_conn_test(AF_INET, 0)
-
- def test_2004_tcp_transient_conn_test(self):
- """ IPv4: established TCP session (complete 3WHS), ref. on egress """
- self.run_tcp_established_conn_test(AF_INET, 1)
-
- def test_2005_tcp_transient_teardown_conn_test(self):
- """ IPv4: transient TCP session (3WHS,ACK,FINACK), ref. on ingress """
- self.run_tcp_transient_teardown_conn_test(AF_INET, 0)
-
- def test_2006_tcp_transient_teardown_conn_test(self):
- """ IPv4: transient TCP session (3WHS,ACK,FINACK), ref. on egress """
- self.run_tcp_transient_teardown_conn_test(AF_INET, 1)
-
- def test_3001_tcp_transient_conn_test(self):
- """ IPv6: transient TCP session (incomplete 3WHS), ref. on ingress """
- self.run_tcp_transient_setup_conn_test(AF_INET6, 0)
-
- def test_3002_tcp_transient_conn_test(self):
- """ IPv6: transient TCP session (incomplete 3WHS), ref. on egress """
- self.run_tcp_transient_setup_conn_test(AF_INET6, 1)
-
- def test_3003_tcp_transient_conn_test(self):
- """ IPv6: established TCP session (complete 3WHS), ref. on ingress """
- self.run_tcp_established_conn_test(AF_INET6, 0)
-
- def test_3004_tcp_transient_conn_test(self):
- """ IPv6: established TCP session (complete 3WHS), ref. on egress """
- self.run_tcp_established_conn_test(AF_INET6, 1)
-
- def test_3005_tcp_transient_teardown_conn_test(self):
- """ IPv6: transient TCP session (3WHS,ACK,FINACK), ref. on ingress """
- self.run_tcp_transient_teardown_conn_test(AF_INET6, 0)
-
- def test_3006_tcp_transient_teardown_conn_test(self):
- """ IPv6: transient TCP session (3WHS,ACK,FINACK), ref. on egress """
- self.run_tcp_transient_teardown_conn_test(AF_INET6, 1)
diff --git a/src/plugins/acl/test/test_acl_plugin_l2l3.py b/src/plugins/acl/test/test_acl_plugin_l2l3.py
deleted file mode 100644
index 48faafb7398..00000000000
--- a/src/plugins/acl/test/test_acl_plugin_l2l3.py
+++ /dev/null
@@ -1,864 +0,0 @@
-#!/usr/bin/env python3
-"""ACL IRB Test Case HLD:
-
-**config**
- - L2 MAC learning enabled in l2bd
- - 2 routed interfaces untagged, bvi (Bridge Virtual Interface)
- - 2 bridged interfaces in l2bd with bvi
-
-**test**
- - sending ip4 eth pkts between routed interfaces
- - 2 routed interfaces
- - 2 bridged interfaces
-
- - 64B, 512B, 1518B, 9200B (ether_size)
-
- - burst of pkts per interface
- - 257pkts per burst
- - routed pkts hitting different FIB entries
- - bridged pkts hitting different MAC entries
-
-**verify**
- - all packets received correctly
-
-"""
-
-import copy
-import unittest
-from socket import inet_pton, AF_INET, AF_INET6
-from random import choice, shuffle
-from pprint import pprint
-from ipaddress import ip_network
-
-import scapy.compat
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP, ICMP, TCP
-from scapy.layers.inet6 import IPv6, ICMPv6Unknown, ICMPv6EchoRequest
-from scapy.layers.inet6 import ICMPv6EchoReply, IPv6ExtHdrRouting
-from scapy.layers.inet6 import IPv6ExtHdrFragment
-
-from framework import VppTestCase, VppTestRunner
-from vpp_l2 import L2_PORT_TYPE
-import time
-
-from vpp_acl import AclRule, VppAcl, VppAclInterface
-
-
-class TestACLpluginL2L3(VppTestCase):
- """TestACLpluginL2L3 Test Case"""
-
- @classmethod
- def setUpClass(cls):
- """
- #. Create BD with MAC learning enabled and put interfaces to this BD.
- #. Configure IPv4 addresses on loopback interface and routed interface.
- #. Configure MAC address binding to IPv4 neighbors on loop0.
- #. Configure MAC address on pg2.
- #. Loopback BVI interface has remote hosts, one half of hosts are
- behind pg0 second behind pg1.
- """
- super(TestACLpluginL2L3, cls).setUpClass()
-
- cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # packet sizes
- cls.bd_id = 10
- cls.remote_hosts_count = 250
-
- # create 3 pg interfaces, 1 loopback interface
- cls.create_pg_interfaces(range(3))
- cls.create_loopback_interfaces(1)
-
- cls.interfaces = list(cls.pg_interfaces)
- cls.interfaces.extend(cls.lo_interfaces)
-
- for i in cls.interfaces:
- i.admin_up()
-
- # Create BD with MAC learning enabled and put interfaces to this BD
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
- port_type=L2_PORT_TYPE.BVI)
- cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg0.sw_if_index,
- bd_id=cls.bd_id)
- cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=cls.pg1.sw_if_index,
- bd_id=cls.bd_id)
-
- # Configure IPv4 addresses on loopback interface and routed interface
- cls.loop0.config_ip4()
- cls.loop0.config_ip6()
- cls.pg2.config_ip4()
- cls.pg2.config_ip6()
-
- # Configure MAC address binding to IPv4 neighbors on loop0
- cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
- cls.loop0.configure_ipv4_neighbors()
- cls.loop0.configure_ipv6_neighbors()
- # configure MAC address on pg2
- cls.pg2.resolve_arp()
- cls.pg2.resolve_ndp()
-
- cls.WITHOUT_EH = False
- cls.WITH_EH = True
- cls.STATELESS_ICMP = False
- cls.STATEFUL_ICMP = True
-
- # Loopback BVI interface has remote hosts, one half of hosts are behind
- # pg0 second behind pg1
- half = cls.remote_hosts_count // 2
- cls.pg0.remote_hosts = cls.loop0.remote_hosts[:half]
- cls.pg1.remote_hosts = cls.loop0.remote_hosts[half:]
- reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=1)
-
- @classmethod
- def tearDownClass(cls):
- reply = cls.vapi.papi.acl_stats_intf_counters_enable(enable=0)
- super(TestACLpluginL2L3, cls).tearDownClass()
-
- def tearDown(self):
- """Run standard test teardown and log ``show l2patch``,
- ``show l2fib verbose``,``show bridge-domain <bd_id> detail``,
- ``show ip neighbors``.
- """
- super(TestACLpluginL2L3, self).tearDown()
-
- def show_commands_at_teardown(self):
- self.logger.info(self.vapi.cli("show l2patch"))
- self.logger.info(self.vapi.cli("show classify tables"))
- self.logger.info(self.vapi.cli("show l2fib verbose"))
- self.logger.info(self.vapi.cli("show bridge-domain %s detail" %
- self.bd_id))
- self.logger.info(self.vapi.cli("show ip neighbors"))
- cmd = "show acl-plugin sessions verbose 1"
- self.logger.info(self.vapi.cli(cmd))
- self.logger.info(self.vapi.cli("show acl-plugin acl"))
- self.logger.info(self.vapi.cli("show acl-plugin interface"))
- self.logger.info(self.vapi.cli("show acl-plugin tables"))
-
- def create_stream(self, src_ip_if, dst_ip_if, reverse, packet_sizes,
- is_ip6, expect_blocked, expect_established,
- add_extension_header, icmp_stateful=False):
- pkts = []
- rules = []
- permit_rules = []
- permit_and_reflect_rules = []
- total_packet_count = 8
- for i in range(0, total_packet_count):
- modulo = (i//2) % 2
- icmp_type_delta = i % 2
- icmp_code = i
- is_udp_packet = (modulo == 0)
- if is_udp_packet and icmp_stateful:
- continue
- is_reflectable_icmp = (icmp_stateful and icmp_type_delta == 0 and
- not is_udp_packet)
- is_reflected_icmp = is_reflectable_icmp and expect_established
- can_reflect_this_packet = is_udp_packet or is_reflectable_icmp
- is_permit = i % 2
- remote_dst_index = i % len(dst_ip_if.remote_hosts)
- remote_dst_host = dst_ip_if.remote_hosts[remote_dst_index]
- if is_permit == 1:
- info = self.create_packet_info(src_ip_if, dst_ip_if)
- payload = self.info_to_payload(info)
- else:
- to_be_blocked = False
- if (expect_blocked and not expect_established):
- to_be_blocked = True
- if (not can_reflect_this_packet):
- to_be_blocked = True
- if to_be_blocked:
- payload = "to be blocked"
- else:
- info = self.create_packet_info(src_ip_if, dst_ip_if)
- payload = self.info_to_payload(info)
- if reverse:
- dst_mac = 'de:ad:00:00:00:00'
- src_mac = remote_dst_host._mac
- dst_ip6 = src_ip_if.remote_ip6
- src_ip6 = remote_dst_host.ip6
- dst_ip4 = src_ip_if.remote_ip4
- src_ip4 = remote_dst_host.ip4
- dst_l4 = 1234 + i
- src_l4 = 4321 + i
- else:
- dst_mac = src_ip_if.local_mac
- src_mac = src_ip_if.remote_mac
- src_ip6 = src_ip_if.remote_ip6
- dst_ip6 = remote_dst_host.ip6
- src_ip4 = src_ip_if.remote_ip4
- dst_ip4 = remote_dst_host.ip4
- src_l4 = 1234 + i
- dst_l4 = 4321 + i
- if is_reflected_icmp:
- icmp_type_delta = 1
-
- # default ULP should be something we do not use in tests
- ulp_l4 = TCP(sport=src_l4, dport=dst_l4)
- # potentially a chain of protocols leading to ULP
- ulp = ulp_l4
-
- if is_udp_packet:
- if is_ip6:
- ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
- if add_extension_header:
- # prepend some extension headers
- ulp = (IPv6ExtHdrRouting() / IPv6ExtHdrRouting() /
- IPv6ExtHdrFragment(offset=0, m=1) / ulp_l4)
- # uncomment below to test invalid ones
- # ulp = IPv6ExtHdrRouting(len = 200) / ulp_l4
- else:
- ulp = ulp_l4
- p = (Ether(dst=dst_mac, src=src_mac) /
- IPv6(src=src_ip6, dst=dst_ip6) /
- ulp /
- Raw(payload))
- else:
- ulp_l4 = UDP(sport=src_l4, dport=dst_l4)
- # IPv4 does not allow extension headers,
- # but we rather make it a first fragment
- flags = 1 if add_extension_header else 0
- ulp = ulp_l4
- p = (Ether(dst=dst_mac, src=src_mac) /
- IP(src=src_ip4, dst=dst_ip4, frag=0, flags=flags) /
- ulp /
- Raw(payload))
- elif modulo == 1:
- if is_ip6:
- ulp_l4 = ICMPv6Unknown(type=128 + icmp_type_delta,
- code=icmp_code)
- ulp = ulp_l4
- p = (Ether(dst=dst_mac, src=src_mac) /
- IPv6(src=src_ip6, dst=dst_ip6) /
- ulp /
- Raw(payload))
- else:
- ulp_l4 = ICMP(type=8 - 8*icmp_type_delta, code=icmp_code)
- ulp = ulp_l4
- p = (Ether(dst=dst_mac, src=src_mac) /
- IP(src=src_ip4, dst=dst_ip4) /
- ulp /
- Raw(payload))
-
- if i % 2 == 1:
- info.data = p.copy()
- size = packet_sizes[(i // 2) % len(packet_sizes)]
- self.extend_packet(p, size)
- pkts.append(p)
-
- rule_family = AF_INET6 if p.haslayer(IPv6) else AF_INET
- rule_prefix_len = 128 if p.haslayer(IPv6) else 32
- rule_l3_layer = IPv6 if p.haslayer(IPv6) else IP
-
- if p.haslayer(UDP):
- rule_l4_sport = p[UDP].sport
- rule_l4_dport = p[UDP].dport
- else:
- if p.haslayer(ICMP):
- rule_l4_sport = p[ICMP].type
- rule_l4_dport = p[ICMP].code
- else:
- rule_l4_sport = p[ICMPv6Unknown].type
- rule_l4_dport = p[ICMPv6Unknown].code
- if p.haslayer(IPv6):
- rule_l4_proto = ulp_l4.overload_fields[IPv6]['nh']
- else:
- rule_l4_proto = p[IP].proto
-
- new_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
- src_prefix=ip_network(
- (p[rule_l3_layer].src, rule_prefix_len)),
- dst_prefix=ip_network(
- (p[rule_l3_layer].dst, rule_prefix_len)),
- sport_from=rule_l4_sport,
- sport_to=rule_l4_sport,
- dport_from=rule_l4_dport,
- dport_to=rule_l4_dport)
-
- rules.append(new_rule)
- new_rule_permit = copy.copy(new_rule)
- new_rule_permit.is_permit = 1
- permit_rules.append(new_rule_permit)
-
- new_rule_permit_and_reflect = copy.copy(new_rule)
- if can_reflect_this_packet:
- new_rule_permit_and_reflect.is_permit = 2
- else:
- new_rule_permit_and_reflect.is_permit = is_permit
-
- permit_and_reflect_rules.append(new_rule_permit_and_reflect)
- self.logger.info("create_stream pkt#%d: %s" % (i, payload))
-
- return {'stream': pkts,
- 'rules': rules,
- 'permit_rules': permit_rules,
- 'permit_and_reflect_rules': permit_and_reflect_rules}
-
- def verify_capture(self, dst_ip_if, src_ip_if, capture, reverse):
- last_info = dict()
- for i in self.interfaces:
- last_info[i.sw_if_index] = None
-
- dst_ip_sw_if_index = dst_ip_if.sw_if_index
-
- for packet in capture:
- l3 = IP if packet.haslayer(IP) else IPv6
- ip = packet[l3]
- if packet.haslayer(UDP):
- l4 = UDP
- else:
- if packet.haslayer(ICMP):
- l4 = ICMP
- else:
- l4 = ICMPv6Unknown
-
- # Scapy IPv6 stuff is too smart for its own good.
- # So we do this and coerce the ICMP into unknown type
- if packet.haslayer(UDP):
- data = scapy.compat.raw(packet[UDP][Raw])
- else:
- if l3 == IP:
- data = scapy.compat.raw(ICMP(
- scapy.compat.raw(packet[l3].payload))[Raw])
- else:
- data = scapy.compat.raw(ICMPv6Unknown(
- scapy.compat.raw(packet[l3].payload)).msgbody)
- udp_or_icmp = packet[l3].payload
- data_obj = Raw(data)
- # FIXME: make framework believe we are on object
- payload_info = self.payload_to_info(data_obj)
- packet_index = payload_info.index
-
- self.assertEqual(payload_info.dst, dst_ip_sw_if_index)
-
- next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_ip_sw_if_index,
- last_info[payload_info.src])
- last_info[payload_info.src] = next_info
- self.assertTrue(next_info is not None)
- self.assertEqual(packet_index, next_info.index)
- saved_packet = next_info.data
- self.assertTrue(next_info is not None)
-
- # MAC: src, dst
- if not reverse:
- self.assertEqual(packet.src, dst_ip_if.local_mac)
- host = dst_ip_if.host_by_mac(packet.dst)
-
- # IP: src, dst
- # self.assertEqual(ip.src, src_ip_if.remote_ip4)
- if saved_packet is not None:
- self.assertEqual(ip.src, saved_packet[l3].src)
- self.assertEqual(ip.dst, saved_packet[l3].dst)
- if l4 == UDP:
- self.assertEqual(udp_or_icmp.sport, saved_packet[l4].sport)
- self.assertEqual(udp_or_icmp.dport, saved_packet[l4].dport)
- # self.assertEqual(ip.dst, host.ip4)
-
- # UDP:
-
- def applied_acl_shuffle(self, acl_if):
- saved_n_input = acl_if.n_input
- # TOTO: maybe copy each one??
- saved_acls = acl_if.acls
-
- # now create a list of all the rules in all ACLs
- all_rules = []
- for old_acl in saved_acls:
- for rule in old_acl.rules:
- all_rules.append(rule)
-
- # Add a few ACLs made from shuffled rules
- shuffle(all_rules)
- acl1 = VppAcl(self, rules=all_rules[::2], tag="shuffle 1. acl")
- acl1.add_vpp_config()
-
- shuffle(all_rules)
- acl2 = VppAcl(self, rules=all_rules[::3], tag="shuffle 2. acl")
- acl2.add_vpp_config()
-
- shuffle(all_rules)
- acl3 = VppAcl(self, rules=all_rules[::2], tag="shuffle 3. acl")
- acl3.add_vpp_config()
-
- # apply the shuffle ACLs in front
- input_acls = [acl1, acl2]
- output_acls = [acl1, acl2]
-
- # add the currently applied ACLs
- n_input = acl_if.n_input
- input_acls.extend(saved_acls[:n_input])
- output_acls.extend(saved_acls[n_input:])
-
- # and the trailing shuffle ACL(s)
- input_acls.extend([acl3])
- output_acls.extend([acl3])
-
- # set the interface ACL list to the result
- acl_if.n_input = len(input_acls)
- acl_if.acls = input_acls + output_acls
- acl_if.add_vpp_config()
-
- # change the ACLs a few times
- for i in range(1, 10):
- shuffle(all_rules)
- acl1.modify_vpp_config(all_rules[::1+(i % 2)])
-
- shuffle(all_rules)
- acl2.modify_vpp_config(all_rules[::1+(i % 3)])
-
- shuffle(all_rules)
- acl3.modify_vpp_config(all_rules[::1+(i % 5)])
-
- # restore to how it was before and clean up
- acl_if.n_input = saved_n_input
- acl_if.acls = saved_acls
- acl_if.add_vpp_config()
-
- acl1.remove_vpp_config()
- acl2.remove_vpp_config()
- acl3.remove_vpp_config()
-
- def create_acls_for_a_stream(self, stream_dict,
- test_l2_action, is_reflect):
- r = stream_dict['rules']
- r_permit = stream_dict['permit_rules']
- r_permit_reflect = stream_dict['permit_and_reflect_rules']
- r_action = r_permit_reflect if is_reflect else r
- action_acl = VppAcl(self, rules=r_action, tag="act. acl")
- action_acl.add_vpp_config()
- permit_acl = VppAcl(self, rules=r_permit, tag="perm. acl")
- permit_acl.add_vpp_config()
-
- return {'L2': action_acl if test_l2_action else permit_acl,
- 'L3': permit_acl if test_l2_action else action_acl,
- 'permit': permit_acl, 'action': action_acl}
-
- def apply_acl_ip46_x_to_y(self, bridged_to_routed, test_l2_deny,
- is_ip6, is_reflect, add_eh):
- """ Apply the ACLs
- """
- self.reset_packet_infos()
- stream_dict = self.create_stream(
- self.pg2, self.loop0,
- bridged_to_routed,
- self.pg_if_packet_sizes, is_ip6,
- not is_reflect, False, add_eh)
- stream = stream_dict['stream']
- acl_idx = self.create_acls_for_a_stream(stream_dict, test_l2_deny,
- is_reflect)
- n_input_l3 = 0 if bridged_to_routed else 1
- n_input_l2 = 1 if bridged_to_routed else 0
-
- acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
- n_input=n_input_l3, acls=[acl_idx['L3']])
- acl_if_pg2.add_vpp_config()
-
- acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
- n_input=n_input_l2, acls=[acl_idx['L2']])
- acl_if_pg0.add_vpp_config()
-
- acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
- n_input=n_input_l2, acls=[acl_idx['L2']])
- acl_if_pg1.add_vpp_config()
-
- self.applied_acl_shuffle(acl_if_pg0)
- self.applied_acl_shuffle(acl_if_pg1)
- return {'L2': acl_idx['L2'], 'L3': acl_idx['L3']}
-
- def apply_acl_ip46_both_directions_reflect(self,
- primary_is_bridged_to_routed,
- reflect_on_l2, is_ip6, add_eh,
- stateful_icmp):
- primary_is_routed_to_bridged = not primary_is_bridged_to_routed
- self.reset_packet_infos()
- stream_dict_fwd = self.create_stream(self.pg2, self.loop0,
- primary_is_bridged_to_routed,
- self.pg_if_packet_sizes, is_ip6,
- False, False, add_eh,
- stateful_icmp)
- acl_idx_fwd = self.create_acls_for_a_stream(stream_dict_fwd,
- reflect_on_l2, True)
-
- stream_dict_rev = self.create_stream(self.pg2, self.loop0,
- not primary_is_bridged_to_routed,
- self.pg_if_packet_sizes, is_ip6,
- True, True, add_eh, stateful_icmp)
- # We want the primary action to be "deny" rather than reflect
- acl_idx_rev = self.create_acls_for_a_stream(stream_dict_rev,
- reflect_on_l2, False)
-
- if primary_is_bridged_to_routed:
- inbound_l2_acl = acl_idx_fwd['L2']
- else:
- inbound_l2_acl = acl_idx_rev['L2']
-
- if primary_is_routed_to_bridged:
- outbound_l2_acl = acl_idx_fwd['L2']
- else:
- outbound_l2_acl = acl_idx_rev['L2']
-
- if primary_is_routed_to_bridged:
- inbound_l3_acl = acl_idx_fwd['L3']
- else:
- inbound_l3_acl = acl_idx_rev['L3']
-
- if primary_is_bridged_to_routed:
- outbound_l3_acl = acl_idx_fwd['L3']
- else:
- outbound_l3_acl = acl_idx_rev['L3']
-
- acl_if_pg2 = VppAclInterface(self, sw_if_index=self.pg2.sw_if_index,
- n_input=1,
- acls=[inbound_l3_acl, outbound_l3_acl])
- acl_if_pg2.add_vpp_config()
-
- acl_if_pg0 = VppAclInterface(self, sw_if_index=self.pg0.sw_if_index,
- n_input=1,
- acls=[inbound_l2_acl, outbound_l2_acl])
- acl_if_pg0.add_vpp_config()
-
- acl_if_pg1 = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index,
- n_input=1,
- acls=[inbound_l2_acl, outbound_l2_acl])
- acl_if_pg1.add_vpp_config()
-
- self.applied_acl_shuffle(acl_if_pg0)
- self.applied_acl_shuffle(acl_if_pg2)
-
- def apply_acl_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
- is_reflect, add_eh):
- return self.apply_acl_ip46_x_to_y(False, test_l2_deny, is_ip6,
- is_reflect, add_eh)
-
- def apply_acl_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
- is_reflect, add_eh):
- return self.apply_acl_ip46_x_to_y(True, test_l2_deny, is_ip6,
- is_reflect, add_eh)
-
- def verify_acl_packet_count(self, acl_idx, packet_count):
- matches = self.statistics.get_counter('/acl/%d/matches' % acl_idx)
- self.logger.info("stat seg for ACL %d: %s" % (acl_idx, repr(matches)))
- total_count = 0
- for m in matches:
- for p in m:
- total_count = total_count + p['packets']
- self.assertEqual(total_count, packet_count)
-
- def run_traffic_ip46_x_to_y(self, bridged_to_routed,
- test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp=False):
- self.reset_packet_infos()
- stream_dict = self.create_stream(self.pg2, self.loop0,
- bridged_to_routed,
- self.pg_if_packet_sizes, is_ip6,
- not is_reflect, is_established,
- add_eh, stateful_icmp)
- stream = stream_dict['stream']
-
- tx_if = self.pg0 if bridged_to_routed else self.pg2
- rx_if = self.pg2 if bridged_to_routed else self.pg0
-
- tx_if.add_stream(stream)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- packet_count = self.get_packet_count_for_if_idx(self.loop0.sw_if_index)
- rcvd1 = rx_if.get_capture(packet_count)
- self.verify_capture(self.loop0, self.pg2, rcvd1, bridged_to_routed)
- return len(stream)
-
- def run_traffic_ip46_routed_to_bridged(self, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp=False):
- return self.run_traffic_ip46_x_to_y(False, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp)
-
- def run_traffic_ip46_bridged_to_routed(self, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp=False):
- return self.run_traffic_ip46_x_to_y(True, test_l2_deny, is_ip6,
- is_reflect, is_established, add_eh,
- stateful_icmp)
-
- def run_test_ip46_routed_to_bridged(self, test_l2_deny,
- is_ip6, is_reflect, add_eh):
- acls = self.apply_acl_ip46_routed_to_bridged(test_l2_deny,
- is_ip6, is_reflect,
- add_eh)
- pkts = self.run_traffic_ip46_routed_to_bridged(test_l2_deny, is_ip6,
- is_reflect, False,
- add_eh)
- self.verify_acl_packet_count(acls['L3'].acl_index, pkts)
-
- def run_test_ip46_bridged_to_routed(self, test_l2_deny,
- is_ip6, is_reflect, add_eh):
- acls = self.apply_acl_ip46_bridged_to_routed(test_l2_deny,
- is_ip6, is_reflect,
- add_eh)
- pkts = self.run_traffic_ip46_bridged_to_routed(test_l2_deny, is_ip6,
- is_reflect, False,
- add_eh)
- self.verify_acl_packet_count(acls['L2'].acl_index, pkts)
-
- def run_test_ip46_routed_to_bridged_and_back(self, test_l2_action,
- is_ip6, add_eh,
- stateful_icmp=False):
- self.apply_acl_ip46_both_directions_reflect(False, test_l2_action,
- is_ip6, add_eh,
- stateful_icmp)
- self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
- True, False, add_eh,
- stateful_icmp)
- self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
- False, True, add_eh,
- stateful_icmp)
-
- def run_test_ip46_bridged_to_routed_and_back(self, test_l2_action,
- is_ip6, add_eh,
- stateful_icmp=False):
- self.apply_acl_ip46_both_directions_reflect(True, test_l2_action,
- is_ip6, add_eh,
- stateful_icmp)
- self.run_traffic_ip46_bridged_to_routed(test_l2_action, is_ip6,
- True, False, add_eh,
- stateful_icmp)
- self.run_traffic_ip46_routed_to_bridged(test_l2_action, is_ip6,
- False, True, add_eh,
- stateful_icmp)
-
- def test_0000_ip6_irb_1(self):
- """ ACL plugin prepare"""
- if not self.vpp_dead:
- cmd = "set acl-plugin session timeout udp idle 2000"
- self.logger.info(self.vapi.ppcli(cmd))
- # uncomment to not skip past the routing header
- # and watch the EH tests fail
- # self.logger.info(self.vapi.ppcli(
- # "set acl-plugin skip-ipv6-extension-header 43 0"))
- # uncomment to test the session limit (stateful tests will fail)
- # self.logger.info(self.vapi.ppcli(
- # "set acl-plugin session table max-entries 1"))
- # new datapath is the default, but just in case
- # self.logger.info(self.vapi.ppcli(
- # "set acl-plugin l2-datapath new"))
- # If you want to see some tests fail, uncomment the next line
- # self.logger.info(self.vapi.ppcli(
- # "set acl-plugin l2-datapath old"))
-
- def test_0001_ip6_irb_1(self):
- """ ACL IPv6 routed -> bridged, L2 ACL deny"""
- self.run_test_ip46_routed_to_bridged(True, True, False,
- self.WITHOUT_EH)
-
- def test_0002_ip6_irb_1(self):
- """ ACL IPv6 routed -> bridged, L3 ACL deny"""
- self.run_test_ip46_routed_to_bridged(False, True, False,
- self.WITHOUT_EH)
-
- def test_0003_ip4_irb_1(self):
- """ ACL IPv4 routed -> bridged, L2 ACL deny"""
- self.run_test_ip46_routed_to_bridged(True, False, False,
- self.WITHOUT_EH)
-
- def test_0004_ip4_irb_1(self):
- """ ACL IPv4 routed -> bridged, L3 ACL deny"""
- self.run_test_ip46_routed_to_bridged(False, False, False,
- self.WITHOUT_EH)
-
- def test_0005_ip6_irb_1(self):
- """ ACL IPv6 bridged -> routed, L2 ACL deny """
- self.run_test_ip46_bridged_to_routed(True, True, False,
- self.WITHOUT_EH)
-
- def test_0006_ip6_irb_1(self):
- """ ACL IPv6 bridged -> routed, L3 ACL deny """
- self.run_test_ip46_bridged_to_routed(False, True, False,
- self.WITHOUT_EH)
-
- def test_0007_ip6_irb_1(self):
- """ ACL IPv4 bridged -> routed, L2 ACL deny """
- self.run_test_ip46_bridged_to_routed(True, False, False,
- self.WITHOUT_EH)
-
- def test_0008_ip6_irb_1(self):
- """ ACL IPv4 bridged -> routed, L3 ACL deny """
- self.run_test_ip46_bridged_to_routed(False, False, False,
- self.WITHOUT_EH)
-
- # Stateful ACL tests
- def test_0101_ip6_irb_1(self):
- """ ACL IPv6 routed -> bridged, L2 ACL permit+reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(True, True,
- self.WITHOUT_EH)
-
- def test_0102_ip6_irb_1(self):
- """ ACL IPv6 bridged -> routed, L2 ACL permit+reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(True, True,
- self.WITHOUT_EH)
-
- def test_0103_ip6_irb_1(self):
- """ ACL IPv4 routed -> bridged, L2 ACL permit+reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(True, False,
- self.WITHOUT_EH)
-
- def test_0104_ip6_irb_1(self):
- """ ACL IPv4 bridged -> routed, L2 ACL permit+reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(True, False,
- self.WITHOUT_EH)
-
- def test_0111_ip6_irb_1(self):
- """ ACL IPv6 routed -> bridged, L3 ACL permit+reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(False, True,
- self.WITHOUT_EH)
-
- def test_0112_ip6_irb_1(self):
- """ ACL IPv6 bridged -> routed, L3 ACL permit+reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(False, True,
- self.WITHOUT_EH)
-
- def test_0113_ip6_irb_1(self):
- """ ACL IPv4 routed -> bridged, L3 ACL permit+reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(False, False,
- self.WITHOUT_EH)
-
- def test_0114_ip6_irb_1(self):
- """ ACL IPv4 bridged -> routed, L3 ACL permit+reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(False, False,
- self.WITHOUT_EH)
-
- # A block of tests with extension headers
-
- def test_1001_ip6_irb_1(self):
- """ ACL IPv6+EH routed -> bridged, L2 ACL deny"""
- self.run_test_ip46_routed_to_bridged(True, True, False,
- self.WITH_EH)
-
- def test_1002_ip6_irb_1(self):
- """ ACL IPv6+EH routed -> bridged, L3 ACL deny"""
- self.run_test_ip46_routed_to_bridged(False, True, False,
- self.WITH_EH)
-
- def test_1005_ip6_irb_1(self):
- """ ACL IPv6+EH bridged -> routed, L2 ACL deny """
- self.run_test_ip46_bridged_to_routed(True, True, False,
- self.WITH_EH)
-
- def test_1006_ip6_irb_1(self):
- """ ACL IPv6+EH bridged -> routed, L3 ACL deny """
- self.run_test_ip46_bridged_to_routed(False, True, False,
- self.WITH_EH)
-
- def test_1101_ip6_irb_1(self):
- """ ACL IPv6+EH routed -> bridged, L2 ACL permit+reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(True, True,
- self.WITH_EH)
-
- def test_1102_ip6_irb_1(self):
- """ ACL IPv6+EH bridged -> routed, L2 ACL permit+reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(True, True,
- self.WITH_EH)
-
- def test_1111_ip6_irb_1(self):
- """ ACL IPv6+EH routed -> bridged, L3 ACL permit+reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(False, True,
- self.WITH_EH)
-
- def test_1112_ip6_irb_1(self):
- """ ACL IPv6+EH bridged -> routed, L3 ACL permit+reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(False, True,
- self.WITH_EH)
-
- # IPv4 with "MF" bit set
-
- def test_1201_ip6_irb_1(self):
- """ ACL IPv4+MF routed -> bridged, L2 ACL deny"""
- self.run_test_ip46_routed_to_bridged(True, False, False,
- self.WITH_EH)
-
- def test_1202_ip6_irb_1(self):
- """ ACL IPv4+MF routed -> bridged, L3 ACL deny"""
- self.run_test_ip46_routed_to_bridged(False, False, False,
- self.WITH_EH)
-
- def test_1205_ip6_irb_1(self):
- """ ACL IPv4+MF bridged -> routed, L2 ACL deny """
- self.run_test_ip46_bridged_to_routed(True, False, False,
- self.WITH_EH)
-
- def test_1206_ip6_irb_1(self):
- """ ACL IPv4+MF bridged -> routed, L3 ACL deny """
- self.run_test_ip46_bridged_to_routed(False, False, False,
- self.WITH_EH)
-
- def test_1301_ip6_irb_1(self):
- """ ACL IPv4+MF routed -> bridged, L2 ACL permit+reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(True, False,
- self.WITH_EH)
-
- def test_1302_ip6_irb_1(self):
- """ ACL IPv4+MF bridged -> routed, L2 ACL permit+reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(True, False,
- self.WITH_EH)
-
- def test_1311_ip6_irb_1(self):
- """ ACL IPv4+MF routed -> bridged, L3 ACL permit+reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(False, False,
- self.WITH_EH)
-
- def test_1312_ip6_irb_1(self):
- """ ACL IPv4+MF bridged -> routed, L3 ACL permit+reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(False, False,
- self.WITH_EH)
- # Stateful ACL tests with stateful ICMP
-
- def test_1401_ip6_irb_1(self):
- """ IPv6 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(True, True,
- self.WITHOUT_EH,
- self.STATEFUL_ICMP)
-
- def test_1402_ip6_irb_1(self):
- """ IPv6 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(True, True,
- self.WITHOUT_EH,
- self.STATEFUL_ICMP)
-
- def test_1403_ip4_irb_1(self):
- """ IPv4 routed -> bridged, L2 ACL permit+reflect, ICMP reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(True, False,
- self.WITHOUT_EH,
- self.STATEFUL_ICMP)
-
- def test_1404_ip4_irb_1(self):
- """ IPv4 bridged -> routed, L2 ACL permit+reflect, ICMP reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(True, False,
- self.WITHOUT_EH,
- self.STATEFUL_ICMP)
-
- def test_1411_ip6_irb_1(self):
- """ IPv6 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(False, True,
- self.WITHOUT_EH,
- self.STATEFUL_ICMP)
-
- def test_1412_ip6_irb_1(self):
- """ IPv6 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(False, True,
- self.WITHOUT_EH,
- self.STATEFUL_ICMP)
-
- def test_1413_ip4_irb_1(self):
- """ IPv4 routed -> bridged, L3 ACL permit+reflect, ICMP reflect"""
- self.run_test_ip46_routed_to_bridged_and_back(False, False,
- self.WITHOUT_EH,
- self.STATEFUL_ICMP)
-
- def test_1414_ip4_irb_1(self):
- """ IPv4 bridged -> routed, L3 ACL permit+reflect, ICMP reflect"""
- self.run_test_ip46_bridged_to_routed_and_back(False, False,
- self.WITHOUT_EH,
- self.STATEFUL_ICMP)
-
-
-if __name__ == '__main__':
- unittest.main(testRunner=VppTestRunner)
diff --git a/src/plugins/acl/test/test_acl_plugin_macip.py b/src/plugins/acl/test/test_acl_plugin_macip.py
deleted file mode 100644
index 5edd7b03258..00000000000
--- a/src/plugins/acl/test/test_acl_plugin_macip.py
+++ /dev/null
@@ -1,1278 +0,0 @@
-#!/usr/bin/env python3
-from __future__ import print_function
-"""ACL plugin - MACIP tests
-"""
-import binascii
-import ipaddress
-import random
-from socket import inet_ntop, inet_pton, AF_INET, AF_INET6
-from struct import pack, unpack
-import re
-import unittest
-from ipaddress import ip_network, IPv4Network, IPv6Network
-
-import scapy.compat
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
-from scapy.layers.inet6 import IPv6
-
-from framework import VppTestCase, VppTestRunner, running_extended_tests
-from vpp_lo_interface import VppLoInterface
-from vpp_l2 import L2_PORT_TYPE
-from vpp_sub_interface import L2_VTR_OP, VppSubInterface, VppDot1QSubint, \
- VppDot1ADSubint
-from vpp_acl import AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist, \
- VppMacipAclInterface, VppMacipAcl, MacipRule
-from vpp_papi import MACAddress
-
-
-class MethodHolder(VppTestCase):
- DEBUG = False
-
- BRIDGED = True
- ROUTED = False
-
- IS_IP4 = False
- IS_IP6 = True
-
- DOT1AD = "dot1ad"
- DOT1Q = "dot1q"
- PERMIT_TAGS = True
- DENY_TAGS = False
-
- # rule types
- DENY = 0
- PERMIT = 1
-
- # ACL types
- EXACT_IP = 1
- SUBNET_IP = 2
- WILD_IP = 3
-
- EXACT_MAC = 1
- WILD_MAC = 2
- OUI_MAC = 3
-
- ACLS = []
-
- @classmethod
- def setUpClass(cls):
- """
- Perform standard class setup (defined by class method setUpClass in
- class VppTestCase) before running the test case, set test case related
- variables and configure VPP.
- """
- super(MethodHolder, cls).setUpClass()
-
- cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # packet sizes
- cls.bd_id = 111
- cls.remote_hosts_count = 200
-
- try:
- # create 4 pg interfaces, 1 loopback interface
- cls.create_pg_interfaces(range(4))
- cls.create_loopback_interfaces(1)
-
- # create 2 subinterfaces
- cls.subifs = [
- VppDot1QSubint(cls, cls.pg1, 10),
- VppDot1ADSubint(cls, cls.pg2, 20, 300, 400),
- VppDot1QSubint(cls, cls.pg3, 30),
- VppDot1ADSubint(cls, cls.pg3, 40, 600, 700)]
-
- cls.subifs[0].set_vtr(L2_VTR_OP.L2_POP_1,
- inner=10, push1q=1)
- cls.subifs[1].set_vtr(L2_VTR_OP.L2_POP_2,
- outer=300, inner=400, push1q=1)
- cls.subifs[2].set_vtr(L2_VTR_OP.L2_POP_1,
- inner=30, push1q=1)
- cls.subifs[3].set_vtr(L2_VTR_OP.L2_POP_2,
- outer=600, inner=700, push1q=1)
-
- cls.interfaces = list(cls.pg_interfaces)
- cls.interfaces.extend(cls.lo_interfaces)
- cls.interfaces.extend(cls.subifs)
-
- for i in cls.interfaces:
- i.admin_up()
-
- # Create BD with MAC learning enabled and put interfaces to this BD
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id,
- port_type=L2_PORT_TYPE.BVI)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.subifs[0].sw_if_index, bd_id=cls.bd_id)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.subifs[1].sw_if_index, bd_id=cls.bd_id)
-
- # Configure IPv4/6 addresses on loop interface and routed interface
- cls.loop0.config_ip4()
- cls.loop0.config_ip6()
- cls.pg2.config_ip4()
- cls.pg2.config_ip6()
- cls.pg3.config_ip4()
- cls.pg3.config_ip6()
-
- # Configure MAC address binding to IPv4 neighbors on loop0
- cls.loop0.generate_remote_hosts(cls.remote_hosts_count)
- # Modify host mac addresses to have different OUI parts
- for i in range(2, cls.remote_hosts_count + 2):
- mac = cls.loop0.remote_hosts[i-2]._mac.split(':')
- mac[2] = format(int(mac[2], 16) + i, "02x")
- cls.loop0.remote_hosts[i - 2]._mac = ":".join(mac)
-
- cls.loop0.configure_ipv4_neighbors()
- cls.loop0.configure_ipv6_neighbors()
-
- # configure MAC address on pg3
- cls.pg3.resolve_arp()
- cls.pg3.resolve_ndp()
-
- # configure MAC address on subifs
- for i in cls.subifs:
- i.config_ip4()
- i.resolve_arp()
- i.config_ip6()
-
- # configure MAC address on pg2
- cls.pg2.resolve_arp()
- cls.pg2.resolve_ndp()
-
- # Loopback BVI interface has remote hosts
- # one half of hosts are behind pg0 second behind pg1,pg2,pg3 subifs
- cls.pg0.remote_hosts = cls.loop0.remote_hosts[:100]
- cls.subifs[0].remote_hosts = cls.loop0.remote_hosts[100:125]
- cls.subifs[1].remote_hosts = cls.loop0.remote_hosts[125:150]
- cls.subifs[2].remote_hosts = cls.loop0.remote_hosts[150:175]
- cls.subifs[3].remote_hosts = cls.loop0.remote_hosts[175:]
-
- except Exception:
- super(MethodHolder, cls).tearDownClass()
- raise
-
- @classmethod
- def tearDownClass(cls):
- super(MethodHolder, cls).tearDownClass()
-
- def setUp(self):
- super(MethodHolder, self).setUp()
- self.reset_packet_infos()
-
- def show_commands_at_teardown(self):
- self.logger.info(self.vapi.ppcli("show interface address"))
- self.logger.info(self.vapi.ppcli("show hardware"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip interface"))
- self.logger.info(self.vapi.ppcli("sh classify tables verbose"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin acl"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin interface"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin tables"))
- # print(self.vapi.ppcli("show interface address"))
- # print(self.vapi.ppcli("show hardware"))
- # print(self.vapi.ppcli("sh acl-plugin macip interface"))
- # print(self.vapi.ppcli("sh acl-plugin macip acl"))
-
- def macip_acl_dump_debug(self):
- acls = self.vapi.macip_acl_dump()
- if self.DEBUG:
- for acl in acls:
- # print("ACL #"+str(acl.acl_index))
- for r in acl.r:
- rule = "ACTION"
- if r.is_permit == 1:
- rule = "PERMIT"
- elif r.is_permit == 0:
- rule = "DENY "
- """
- print(" IP6" if r.is_ipv6 else " IP4",
- rule,
- binascii.hexlify(r.src_mac),
- binascii.hexlify(r.src_mac_mask),
- unpack('<16B', r.src_ip_addr),
- r.src_ip_prefix_len)
- """
- return acls
-
- def create_rules(self, mac_type=EXACT_MAC, ip_type=EXACT_IP,
- acl_count=1, rules_count=None):
- acls = []
- if rules_count is None:
- rules_count = [1]
- src_mac = int("220000dead00", 16)
- for acl in range(2, (acl_count+1) * 2):
- rules = []
- host = random.choice(self.loop0.remote_hosts)
- is_ip6 = acl % 2
- ip4 = host.ip4.split('.')
- ip6 = list(unpack('<16B', inet_pton(AF_INET6, host.ip6)))
-
- if ip_type == self.EXACT_IP:
- prefix_len4 = 32
- prefix_len6 = 128
- elif ip_type == self.WILD_IP:
- ip4 = [0, 0, 0, 0]
- ip6 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
- prefix_len4 = 0
- prefix_len6 = 0
- rules_count[int((acl / 2) - 1)] = 1
- else:
- prefix_len4 = 24
- prefix_len6 = 64
-
- if mac_type == self.EXACT_MAC:
- mask = "ff:ff:ff:ff:ff:ff"
- elif mac_type == self.WILD_MAC:
- mask = "00:00:00:00:00:00"
- elif mac_type == self.OUI_MAC:
- mask = "ff:ff:ff:00:00:00"
- else:
- mask = "ff:ff:ff:ff:ff:00"
-
- ip = ip6 if is_ip6 else ip4
- ip_len = prefix_len6 if is_ip6 else prefix_len4
-
- for i in range(0, (rules_count[int((acl / 2) - 1)])):
- src_mac += 16777217
- if mac_type == self.WILD_MAC:
- mac = "00:00:00:00:00:00"
- elif mac_type == self.OUI_MAC:
- mac = ':'.join(re.findall('..', '{:02x}'.format(
- src_mac))[:3])+":00:00:00"
- else:
- mac = ':'.join(re.findall(
- '..', '{:02x}'.format(src_mac)))
-
- if ip_type == self.EXACT_IP:
- ip4[3] = random.randint(100, 200)
- ip6[15] = random.randint(100, 200)
- elif ip_type == self.SUBNET_IP:
- ip4[2] = random.randint(100, 200)
- ip4[3] = 0
- ip6[7] = random.randint(100, 200)
- ip6[15] = 0
- ip_pack = b''
- for j in range(0, len(ip)):
- ip_pack += pack('<B', int(ip[j]))
-
- rule = MacipRule(is_permit=self.PERMIT,
- src_prefix=ip_network((ip_pack, ip_len)),
- src_mac=MACAddress(mac).packed,
- src_mac_mask=MACAddress(mask).packed)
- rules.append(rule)
- if ip_type == self.WILD_IP:
- break
-
- acls.append(rules)
- src_mac += 1099511627776
- return acls
-
- def apply_macip_rules(self, acls):
- macip_acls = []
- for acl in acls:
- macip_acl = VppMacipAcl(self, rules=acl)
- macip_acl.add_vpp_config()
- macip_acls.append(macip_acl)
- return macip_acls
-
- def verify_macip_acls(self, acl_count, rules_count, expected_count=2):
- reply = self.macip_acl_dump_debug()
- for acl in range(2, (acl_count+1) * 2):
- self.assertEqual(reply[acl - 2].count, rules_count[acl//2-1])
-
- self.vapi.macip_acl_interface_get()
-
- self.vapi.macip_acl_interface_add_del(sw_if_index=0, acl_index=0)
- self.vapi.macip_acl_interface_add_del(sw_if_index=1, acl_index=1)
-
- reply = self.vapi.macip_acl_interface_get()
- self.assertEqual(reply.count, expected_count)
-
- def create_stream(self, mac_type, ip_type, packet_count,
- src_if, dst_if, traffic, is_ip6, tags=PERMIT_TAGS):
- # exact MAC and exact IP
- # exact MAC and subnet of IPs
- # exact MAC and wildcard IP
- # wildcard MAC and exact IP
- # wildcard MAC and subnet of IPs
- # wildcard MAC and wildcard IP
- # OUI restricted MAC and exact IP
- # OUI restricted MAC and subnet of IPs
- # OUI restricted MAC and wildcard IP
-
- packets = []
- macip_rules = []
- acl_rules = []
- ip_permit = ""
- mac_permit = ""
- dst_mac = ""
- mac_rule = "00:00:00:00:00:00"
- mac_mask = "00:00:00:00:00:00"
- for p in range(0, packet_count):
- remote_dst_index = p % len(dst_if.remote_hosts)
- remote_dst_host = dst_if.remote_hosts[remote_dst_index]
-
- dst_port = 1234 + p
- src_port = 4321 + p
- is_permit = self.PERMIT if p % 3 == 0 else self.DENY
- denyMAC = True if not is_permit and p % 3 == 1 else False
- denyIP = True if not is_permit and p % 3 == 2 else False
- if not is_permit and ip_type == self.WILD_IP:
- denyMAC = True
- if not is_permit and mac_type == self.WILD_MAC:
- denyIP = True
-
- if traffic == self.BRIDGED:
- if is_permit:
- src_mac = remote_dst_host._mac
- dst_mac = 'de:ad:00:00:00:00'
- src_ip4 = remote_dst_host.ip4
- dst_ip4 = src_if.remote_ip4
- src_ip6 = remote_dst_host.ip6
- dst_ip6 = src_if.remote_ip6
- ip_permit = src_ip6 if is_ip6 else src_ip4
- mac_permit = src_mac
- if denyMAC:
- mac = src_mac.split(':')
- mac[0] = format(int(mac[0], 16)+1, "02x")
- src_mac = ":".join(mac)
- if is_ip6:
- src_ip6 = ip_permit
- else:
- src_ip4 = ip_permit
- if denyIP:
- if ip_type != self.WILD_IP:
- src_mac = mac_permit
- src_ip4 = remote_dst_host.ip4
- dst_ip4 = src_if.remote_ip4
- src_ip6 = remote_dst_host.ip6
- dst_ip6 = src_if.remote_ip6
- else:
- if is_permit:
- src_mac = remote_dst_host._mac
- dst_mac = src_if.local_mac
- src_ip4 = src_if.remote_ip4
- dst_ip4 = remote_dst_host.ip4
- src_ip6 = src_if.remote_ip6
- dst_ip6 = remote_dst_host.ip6
- ip_permit = src_ip6 if is_ip6 else src_ip4
- mac_permit = src_mac
- if denyMAC:
- mac = src_mac.split(':')
- mac[0] = format(int(mac[0], 16) + 1, "02x")
- src_mac = ":".join(mac)
- if is_ip6:
- src_ip6 = ip_permit
- else:
- src_ip4 = ip_permit
- if denyIP:
- src_mac = remote_dst_host._mac
- if ip_type != self.WILD_IP:
- src_mac = mac_permit
- src_ip4 = remote_dst_host.ip4
- dst_ip4 = src_if.remote_ip4
- src_ip6 = remote_dst_host.ip6
- dst_ip6 = src_if.remote_ip6
-
- if is_permit:
- info = self.create_packet_info(src_if, dst_if)
- payload = self.info_to_payload(info)
- else:
- payload = "to be blocked"
-
- if mac_type == self.WILD_MAC:
- mac = src_mac.split(':')
- for i in range(1, 5):
- mac[i] = format(random.randint(0, 255), "02x")
- src_mac = ":".join(mac)
-
- # create packet
- packet = Ether(src=src_mac, dst=dst_mac)
- ip_rule = src_ip6 if is_ip6 else src_ip4
- if is_ip6:
- if ip_type != self.EXACT_IP:
- sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip_rule)))
- if ip_type == self.WILD_IP:
- sub_ip[0] = random.randint(240, 254)
- sub_ip[1] = random.randint(230, 239)
- sub_ip[14] = random.randint(100, 199)
- sub_ip[15] = random.randint(200, 255)
- elif ip_type == self.SUBNET_IP:
- if denyIP:
- sub_ip[2] = int(sub_ip[2]) + 1
- sub_ip[14] = random.randint(100, 199)
- sub_ip[15] = random.randint(200, 255)
- packed_src_ip6 = b''.join(
- [scapy.compat.chb(x) for x in sub_ip])
- src_ip6 = inet_ntop(AF_INET6, packed_src_ip6)
- packet /= IPv6(src=src_ip6, dst=dst_ip6)
- else:
- if ip_type != self.EXACT_IP:
- sub_ip = ip_rule.split('.')
- if ip_type == self.WILD_IP:
- sub_ip[0] = random.randint(1, 49)
- sub_ip[1] = random.randint(50, 99)
- sub_ip[2] = random.randint(100, 199)
- sub_ip[3] = random.randint(200, 255)
- elif ip_type == self.SUBNET_IP:
- if denyIP:
- sub_ip[1] = int(sub_ip[1])+1
- sub_ip[2] = random.randint(100, 199)
- sub_ip[3] = random.randint(200, 255)
- src_ip4 = '.'.join(['{!s}'.format(x) for x in sub_ip])
- packet /= IP(src=src_ip4, dst=dst_ip4, frag=0, flags=0)
-
- packet /= UDP(sport=src_port, dport=dst_port)/Raw(payload)
-
- packet[Raw].load += b" mac:%s" % src_mac.encode('utf-8')
-
- size = self.pg_if_packet_sizes[p % len(self.pg_if_packet_sizes)]
- if isinstance(src_if, VppSubInterface):
- size = size + 4
- if isinstance(src_if, VppDot1QSubint):
- if src_if is self.subifs[0]:
- if tags == self.PERMIT_TAGS:
- packet = src_if.add_dot1q_layer(packet, 10)
- else:
- packet = src_if.add_dot1q_layer(packet, 11)
- else:
- if tags == self.PERMIT_TAGS:
- packet = src_if.add_dot1q_layer(packet, 30)
- else:
- packet = src_if.add_dot1q_layer(packet, 33)
- elif isinstance(src_if, VppDot1ADSubint):
- if src_if is self.subifs[1]:
- if tags == self.PERMIT_TAGS:
- packet = src_if.add_dot1ad_layer(packet, 300, 400)
- else:
- packet = src_if.add_dot1ad_layer(packet, 333, 444)
- else:
- if tags == self.PERMIT_TAGS:
- packet = src_if.add_dot1ad_layer(packet, 600, 700)
- else:
- packet = src_if.add_dot1ad_layer(packet, 666, 777)
- self.extend_packet(packet, size)
- packets.append(packet)
-
- # create suitable MACIP rule
- if mac_type == self.EXACT_MAC:
- mac_rule = src_mac
- mac_mask = "ff:ff:ff:ff:ff:ff"
- elif mac_type == self.WILD_MAC:
- mac_rule = "00:00:00:00:00:00"
- mac_mask = "00:00:00:00:00:00"
- elif mac_type == self.OUI_MAC:
- mac = src_mac.split(':')
- mac[3] = mac[4] = mac[5] = '00'
- mac_rule = ":".join(mac)
- mac_mask = "ff:ff:ff:00:00:00"
-
- if is_ip6:
- if ip_type == self.WILD_IP:
- ip = "0::0"
- else:
- ip = src_ip6
- if ip_type == self.SUBNET_IP:
- sub_ip = list(unpack('<16B', inet_pton(AF_INET6, ip)))
- for i in range(8, 16):
- sub_ip[i] = 0
- packed_ip = b''.join(
- [scapy.compat.chb(x) for x in sub_ip])
- ip = inet_ntop(AF_INET6, packed_ip)
- else:
- if ip_type == self.WILD_IP:
- ip = "0.0.0.0"
- else:
- ip = src_ip4
- if ip_type == self.SUBNET_IP:
- sub_ip = ip.split('.')
- sub_ip[2] = sub_ip[3] = '0'
- ip = ".".join(sub_ip)
-
- prefix_len = 128 if is_ip6 else 32
- if ip_type == self.WILD_IP:
- prefix_len = 0
- elif ip_type == self.SUBNET_IP:
- prefix_len = 64 if is_ip6 else 16
- ip_rule = inet_pton(AF_INET6 if is_ip6 else AF_INET, ip)
-
- # create suitable ACL rule
- if is_permit:
- rule_l4_sport = packet[UDP].sport
- rule_l4_dport = packet[UDP].dport
- rule_family = AF_INET6 if packet.haslayer(IPv6) else AF_INET
- rule_prefix_len = 128 if packet.haslayer(IPv6) else 32
- rule_l3_layer = IPv6 if packet.haslayer(IPv6) else IP
- if packet.haslayer(IPv6):
- rule_l4_proto = packet[UDP].overload_fields[IPv6]['nh']
- else:
- rule_l4_proto = packet[IP].proto
-
- src_network = ip_network(
- (packet[rule_l3_layer].src, rule_prefix_len))
- dst_network = ip_network(
- (packet[rule_l3_layer].dst, rule_prefix_len))
- acl_rule = AclRule(is_permit=is_permit, proto=rule_l4_proto,
- src_prefix=src_network,
- dst_prefix=dst_network,
- sport_from=rule_l4_sport,
- sport_to=rule_l4_sport,
- dport_from=rule_l4_dport,
- dport_to=rule_l4_dport)
- acl_rules.append(acl_rule)
-
- if mac_type == self.WILD_MAC and ip_type == self.WILD_IP and p > 0:
- continue
-
- if is_permit:
- macip_rule = MacipRule(
- is_permit=is_permit,
- src_prefix=ip_network(
- (ip_rule, prefix_len)),
- src_mac=MACAddress(mac_rule).packed,
- src_mac_mask=MACAddress(mac_mask).packed)
- macip_rules.append(macip_rule)
-
- # deny all other packets
- if not (mac_type == self.WILD_MAC and ip_type == self.WILD_IP):
- network = IPv6Network((0, 0)) if is_ip6 else IPv4Network((0, 0))
- macip_rule = MacipRule(
- is_permit=0,
- src_prefix=network,
- src_mac=MACAddress("00:00:00:00:00:00").packed,
- src_mac_mask=MACAddress("00:00:00:00:00:00").packed)
- macip_rules.append(macip_rule)
-
- network = IPv6Network((0, 0)) if is_ip6 else IPv4Network((0, 0))
- acl_rule = AclRule(is_permit=0, src_prefix=network, dst_prefix=network,
- sport_from=0, sport_to=0, dport_from=0, dport_to=0)
- acl_rules.append(acl_rule)
- return {'stream': packets,
- 'macip_rules': macip_rules,
- 'acl_rules': acl_rules}
-
- def verify_capture(self, stream, capture, is_ip6):
- """
- :param stream:
- :param capture:
- :param is_ip6:
- :return:
- """
- # p_l3 = IPv6 if is_ip6 else IP
- # if self.DEBUG:
- # for p in stream:
- # print(p[Ether].src, p[Ether].dst, p[p_l3].src, p[p_l3].dst)
- #
- # acls = self.macip_acl_dump_debug()
-
- # TODO : verify
- # for acl in acls:
- # for r in acl.r:
- # print(binascii.hexlify(r.src_mac), \
- # binascii.hexlify(r.src_mac_mask),\
- # unpack('<16B', r.src_ip_addr), \
- # r.src_ip_prefix_len)
- #
- # for p in capture:
- # print(p[Ether].src, p[Ether].dst, p[p_l3].src, p[p_l3].dst
- # data = p[Raw].load.split(':',1)[1])
- # print(p[p_l3].src, data)
-
- def run_traffic(self, mac_type, ip_type, traffic, is_ip6, packets,
- do_not_expected_capture=False, tags=None,
- apply_rules=True, isMACIP=True, permit_tags=PERMIT_TAGS,
- try_replace=False):
- self.reset_packet_infos()
-
- if tags is None:
- tx_if = self.pg0 if traffic == self.BRIDGED else self.pg3
- rx_if = self.pg3 if traffic == self.BRIDGED else self.pg0
- src_if = self.pg3
- dst_if = self.loop0
- else:
- if tags == self.DOT1Q:
- if traffic == self.BRIDGED:
- tx_if = self.subifs[0]
- rx_if = self.pg0
- src_if = self.subifs[0]
- dst_if = self.loop0
- else:
- tx_if = self.subifs[2]
- rx_if = self.pg0
- src_if = self.subifs[2]
- dst_if = self.loop0
- elif tags == self.DOT1AD:
- if traffic == self.BRIDGED:
- tx_if = self.subifs[1]
- rx_if = self.pg0
- src_if = self.subifs[1]
- dst_if = self.loop0
- else:
- tx_if = self.subifs[3]
- rx_if = self.pg0
- src_if = self.subifs[3]
- dst_if = self.loop0
- else:
- return
-
- test_dict = self.create_stream(mac_type, ip_type, packets,
- src_if, dst_if,
- traffic, is_ip6,
- tags=permit_tags)
-
- if apply_rules:
- if isMACIP:
- self.acl = VppMacipAcl(self, rules=test_dict['macip_rules'])
- else:
- self.acl = VppAcl(self, rules=test_dict['acl_rules'])
- self.acl.add_vpp_config()
-
- if isMACIP:
- self.acl_if = VppMacipAclInterface(
- self, sw_if_index=tx_if.sw_if_index, acls=[self.acl])
- self.acl_if.add_vpp_config()
-
- dump = self.acl_if.dump()
- self.assertTrue(dump)
- self.assertEqual(dump[0].acls[0], self.acl.acl_index)
- else:
- self.acl_if = VppAclInterface(
- self, sw_if_index=tx_if.sw_if_index, n_input=1,
- acls=[self.acl])
- self.acl_if.add_vpp_config()
- else:
- if hasattr(self, "acl_if"):
- self.acl_if.remove_vpp_config()
- if try_replace and hasattr(self, "acl"):
- if isMACIP:
- self.acl.modify_vpp_config(test_dict['macip_rules'])
- else:
- self.acl.modify_vpp_config(test_dict['acl_rules'])
-
- if not isinstance(src_if, VppSubInterface):
- tx_if.add_stream(test_dict['stream'])
- else:
- tx_if.parent.add_stream(test_dict['stream'])
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- if do_not_expected_capture:
- rx_if.get_capture(0)
- else:
- if traffic == self.BRIDGED and mac_type == self.WILD_MAC and \
- ip_type == self.WILD_IP:
- capture = rx_if.get_capture(packets)
- else:
- capture = rx_if.get_capture(
- self.get_packet_count_for_if_idx(dst_if.sw_if_index))
- self.verify_capture(test_dict['stream'], capture, is_ip6)
- if not isMACIP:
- if hasattr(self, "acl_if"):
- self.acl_if.remove_vpp_config()
- if hasattr(self, "acl"):
- self.acl.remove_vpp_config()
-
- def run_test_acls(self, mac_type, ip_type, acl_count,
- rules_count, traffic=None, ip=None):
- self.apply_macip_rules(self.create_rules(mac_type, ip_type, acl_count,
- rules_count))
- self.verify_macip_acls(acl_count, rules_count)
-
- if traffic is not None:
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, traffic, ip, 9)
-
-
-class TestMACIP_IP4(MethodHolder):
- """MACIP with IP4 traffic"""
-
- @classmethod
- def setUpClass(cls):
- super(TestMACIP_IP4, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestMACIP_IP4, cls).tearDownClass()
-
- def test_acl_bridged_ip4_exactMAC_exactIP(self):
- """ IP4 MACIP exactMAC|exactIP ACL bridged traffic
- """
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP,
- self.BRIDGED, self.IS_IP4, 9)
-
- def test_acl_bridged_ip4_exactMAC_subnetIP(self):
- """ IP4 MACIP exactMAC|subnetIP ACL bridged traffic
- """
-
- self.run_traffic(self.EXACT_MAC, self.SUBNET_IP,
- self.BRIDGED, self.IS_IP4, 9)
-
- def test_acl_bridged_ip4_exactMAC_wildIP(self):
- """ IP4 MACIP exactMAC|wildIP ACL bridged traffic
- """
-
- self.run_traffic(self.EXACT_MAC, self.WILD_IP,
- self.BRIDGED, self.IS_IP4, 9)
-
- def test_acl_bridged_ip4_ouiMAC_exactIP(self):
- """ IP4 MACIP ouiMAC|exactIP ACL bridged traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.EXACT_IP,
- self.BRIDGED, self.IS_IP4, 3)
-
- def test_acl_bridged_ip4_ouiMAC_subnetIP(self):
- """ IP4 MACIP ouiMAC|subnetIP ACL bridged traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.SUBNET_IP,
- self.BRIDGED, self.IS_IP4, 9)
-
- def test_acl_bridged_ip4_ouiMAC_wildIP(self):
- """ IP4 MACIP ouiMAC|wildIP ACL bridged traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.WILD_IP,
- self.BRIDGED, self.IS_IP4, 9)
-
- def test_ac_bridgedl_ip4_wildMAC_exactIP(self):
- """ IP4 MACIP wildcardMAC|exactIP ACL bridged traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.EXACT_IP,
- self.BRIDGED, self.IS_IP4, 9)
-
- def test_acl_bridged_ip4_wildMAC_subnetIP(self):
- """ IP4 MACIP wildcardMAC|subnetIP ACL bridged traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.SUBNET_IP,
- self.BRIDGED, self.IS_IP4, 9)
-
- def test_acl_bridged_ip4_wildMAC_wildIP(self):
- """ IP4 MACIP wildcardMAC|wildIP ACL bridged traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.WILD_IP,
- self.BRIDGED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_exactMAC_exactIP(self):
- """ IP4 MACIP exactMAC|exactIP ACL routed traffic
- """
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_exactMAC_subnetIP(self):
- """ IP4 MACIP exactMAC|subnetIP ACL routed traffic
- """
- self.run_traffic(self.EXACT_MAC, self.SUBNET_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_exactMAC_wildIP(self):
- """ IP4 MACIP exactMAC|wildIP ACL routed traffic
- """
- self.run_traffic(self.EXACT_MAC, self.WILD_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_ouiMAC_exactIP(self):
- """ IP4 MACIP ouiMAC|exactIP ACL routed traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.EXACT_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_ouiMAC_subnetIP(self):
- """ IP4 MACIP ouiMAC|subnetIP ACL routed traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.SUBNET_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_ouiMAC_wildIP(self):
- """ IP4 MACIP ouiMAC|wildIP ACL routed traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.WILD_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_wildMAC_exactIP(self):
- """ IP4 MACIP wildcardMAC|exactIP ACL routed traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.EXACT_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_wildMAC_subnetIP(self):
- """ IP4 MACIP wildcardMAC|subnetIP ACL routed traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.SUBNET_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_routed_ip4_wildMAC_wildIP(self):
- """ IP4 MACIP wildcardMAC|wildIP ACL
- """
-
- self.run_traffic(self.WILD_MAC, self.WILD_IP,
- self.ROUTED, self.IS_IP4, 9)
-
- def test_acl_replace_traffic_ip4(self):
- """ MACIP replace ACL with IP4 traffic
- """
- self.run_traffic(self.OUI_MAC, self.SUBNET_IP,
- self.BRIDGED, self.IS_IP4, 9, try_replace=True)
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP,
- self.BRIDGED, self.IS_IP4, 9, try_replace=True)
-
-
-class TestMACIP_IP6(MethodHolder):
- """MACIP with IP6 traffic"""
-
- @classmethod
- def setUpClass(cls):
- super(TestMACIP_IP6, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestMACIP_IP6, cls).tearDownClass()
-
- def test_acl_bridged_ip6_exactMAC_exactIP(self):
- """ IP6 MACIP exactMAC|exactIP ACL bridged traffic
- """
-
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_bridged_ip6_exactMAC_subnetIP(self):
- """ IP6 MACIP exactMAC|subnetIP ACL bridged traffic
- """
-
- self.run_traffic(self.EXACT_MAC, self.SUBNET_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_bridged_ip6_exactMAC_wildIP(self):
- """ IP6 MACIP exactMAC|wildIP ACL bridged traffic
- """
-
- self.run_traffic(self.EXACT_MAC, self.WILD_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_bridged_ip6_ouiMAC_exactIP(self):
- """ IP6 MACIP oui_MAC|exactIP ACL bridged traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.EXACT_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_bridged_ip6_ouiMAC_subnetIP(self):
- """ IP6 MACIP ouiMAC|subnetIP ACL bridged traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.SUBNET_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_bridged_ip6_ouiMAC_wildIP(self):
- """ IP6 MACIP ouiMAC|wildIP ACL bridged traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.WILD_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_bridged_ip6_wildMAC_exactIP(self):
- """ IP6 MACIP wildcardMAC|exactIP ACL bridged traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.EXACT_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_bridged_ip6_wildMAC_subnetIP(self):
- """ IP6 MACIP wildcardMAC|subnetIP ACL bridged traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.SUBNET_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_bridged_ip6_wildMAC_wildIP(self):
- """ IP6 MACIP wildcardMAC|wildIP ACL bridged traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.WILD_IP,
- self.BRIDGED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_exactMAC_exactIP(self):
- """ IP6 MACIP exactMAC|exactIP ACL routed traffic
- """
-
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_exactMAC_subnetIP(self):
- """ IP6 MACIP exactMAC|subnetIP ACL routed traffic
- """
-
- self.run_traffic(self.EXACT_MAC, self.SUBNET_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_exactMAC_wildIP(self):
- """ IP6 MACIP exactMAC|wildIP ACL routed traffic
- """
-
- self.run_traffic(self.EXACT_MAC, self.WILD_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_ouiMAC_exactIP(self):
- """ IP6 MACIP ouiMAC|exactIP ACL routed traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.EXACT_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_ouiMAC_subnetIP(self):
- """ IP6 MACIP ouiMAC|subnetIP ACL routed traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.SUBNET_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_ouiMAC_wildIP(self):
- """ IP6 MACIP ouiMAC|wildIP ACL routed traffic
- """
-
- self.run_traffic(self.OUI_MAC, self.WILD_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_wildMAC_exactIP(self):
- """ IP6 MACIP wildcardMAC|exactIP ACL routed traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.EXACT_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_wildMAC_subnetIP(self):
- """ IP6 MACIP wildcardMAC|subnetIP ACL routed traffic
- """
-
- self.run_traffic(self.WILD_MAC, self.SUBNET_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_routed_ip6_wildMAC_wildIP(self):
- """ IP6 MACIP wildcardMAC|wildIP ACL
- """
-
- self.run_traffic(self.WILD_MAC, self.WILD_IP,
- self.ROUTED, self.IS_IP6, 9)
-
- def test_acl_replace_traffic_ip6(self):
- """ MACIP replace ACL with IP6 traffic
- """
- self.run_traffic(self.OUI_MAC, self.SUBNET_IP,
- self.BRIDGED, self.IS_IP6, 9, try_replace=True)
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP,
- self.BRIDGED, self.IS_IP6, 9, try_replace=True)
-
-
-class TestMACIP(MethodHolder):
- """MACIP Tests"""
-
- @classmethod
- def setUpClass(cls):
- super(TestMACIP, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestMACIP, cls).tearDownClass()
-
- def test_acl_1_2(self):
- """ MACIP ACL with 2 entries
- """
-
- self.run_test_acls(self.EXACT_MAC, self.WILD_IP, 1, [2])
-
- def test_acl_1_5(self):
- """ MACIP ACL with 5 entries
- """
-
- self.run_test_acls(self.EXACT_MAC, self.SUBNET_IP, 1, [5])
-
- def test_acl_1_10(self):
- """ MACIP ACL with 10 entries
- """
-
- self.run_test_acls(self.EXACT_MAC, self.EXACT_IP, 1, [10])
-
- def test_acl_1_20(self):
- """ MACIP ACL with 20 entries
- """
-
- self.run_test_acls(self.OUI_MAC, self.WILD_IP, 1, [20])
-
- def test_acl_1_50(self):
- """ MACIP ACL with 50 entries
- """
-
- self.run_test_acls(self.OUI_MAC, self.SUBNET_IP, 1, [50])
-
- def test_acl_1_100(self):
- """ MACIP ACL with 100 entries
- """
-
- self.run_test_acls(self.OUI_MAC, self.EXACT_IP, 1, [100])
-
- def test_acl_2_X(self):
- """ MACIP 2 ACLs each with 100+ entries
- """
-
- self.run_test_acls(self.OUI_MAC, self.SUBNET_IP, 2, [100, 200])
-
- def test_acl_10_X(self):
- """ MACIP 10 ACLs each with 100+ entries
- """
-
- self.run_test_acls(self.EXACT_MAC, self.EXACT_IP, 10,
- [100, 120, 140, 160, 180, 200, 210, 220, 230, 240])
-
- def test_acl_10_X_traffic_ip4(self):
- """ MACIP 10 ACLs each with 100+ entries with IP4 traffic
- """
-
- self.run_test_acls(self.EXACT_MAC, self.EXACT_IP, 10,
- [100, 120, 140, 160, 180, 200, 210, 220, 230, 240],
- self.BRIDGED, self.IS_IP4)
-
- def test_acl_10_X_traffic_ip6(self):
- """ MACIP 10 ACLs each with 100+ entries with IP6 traffic
- """
-
- self.run_test_acls(self.EXACT_MAC, self.EXACT_IP, 10,
- [100, 120, 140, 160, 180, 200, 210, 220, 230, 240],
- self.BRIDGED, self.IS_IP6)
-
- def test_acl_replace(self):
- """ MACIP replace ACL
- """
-
- r1 = self.create_rules(acl_count=3, rules_count=[2, 2, 2])
- r2 = self.create_rules(mac_type=self.OUI_MAC, ip_type=self.SUBNET_IP)
- macip_acls = self.apply_macip_rules(r1)
-
- acls_before = self.macip_acl_dump_debug()
-
- # replace acls #2, #3 with new
- macip_acls[2].modify_vpp_config(r2[0])
- macip_acls[3].modify_vpp_config(r2[1])
-
- acls_after = self.macip_acl_dump_debug()
-
- # verify changes
- self.assertEqual(len(acls_before), len(acls_after))
- for acl1, acl2 in zip(
- acls_before[:2]+acls_before[4:],
- acls_after[:2]+acls_after[4:]):
- self.assertEqual(len(acl1), len(acl2))
-
- self.assertEqual(len(acl1.r), len(acl2.r))
- for r1, r2 in zip(acl1.r, acl2.r):
- self.assertEqual(len(acl1.r), len(acl2.r))
- self.assertEqual(acl1.r, acl2.r)
- for acl1, acl2 in zip(
- acls_before[2:4],
- acls_after[2:4]):
- self.assertEqual(len(acl1), len(acl2))
-
- self.assertNotEqual(len(acl1.r), len(acl2.r))
- for r1, r2 in zip(acl1.r, acl2.r):
- self.assertNotEqual(len(acl1.r), len(acl2.r))
- self.assertNotEqual(acl1.r, acl2.r)
-
- def test_delete_intf(self):
- """ MACIP ACL delete intf with acl
- """
-
- intf_count = len(self.interfaces)+1
- intf = []
- macip_alcs = self.apply_macip_rules(
- self.create_rules(acl_count=3, rules_count=[3, 5, 4]))
-
- intf.append(VppLoInterface(self))
- intf.append(VppLoInterface(self))
-
- sw_if_index0 = intf[0].sw_if_index
- macip_acl_if0 = VppMacipAclInterface(
- self, sw_if_index=sw_if_index0, acls=[macip_alcs[1]])
- macip_acl_if0.add_vpp_config()
-
- reply = self.vapi.macip_acl_interface_get()
- self.assertEqual(reply.count, intf_count+1)
- self.assertEqual(reply.acls[sw_if_index0], 1)
-
- sw_if_index1 = intf[1].sw_if_index
- macip_acl_if1 = VppMacipAclInterface(
- self, sw_if_index=sw_if_index1, acls=[macip_alcs[0]])
- macip_acl_if1.add_vpp_config()
-
- reply = self.vapi.macip_acl_interface_get()
- self.assertEqual(reply.count, intf_count+2)
- self.assertEqual(reply.acls[sw_if_index1], 0)
-
- intf[0].remove_vpp_config()
- reply = self.vapi.macip_acl_interface_get()
- self.assertEqual(reply.count, intf_count+2)
- self.assertEqual(reply.acls[sw_if_index0], 4294967295)
- self.assertEqual(reply.acls[sw_if_index1], 0)
-
- intf.append(VppLoInterface(self))
- intf.append(VppLoInterface(self))
- sw_if_index2 = intf[2].sw_if_index
- sw_if_index3 = intf[3].sw_if_index
- macip_acl_if2 = VppMacipAclInterface(
- self, sw_if_index=sw_if_index2, acls=[macip_alcs[1]])
- macip_acl_if2.add_vpp_config()
- macip_acl_if3 = VppMacipAclInterface(
- self, sw_if_index=sw_if_index3, acls=[macip_alcs[1]])
- macip_acl_if3.add_vpp_config()
-
- reply = self.vapi.macip_acl_interface_get()
- self.assertEqual(reply.count, intf_count+3)
- self.assertEqual(reply.acls[sw_if_index1], 0)
- self.assertEqual(reply.acls[sw_if_index2], 1)
- self.assertEqual(reply.acls[sw_if_index3], 1)
- self.logger.info("MACIP ACL on multiple interfaces:")
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 1234"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 1"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 0"))
- self.logger.info(self.vapi.ppcli("sh acl-plugin macip interface"))
-
- intf[2].remove_vpp_config()
- intf[1].remove_vpp_config()
-
- reply = self.vapi.macip_acl_interface_get()
- self.assertEqual(reply.count, intf_count+3)
- self.assertEqual(reply.acls[sw_if_index0], 4294967295)
- self.assertEqual(reply.acls[sw_if_index1], 4294967295)
- self.assertEqual(reply.acls[sw_if_index2], 4294967295)
- self.assertEqual(reply.acls[sw_if_index3], 1)
-
- intf[3].remove_vpp_config()
- reply = self.vapi.macip_acl_interface_get()
-
- self.assertEqual(len([x for x in reply.acls if x != 4294967295]), 0)
-
-
-class TestACL_dot1q_bridged(MethodHolder):
- """ACL on dot1q bridged subinterfaces Tests"""
-
- @classmethod
- def setUpClass(cls):
- super(TestACL_dot1q_bridged, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestACL_dot1q_bridged, cls).tearDownClass()
-
- def test_acl_bridged_ip4_subif_dot1q(self):
- """ IP4 ACL SubIf Dot1Q bridged traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED,
- self.IS_IP4, 9, tags=self.DOT1Q, isMACIP=False)
-
- def test_acl_bridged_ip6_subif_dot1q(self):
- """ IP6 ACL SubIf Dot1Q bridged traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED,
- self.IS_IP6, 9, tags=self.DOT1Q, isMACIP=False)
-
-
-class TestACL_dot1ad_bridged(MethodHolder):
- """ACL on dot1ad bridged subinterfaces Tests"""
-
- @classmethod
- def setUpClass(cls):
- super(TestACL_dot1ad_bridged, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestACL_dot1ad_bridged, cls).tearDownClass()
-
- def test_acl_bridged_ip4_subif_dot1ad(self):
- """ IP4 ACL SubIf Dot1AD bridged traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED,
- self.IS_IP4, 9, tags=self.DOT1AD, isMACIP=False)
-
- def test_acl_bridged_ip6_subif_dot1ad(self):
- """ IP6 ACL SubIf Dot1AD bridged traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.BRIDGED,
- self.IS_IP6, 9, tags=self.DOT1AD, isMACIP=False)
-
-
-class TestACL_dot1q_routed(MethodHolder):
- """ACL on dot1q routed subinterfaces Tests"""
-
- @classmethod
- def setUpClass(cls):
- super(TestACL_dot1q_routed, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestACL_dot1q_routed, cls).tearDownClass()
-
- def test_acl_routed_ip4_subif_dot1q(self):
- """ IP4 ACL SubIf Dot1Q routed traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
- self.IS_IP4, 9, tags=self.DOT1Q, isMACIP=False)
-
- def test_acl_routed_ip6_subif_dot1q(self):
- """ IP6 ACL SubIf Dot1Q routed traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
- self.IS_IP6, 9, tags=self.DOT1Q, isMACIP=False)
-
- def test_acl_routed_ip4_subif_dot1q_deny_by_tags(self):
- """ IP4 ACL SubIf wrong tags Dot1Q routed traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
- self.IS_IP4, 9, True, tags=self.DOT1Q, isMACIP=False,
- permit_tags=self.DENY_TAGS)
-
- def test_acl_routed_ip6_subif_dot1q_deny_by_tags(self):
- """ IP6 ACL SubIf wrong tags Dot1Q routed traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
- self.IS_IP6, 9, True, tags=self.DOT1Q, isMACIP=False,
- permit_tags=self.DENY_TAGS)
-
-
-class TestACL_dot1ad_routed(MethodHolder):
- """ACL on dot1ad routed subinterfaces Tests"""
-
- @classmethod
- def setUpClass(cls):
- super(TestACL_dot1ad_routed, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestACL_dot1ad_routed, cls).tearDownClass()
-
- def test_acl_routed_ip6_subif_dot1ad(self):
- """ IP6 ACL SubIf Dot1AD routed traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
- self.IS_IP6, 9, tags=self.DOT1AD, isMACIP=False)
-
- def test_acl_routed_ip4_subif_dot1ad(self):
- """ IP4 ACL SubIf Dot1AD routed traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
- self.IS_IP4, 9, tags=self.DOT1AD, isMACIP=False)
-
- def test_acl_routed_ip6_subif_dot1ad_deny_by_tags(self):
- """ IP6 ACL SubIf wrong tags Dot1AD routed traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
- self.IS_IP6, 9, True, tags=self.DOT1AD, isMACIP=False,
- permit_tags=self.DENY_TAGS)
-
- def test_acl_routed_ip4_subif_dot1ad_deny_by_tags(self):
- """ IP4 ACL SubIf wrong tags Dot1AD routed traffic"""
- self.run_traffic(self.EXACT_MAC, self.EXACT_IP, self.ROUTED,
- self.IS_IP4, 9, True, tags=self.DOT1AD, isMACIP=False,
- permit_tags=self.DENY_TAGS)
-
-
-if __name__ == '__main__':
- unittest.main(testRunner=VppTestRunner)
diff --git a/src/plugins/acl/test/test_classify_l2_acl.py b/src/plugins/acl/test/test_classify_l2_acl.py
deleted file mode 100644
index b1309881e58..00000000000
--- a/src/plugins/acl/test/test_classify_l2_acl.py
+++ /dev/null
@@ -1,608 +0,0 @@
-#!/usr/bin/env python3
-""" Classifier-based L2 ACL Test Case HLD:
-"""
-
-import unittest
-import random
-import binascii
-import socket
-
-
-from scapy.packet import Raw
-from scapy.data import ETH_P_IP
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, TCP, UDP, ICMP
-from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
-from scapy.layers.inet6 import IPv6ExtHdrFragment
-from framework import VppTestCase, VppTestRunner
-from util import Host, ppp
-from template_classifier import TestClassifier
-
-
-class TestClassifyAcl(TestClassifier):
- """ Classifier-based L2 input and output ACL Test Case """
-
- # traffic types
- IP = 0
- ICMP = 1
-
- # IP version
- IPRANDOM = -1
- IPV4 = 0
- IPV6 = 1
-
- # rule types
- DENY = 0
- PERMIT = 1
-
- # supported protocols
- proto = [[6, 17], [1, 58]]
- proto_map = {1: 'ICMP', 58: 'ICMPv6EchoRequest', 6: 'TCP', 17: 'UDP'}
- ICMPv4 = 0
- ICMPv6 = 1
- TCP = 0
- UDP = 1
- PROTO_ALL = 0
-
- # port ranges
- PORTS_ALL = -1
- PORTS_RANGE = 0
- PORTS_RANGE_2 = 1
- udp_sport_from = 10
- udp_sport_to = udp_sport_from + 5
- udp_dport_from = 20000
- udp_dport_to = udp_dport_from + 5000
- tcp_sport_from = 30
- tcp_sport_to = tcp_sport_from + 5
- tcp_dport_from = 40000
- tcp_dport_to = tcp_dport_from + 5000
-
- udp_sport_from_2 = 90
- udp_sport_to_2 = udp_sport_from_2 + 5
- udp_dport_from_2 = 30000
- udp_dport_to_2 = udp_dport_from_2 + 5000
- tcp_sport_from_2 = 130
- tcp_sport_to_2 = tcp_sport_from_2 + 5
- tcp_dport_from_2 = 20000
- tcp_dport_to_2 = tcp_dport_from_2 + 5000
-
- icmp4_type = 8 # echo request
- icmp4_code = 3
- icmp6_type = 128 # echo request
- icmp6_code = 3
-
- icmp4_type_2 = 8
- icmp4_code_from_2 = 5
- icmp4_code_to_2 = 20
- icmp6_type_2 = 128
- icmp6_code_from_2 = 8
- icmp6_code_to_2 = 42
-
- # Test variables
- bd_id = 1
-
- @classmethod
- def setUpClass(cls):
- """
- Perform standard class setup (defined by class method setUpClass in
- class VppTestCase) before running the test case, set test case related
- variables and configure VPP.
- """
- super(TestClassifyAcl, cls).setUpClass()
- cls.af = None
-
- try:
- # Create 2 pg interfaces
- cls.create_pg_interfaces(range(2))
-
- # Packet flows mapping pg0 -> pg1, pg2 etc.
- cls.flows = dict()
- cls.flows[cls.pg0] = [cls.pg1]
-
- # Packet sizes
- cls.pg_if_packet_sizes = [64, 512, 1518, 9018]
-
- # Create BD with MAC learning and unknown unicast flooding disabled
- # and put interfaces to this BD
- cls.vapi.bridge_domain_add_del(bd_id=cls.bd_id, uu_flood=1,
- learn=1)
- for pg_if in cls.pg_interfaces:
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=pg_if.sw_if_index, bd_id=cls.bd_id)
-
- # Set up all interfaces
- for i in cls.pg_interfaces:
- i.admin_up()
-
- # Mapping between packet-generator index and lists of test hosts
- cls.hosts_by_pg_idx = dict()
- for pg_if in cls.pg_interfaces:
- cls.hosts_by_pg_idx[pg_if.sw_if_index] = []
-
- # Create list of deleted hosts
- cls.deleted_hosts_by_pg_idx = dict()
- for pg_if in cls.pg_interfaces:
- cls.deleted_hosts_by_pg_idx[pg_if.sw_if_index] = []
-
- # warm-up the mac address tables
- # self.warmup_test()
-
- # Holder of the active classify table key
- cls.acl_active_table = ''
-
- except Exception:
- super(TestClassifyAcl, cls).tearDownClass()
- raise
-
- @classmethod
- def tearDownClass(cls):
- super(TestClassifyAcl, cls).tearDownClass()
-
- def setUp(self):
- super(TestClassifyAcl, self).setUp()
- self.acl_tbl_idx = {}
-
- def tearDown(self):
- """
- Show various debug prints after each test.
- """
- if not self.vpp_dead:
- if self.acl_active_table == 'mac_inout':
- self.output_acl_set_interface(
- self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
- elif self.acl_active_table == 'mac_out':
- self.output_acl_set_interface(
- self.pg1, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
- elif self.acl_active_table == 'mac_in':
- self.input_acl_set_interface(
- self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
- self.acl_active_table = ''
-
- super(TestClassifyAcl, self).tearDown()
-
- def create_classify_session(self, intf, table_index, match,
- hit_next_index=0xffffffff, is_add=1):
- """Create Classify Session
-
- :param VppInterface intf: Interface to apply classify session.
- :param int table_index: table index to identify classify table.
- :param str match: matched value for interested traffic.
- :param int is_add: option to configure classify session.
- - create(1) or delete(0)
- """
- mask_match, mask_match_len = self._resolve_mask_match(match)
- r = self.vapi.classify_add_del_session(
- is_add=is_add,
- table_index=table_index,
- match=mask_match,
- match_len=mask_match_len,
- hit_next_index=hit_next_index)
- self.assertIsNotNone(r, 'No response msg for add_del_session')
-
- def create_hosts(self, count, start=0):
- """
- Create required number of host MAC addresses and distribute them among
- interfaces. Create host IPv4 address for every host MAC address.
-
- :param int count: Number of hosts to create MAC/IPv4 addresses for.
- :param int start: Number to start numbering from.
- """
- n_int = len(self.pg_interfaces)
- macs_per_if = count // n_int
- i = -1
- for pg_if in self.pg_interfaces:
- i += 1
- start_nr = macs_per_if * i + start
- end_nr = count + start if i == (n_int - 1) \
- else macs_per_if * (i + 1) + start
- hosts = self.hosts_by_pg_idx[pg_if.sw_if_index]
- for j in range(start_nr, end_nr):
- host = Host(
- "00:00:00:ff:%02x:%02x" % (pg_if.sw_if_index, j),
- "172.17.1%02x.%u" % (pg_if.sw_if_index, j),
- "2017:dead:%02x::%u" % (pg_if.sw_if_index, j))
- hosts.append(host)
-
- def create_upper_layer(self, packet_index, proto, ports=0):
- p = self.proto_map[proto]
- if p == 'UDP':
- if ports == 0:
- return UDP(sport=random.randint(self.udp_sport_from,
- self.udp_sport_to),
- dport=random.randint(self.udp_dport_from,
- self.udp_dport_to))
- else:
- return UDP(sport=ports, dport=ports)
- elif p == 'TCP':
- if ports == 0:
- return TCP(sport=random.randint(self.tcp_sport_from,
- self.tcp_sport_to),
- dport=random.randint(self.tcp_dport_from,
- self.tcp_dport_to))
- else:
- return TCP(sport=ports, dport=ports)
- return ''
-
- def create_stream(self, src_if, packet_sizes, traffic_type=0, ipv6=0,
- proto=-1, ports=0, fragments=False,
- pkt_raw=True, etype=-1):
- """
- Create input packet stream for defined interface using hosts or
- deleted_hosts list.
-
- :param object src_if: Interface to create packet stream for.
- :param list packet_sizes: List of required packet sizes.
- :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
- :return: Stream of packets.
- """
- pkts = []
- if self.flows.__contains__(src_if):
- src_hosts = self.hosts_by_pg_idx[src_if.sw_if_index]
- for dst_if in self.flows[src_if]:
- dst_hosts = self.hosts_by_pg_idx[dst_if.sw_if_index]
- n_int = len(dst_hosts) * len(src_hosts)
- for i in range(0, n_int):
- dst_host = dst_hosts[i // len(src_hosts)]
- src_host = src_hosts[i % len(src_hosts)]
- pkt_info = self.create_packet_info(src_if, dst_if)
- if ipv6 == 1:
- pkt_info.ip = 1
- elif ipv6 == 0:
- pkt_info.ip = 0
- else:
- pkt_info.ip = random.choice([0, 1])
- if proto == -1:
- pkt_info.proto = random.choice(self.proto[self.IP])
- else:
- pkt_info.proto = proto
- payload = self.info_to_payload(pkt_info)
- p = Ether(dst=dst_host.mac, src=src_host.mac)
- if etype > 0:
- p = Ether(dst=dst_host.mac,
- src=src_host.mac,
- type=etype)
- if pkt_info.ip:
- p /= IPv6(dst=dst_host.ip6, src=src_host.ip6)
- if fragments:
- p /= IPv6ExtHdrFragment(offset=64, m=1)
- else:
- if fragments:
- p /= IP(src=src_host.ip4, dst=dst_host.ip4,
- flags=1, frag=64)
- else:
- p /= IP(src=src_host.ip4, dst=dst_host.ip4)
- if traffic_type == self.ICMP:
- if pkt_info.ip:
- p /= ICMPv6EchoRequest(type=self.icmp6_type,
- code=self.icmp6_code)
- else:
- p /= ICMP(type=self.icmp4_type,
- code=self.icmp4_code)
- else:
- p /= self.create_upper_layer(i, pkt_info.proto, ports)
- if pkt_raw:
- p /= Raw(payload)
- pkt_info.data = p.copy()
- if pkt_raw:
- size = random.choice(packet_sizes)
- self.extend_packet(p, size)
- pkts.append(p)
- return pkts
-
- def verify_capture(self, pg_if, capture,
- traffic_type=0, ip_type=0, etype=-1):
- """
- Verify captured input packet stream for defined interface.
-
- :param object pg_if: Interface to verify captured packet stream for.
- :param list capture: Captured packet stream.
- :param traffic_type: 1: ICMP packet, 2: IPv6 with EH, 0: otherwise.
- """
- last_info = dict()
- for i in self.pg_interfaces:
- last_info[i.sw_if_index] = None
- dst_sw_if_index = pg_if.sw_if_index
- for packet in capture:
- if etype > 0:
- if packet[Ether].type != etype:
- self.logger.error(ppp("Unexpected ethertype in packet:",
- packet))
- else:
- continue
- try:
- # Raw data for ICMPv6 are stored in ICMPv6EchoRequest.data
- if traffic_type == self.ICMP and ip_type == self.IPV6:
- payload_info = self.payload_to_info(
- packet[ICMPv6EchoRequest].data)
- payload = packet[ICMPv6EchoRequest]
- else:
- payload_info = self.payload_to_info(packet[Raw])
- payload = packet[self.proto_map[payload_info.proto]]
- except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
- raise
-
- if ip_type != 0:
- self.assertEqual(payload_info.ip, ip_type)
- if traffic_type == self.ICMP:
- try:
- if payload_info.ip == 0:
- self.assertEqual(payload.type, self.icmp4_type)
- self.assertEqual(payload.code, self.icmp4_code)
- else:
- self.assertEqual(payload.type, self.icmp6_type)
- self.assertEqual(payload.code, self.icmp6_code)
- except:
- self.logger.error(ppp("Unexpected or invalid packet "
- "(outside network):", packet))
- raise
- else:
- try:
- ip_version = IPv6 if payload_info.ip == 1 else IP
-
- ip = packet[ip_version]
- packet_index = payload_info.index
-
- self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
- (pg_if.name, payload_info.src,
- packet_index))
- next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
- last_info[payload_info.src] = next_info
- self.assertTrue(next_info is not None)
- self.assertEqual(packet_index, next_info.index)
- saved_packet = next_info.data
- # Check standard fields
- self.assertEqual(ip.src, saved_packet[ip_version].src)
- self.assertEqual(ip.dst, saved_packet[ip_version].dst)
- p = self.proto_map[payload_info.proto]
- if p == 'TCP':
- tcp = packet[TCP]
- self.assertEqual(tcp.sport, saved_packet[
- TCP].sport)
- self.assertEqual(tcp.dport, saved_packet[
- TCP].dport)
- elif p == 'UDP':
- udp = packet[UDP]
- self.assertEqual(udp.sport, saved_packet[
- UDP].sport)
- self.assertEqual(udp.dport, saved_packet[
- UDP].dport)
- except:
- self.logger.error(ppp("Unexpected or invalid packet:",
- packet))
- raise
- for i in self.pg_interfaces:
- remaining_packet = self.get_next_packet_info_for_interface2(
- i, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(
- remaining_packet is None,
- "Port %u: Packet expected from source %u didn't arrive" %
- (dst_sw_if_index, i.sw_if_index))
-
- def run_traffic_no_check(self):
- # Test
- # Create incoming packet streams for packet-generator interfaces
- for i in self.pg_interfaces:
- if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes)
- if len(pkts) > 0:
- i.add_stream(pkts)
-
- # Enable packet capture and start packet sending
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- def run_verify_test(self, traffic_type=0, ip_type=0, proto=-1, ports=0,
- frags=False, pkt_raw=True, etype=-1):
- # Test
- # Create incoming packet streams for packet-generator interfaces
- pkts_cnt = 0
- for i in self.pg_interfaces:
- if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports,
- frags, pkt_raw, etype)
- if len(pkts) > 0:
- i.add_stream(pkts)
- pkts_cnt += len(pkts)
-
- # Enable packet capture and start packet sendingself.IPV
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- # Verify
- # Verify outgoing packet streams per packet-generator interface
- for src_if in self.pg_interfaces:
- if self.flows.__contains__(src_if):
- for dst_if in self.flows[src_if]:
- capture = dst_if.get_capture(pkts_cnt)
- self.logger.info("Verifying capture on interface %s" %
- dst_if.name)
- self.verify_capture(dst_if, capture,
- traffic_type, ip_type, etype)
-
- def run_verify_negat_test(self, traffic_type=0, ip_type=0, proto=-1,
- ports=0, frags=False, etype=-1):
- # Test
- self.reset_packet_infos()
- for i in self.pg_interfaces:
- if self.flows.__contains__(i):
- pkts = self.create_stream(i, self.pg_if_packet_sizes,
- traffic_type, ip_type, proto, ports,
- frags, True, etype)
- if len(pkts) > 0:
- i.add_stream(pkts)
-
- # Enable packet capture and start packet sending
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- # Verify
- # Verify outgoing packet streams per packet-generator interface
- for src_if in self.pg_interfaces:
- if self.flows.__contains__(src_if):
- for dst_if in self.flows[src_if]:
- self.logger.info("Verifying capture on interface %s" %
- dst_if.name)
- capture = dst_if.get_capture(0)
- self.assertEqual(len(capture), 0)
-
- def build_classify_table(self, src_mac='', dst_mac='', ether_type='',
- etype='', key='mac', hit_next_index=0xffffffff):
- # Basic ACL testing
- a_mask = self.build_mac_mask(src_mac=src_mac, dst_mac=dst_mac,
- ether_type=ether_type)
- self.create_classify_table(key, a_mask)
- for host in self.hosts_by_pg_idx[self.pg0.sw_if_index]:
- s_mac = host.mac if src_mac else ''
- if dst_mac:
- for dst_if in self.flows[self.pg0]:
- for dst_host in self.hosts_by_pg_idx[dst_if.sw_if_index]:
- self.create_classify_session(
- self.pg0, self.acl_tbl_idx.get(key),
- self.build_mac_match(src_mac=s_mac,
- dst_mac=dst_host.mac,
- ether_type=etype),
- hit_next_index=hit_next_index)
- else:
- self.create_classify_session(
- self.pg0, self.acl_tbl_idx.get(key),
- self.build_mac_match(src_mac=s_mac, dst_mac='',
- ether_type=etype),
- hit_next_index=hit_next_index)
-
- def test_0000_warmup_test(self):
- """ Learn the MAC addresses
- """
- self.create_hosts(2)
- self.run_traffic_no_check()
-
- def test_0010_inacl_permit_src_mac(self):
- """ Input L2 ACL test - permit source MAC
-
- Test scenario for basic IP ACL with source IP
- - Create IPv4 stream for pg0 -> pg1 interface.
- - Create ACL with source MAC address.
- - Send and verify received packets on pg1 interface.
- """
- key = 'mac_in'
- self.build_classify_table(src_mac='ffffffffffff', key=key)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
- self.acl_active_table = key
- self.run_verify_test(self.IP, self.IPV4, -1)
-
- def test_0011_inacl_permit_dst_mac(self):
- """ Input L2 ACL test - permit destination MAC
-
- Test scenario for basic IP ACL with source IP
- - Create IPv4 stream for pg0 -> pg1 interface.
- - Create ACL with destination MAC address.
- - Send and verify received packets on pg1 interface.
- """
- key = 'mac_in'
- self.build_classify_table(dst_mac='ffffffffffff', key=key)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
- self.acl_active_table = key
- self.run_verify_test(self.IP, self.IPV4, -1)
-
- def test_0012_inacl_permit_src_dst_mac(self):
- """ Input L2 ACL test - permit source and destination MAC
-
- Test scenario for basic IP ACL with source IP
- - Create IPv4 stream for pg0 -> pg1 interface.
- - Create ACL with source and destination MAC addresses.
- - Send and verify received packets on pg1 interface.
- """
- key = 'mac_in'
- self.build_classify_table(
- src_mac='ffffffffffff', dst_mac='ffffffffffff', key=key)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
- self.acl_active_table = key
- self.run_verify_test(self.IP, self.IPV4, -1)
-
- def test_0013_inacl_permit_ether_type(self):
- """ Input L2 ACL test - permit ether_type
-
- Test scenario for basic IP ACL with source IP
- - Create IPv4 stream for pg0 -> pg1 interface.
- - Create ACL with destination MAC address.
- - Send and verify received packets on pg1 interface.
- """
- key = 'mac_in'
- self.build_classify_table(
- ether_type='ffff', etype=hex(ETH_P_IP)[2:], key=key)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
- self.acl_active_table = key
- self.run_verify_test(self.IP, self.IPV4, -1)
-
- def test_0015_inacl_deny(self):
- """ Input L2 ACL test - deny
-
- Test scenario for basic IP ACL with source IP
- - Create IPv4 stream for pg0 -> pg1 interface.
-
- - Create ACL with source MAC address.
- - Send and verify no received packets on pg1 interface.
- """
- key = 'mac_in'
- self.build_classify_table(
- src_mac='ffffffffffff', hit_next_index=0, key=key)
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
- self.acl_active_table = key
- self.run_verify_negat_test(self.IP, self.IPV4, -1)
-
- def test_0020_outacl_permit(self):
- """ Output L2 ACL test - permit
-
- Test scenario for basic IP ACL with source IP
- - Create IPv4 stream for pg0 -> pg1 interface.
- - Create ACL with source MAC address.
- - Send and verify received packets on pg1 interface.
- """
- key = 'mac_out'
- self.build_classify_table(src_mac='ffffffffffff', key=key)
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
- self.acl_active_table = key
- self.run_verify_test(self.IP, self.IPV4, -1)
-
- def test_0025_outacl_deny(self):
- """ Output L2 ACL test - deny
-
- Test scenario for basic IP ACL with source IP
- - Create IPv4 stream for pg0 -> pg1 interface.
- - Create ACL with source MAC address.
- - Send and verify no received packets on pg1 interface.
- """
- key = 'mac_out'
- self.build_classify_table(
- src_mac='ffffffffffff', hit_next_index=0, key=key)
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
- self.acl_active_table = key
- self.run_verify_negat_test(self.IP, self.IPV4, -1)
-
- def test_0030_inoutacl_permit(self):
- """ Input+Output L2 ACL test - permit
-
- Test scenario for basic IP ACL with source IP
- - Create IPv4 stream for pg0 -> pg1 interface.
- - Create ACLs with source MAC address.
- - Send and verify received packets on pg1 interface.
- """
- key = 'mac_inout'
- self.build_classify_table(src_mac='ffffffffffff', key=key)
- self.output_acl_set_interface(self.pg1, self.acl_tbl_idx.get(key))
- self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
- self.acl_active_table = key
- self.run_verify_test(self.IP, self.IPV4, -1)
-
-
-if __name__ == '__main__':
- unittest.main(testRunner=VppTestRunner)
diff --git a/src/plugins/acl/test/vpp_acl.py b/src/plugins/acl/test/vpp_acl.py
deleted file mode 100644
index 2d2f7ca257b..00000000000
--- a/src/plugins/acl/test/vpp_acl.py
+++ /dev/null
@@ -1,476 +0,0 @@
-from ipaddress import IPv4Network
-
-from vpp_object import VppObject
-from vpp_papi import VppEnum
-from vpp_ip import INVALID_INDEX
-from vpp_papi_provider import UnexpectedApiReturnValueError
-
-
-class VppAclPlugin(VppObject):
-
- def __init__(self, test, enable_intf_counters=False):
- self._test = test
- self.enable_intf_counters = enable_intf_counters
-
- @property
- def enable_intf_counters(self):
- return self._enable_intf_counters
-
- @enable_intf_counters.setter
- def enable_intf_counters(self, enable):
- self.vapi.acl_stats_intf_counters_enable(enable=enable)
-
- def add_vpp_config(self):
- pass
-
- def remove_vpp_config(self):
- pass
-
- def query_vpp_config(self):
- pass
-
- def object_id(self):
- return ("acl-plugin-%d" % (self._sw_if_index))
-
-
-class AclRule():
- """ ACL Rule """
-
- # port ranges
- PORTS_ALL = -1
- PORTS_RANGE = 0
- PORTS_RANGE_2 = 1
- udp_sport_from = 10
- udp_sport_to = udp_sport_from + 5
- udp_dport_from = 20000
- udp_dport_to = udp_dport_from + 5000
- tcp_sport_from = 30
- tcp_sport_to = tcp_sport_from + 5
- tcp_dport_from = 40000
- tcp_dport_to = tcp_dport_from + 5000
-
- udp_sport_from_2 = 90
- udp_sport_to_2 = udp_sport_from_2 + 5
- udp_dport_from_2 = 30000
- udp_dport_to_2 = udp_dport_from_2 + 5000
- tcp_sport_from_2 = 130
- tcp_sport_to_2 = tcp_sport_from_2 + 5
- tcp_dport_from_2 = 20000
- tcp_dport_to_2 = tcp_dport_from_2 + 5000
-
- icmp4_type = 8 # echo request
- icmp4_code = 3
- icmp6_type = 128 # echo request
- icmp6_code = 3
-
- icmp4_type_2 = 8
- icmp4_code_from_2 = 5
- icmp4_code_to_2 = 20
- icmp6_type_2 = 128
- icmp6_code_from_2 = 8
- icmp6_code_to_2 = 42
-
- def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
- dst_prefix=IPv4Network('0.0.0.0/0'),
- proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None,
- dport_from=None, dport_to=None):
- self.is_permit = is_permit
- self.src_prefix = src_prefix
- self.dst_prefix = dst_prefix
- self._proto = proto
- self._ports = ports
- # assign ports by range
- self.update_ports()
- # assign specified ports
- if sport_from:
- self.sport_from = sport_from
- if sport_to:
- self.sport_to = sport_to
- if dport_from:
- self.dport_from = dport_from
- if dport_to:
- self.dport_to = dport_to
-
- def __copy__(self):
- new_rule = AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
- self._proto, self._ports, self.sport_from,
- self.sport_to, self.dport_from, self.dport_to)
- return new_rule
-
- def update_ports(self):
- if self._ports == self.PORTS_ALL:
- self.sport_from = 0
- self.dport_from = 0
- self.sport_to = 65535
- if self._proto == 1 or self._proto == 58:
- self.sport_to = 255
- self.dport_to = self.sport_to
- elif self._ports == self.PORTS_RANGE:
- if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
- self.sport_from = self.icmp4_type
- self.sport_to = self.icmp4_type
- self.dport_from = self.icmp4_code
- self.dport_to = self.icmp4_code
- elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
- self.sport_from = self.icmp6_type
- self.sport_to = self.icmp6_type
- self.dport_from = self.icmp6_code
- self.dport_to = self.icmp6_code
- elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
- self.sport_from = self.tcp_sport_from
- self.sport_to = self.tcp_sport_to
- self.dport_from = self.tcp_dport_from
- self.dport_to = self.tcp_dport_to
- elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
- self.sport_from = self.udp_sport_from
- self.sport_to = self.udp_sport_to
- self.dport_from = self.udp_dport_from
- self.dport_to = self.udp_dport_to
- elif self._ports == self.PORTS_RANGE_2:
- if self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP:
- self.sport_from = self.icmp4_type_2
- self.sport_to = self.icmp4_type_2
- self.dport_from = self.icmp4_code_from_2
- self.dport_to = self.icmp4_code_to_2
- elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_ICMP6:
- self.sport_from = self.icmp6_type_2
- self.sport_to = self.icmp6_type_2
- self.dport_from = self.icmp6_code_from_2
- self.dport_to = self.icmp6_code_to_2
- elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_TCP:
- self.sport_from = self.tcp_sport_from_2
- self.sport_to = self.tcp_sport_to_2
- self.dport_from = self.tcp_dport_from_2
- self.dport_to = self.tcp_dport_to_2
- elif self._proto == VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP:
- self.sport_from = self.udp_sport_from_2
- self.sport_to = self.udp_sport_to_2
- self.dport_from = self.udp_dport_from_2
- self.dport_to = self.udp_dport_to_2
- else:
- self.sport_from = self._ports
- self.sport_to = self._ports
- self.dport_from = self._ports
- self.dport_to = self._ports
-
- @property
- def proto(self):
- return self._proto
-
- @proto.setter
- def proto(self, proto):
- self._proto = proto
- self.update_ports()
-
- @property
- def ports(self):
- return self._ports
-
- @ports.setter
- def ports(self, ports):
- self._ports = ports
- self.update_ports()
-
- def encode(self):
- return {'is_permit': self.is_permit, 'proto': self.proto,
- 'srcport_or_icmptype_first': self.sport_from,
- 'srcport_or_icmptype_last': self.sport_to,
- 'src_prefix': self.src_prefix,
- 'dstport_or_icmpcode_first': self.dport_from,
- 'dstport_or_icmpcode_last': self.dport_to,
- 'dst_prefix': self.dst_prefix}
-
-
-class VppAcl(VppObject):
- """ VPP ACL """
-
- def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
- self._test = test
- self._acl_index = acl_index
- self.tag = tag
- self._rules = rules
-
- @property
- def rules(self):
- return self._rules
-
- @property
- def acl_index(self):
- return self._acl_index
-
- @property
- def count(self):
- return len(self._rules)
-
- def encode_rules(self):
- rules = []
- for rule in self._rules:
- rules.append(rule.encode())
- return rules
-
- def add_vpp_config(self, expect_error=False):
- try:
- reply = self._test.vapi.acl_add_replace(
- acl_index=self._acl_index, tag=self.tag, count=self.count,
- r=self.encode_rules())
- self._acl_index = reply.acl_index
- self._test.registry.register(self, self._test.logger)
- if expect_error:
- self._test.fail("Unexpected api reply")
- return self
- except UnexpectedApiReturnValueError:
- if not expect_error:
- self._test.fail("Unexpected api reply")
- return None
-
- def modify_vpp_config(self, rules):
- self._rules = rules
- self.add_vpp_config()
-
- def remove_vpp_config(self, expect_error=False):
- try:
- self._test.vapi.acl_del(acl_index=self._acl_index)
- if expect_error:
- self._test.fail("Unexpected api reply")
- except UnexpectedApiReturnValueError:
- if not expect_error:
- self._test.fail("Unexpected api reply")
-
- def dump(self):
- return self._test.vapi.acl_dump(acl_index=self._acl_index)
-
- def query_vpp_config(self):
- dump = self.dump()
- for rule in dump:
- if rule.acl_index == self._acl_index:
- return True
- return False
-
- def object_id(self):
- return ("acl-%s-%d" % (self.tag, self._acl_index))
-
-
-class VppEtypeWhitelist(VppObject):
- """ VPP Etype Whitelist """
-
- def __init__(self, test, sw_if_index, whitelist, n_input=0):
- self._test = test
- self.whitelist = whitelist
- self.n_input = n_input
- self._sw_if_index = sw_if_index
-
- @property
- def sw_if_index(self):
- return self._sw_if_index
-
- @property
- def count(self):
- return len(self.whitelist)
-
- def add_vpp_config(self):
- self._test.vapi.acl_interface_set_etype_whitelist(
- sw_if_index=self._sw_if_index, count=self.count,
- n_input=self.n_input, whitelist=self.whitelist)
- self._test.registry.register(self, self._test.logger)
- return self
-
- def remove_vpp_config(self):
- self._test.vapi.acl_interface_set_etype_whitelist(
- sw_if_index=self._sw_if_index, count=0, n_input=0, whitelist=[])
-
- def query_vpp_config(self):
- self._test.vapi.acl_interface_etype_whitelist_dump(
- sw_if_index=self._sw_if_index)
- return False
-
- def object_id(self):
- return ("acl-etype_wl-%d" % (self._sw_if_index))
-
-
-class VppAclInterface(VppObject):
- """ VPP ACL Interface """
-
- def __init__(self, test, sw_if_index, acls, n_input=0):
- self._test = test
- self._sw_if_index = sw_if_index
- self.n_input = n_input
- self.acls = acls
-
- @property
- def sw_if_index(self):
- return self._sw_if_index
-
- @property
- def count(self):
- return len(self.acls)
-
- def encode_acls(self):
- acls = []
- for acl in self.acls:
- acls.append(acl.acl_index)
- return acls
-
- def add_vpp_config(self, expect_error=False):
- try:
- reply = self._test.vapi.acl_interface_set_acl_list(
- sw_if_index=self._sw_if_index, n_input=self.n_input,
- count=self.count, acls=self.encode_acls())
- self._test.registry.register(self, self._test.logger)
- if expect_error:
- self._test.fail("Unexpected api reply")
- return self
- except UnexpectedApiReturnValueError:
- if not expect_error:
- self._test.fail("Unexpected api reply")
- return None
-
- def remove_vpp_config(self, expect_error=False):
- try:
- reply = self._test.vapi.acl_interface_set_acl_list(
- sw_if_index=self._sw_if_index, n_input=0, count=0, acls=[])
- if expect_error:
- self._test.fail("Unexpected api reply")
- except UnexpectedApiReturnValueError:
- if not expect_error:
- self._test.fail("Unexpected api reply")
-
- def query_vpp_config(self):
- dump = self._test.vapi.acl_interface_list_dump(
- sw_if_index=self._sw_if_index)
- for acl_list in dump:
- if acl_list.count > 0:
- return True
- return False
-
- def object_id(self):
- return ("acl-if-list-%d" % (self._sw_if_index))
-
-
-class MacipRule():
- """ Mac Ip rule """
-
- def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
- src_prefix=IPv4Network('0.0.0.0/0')):
- self.is_permit = is_permit
- self.src_mac = src_mac
- self.src_mac_mask = src_mac_mask
- self.src_prefix = src_prefix
-
- def encode(self):
- return {'is_permit': self.is_permit, 'src_mac': self.src_mac,
- 'src_mac_mask': self.src_mac_mask,
- 'src_prefix': self.src_prefix}
-
-
-class VppMacipAcl(VppObject):
- """ Vpp Mac Ip ACL """
-
- def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
- self._test = test
- self._acl_index = acl_index
- self.tag = tag
- self._rules = rules
-
- @property
- def acl_index(self):
- return self._acl_index
-
- @property
- def rules(self):
- return self._rules
-
- @property
- def count(self):
- return len(self._rules)
-
- def encode_rules(self):
- rules = []
- for rule in self._rules:
- rules.append(rule.encode())
- return rules
-
- def add_vpp_config(self, expect_error=False):
- try:
- reply = self._test.vapi.macip_acl_add_replace(
- acl_index=self._acl_index, tag=self.tag, count=self.count,
- r=self.encode_rules())
- self._acl_index = reply.acl_index
- self._test.registry.register(self, self._test.logger)
- if expect_error:
- self._test.fail("Unexpected api reply")
- return self
- except UnexpectedApiReturnValueError:
- if not expect_error:
- self._test.fail("Unexpected api reply")
- return None
-
- def modify_vpp_config(self, rules):
- self._rules = rules
- self.add_vpp_config()
-
- def remove_vpp_config(self, expect_error=False):
- try:
- self._test.vapi.macip_acl_del(acl_index=self._acl_index)
- if expect_error:
- self._test.fail("Unexpected api reply")
- except UnexpectedApiReturnValueError:
- if not expect_error:
- self._test.fail("Unexpected api reply")
-
- def dump(self):
- return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)
-
- def query_vpp_config(self):
- dump = self.dump()
- for rule in dump:
- if rule.acl_index == self._acl_index:
- return True
- return False
-
- def object_id(self):
- return ("macip-acl-%s-%d" % (self.tag, self._acl_index))
-
-
-class VppMacipAclInterface(VppObject):
- """ VPP Mac Ip ACL Interface """
-
- def __init__(self, test, sw_if_index, acls):
- self._test = test
- self._sw_if_index = sw_if_index
- self.acls = acls
-
- @property
- def sw_if_index(self):
- return self._sw_if_index
-
- @property
- def count(self):
- return len(self.acls)
-
- def add_vpp_config(self):
- for acl in self.acls:
- self._test.vapi.macip_acl_interface_add_del(
- is_add=True, sw_if_index=self._sw_if_index,
- acl_index=acl.acl_index)
- self._test.registry.register(self, self._test.logger)
-
- def remove_vpp_config(self):
- for acl in self.acls:
- self._test.vapi.macip_acl_interface_add_del(
- is_add=False, sw_if_index=self._sw_if_index,
- acl_index=acl.acl_index)
-
- def dump(self):
- return self._test.vapi.macip_acl_interface_list_dump(
- sw_if_index=self._sw_if_index)
-
- def query_vpp_config(self):
- dump = self.dump()
- for acl_list in dump:
- for acl_index in acl_list.acls:
- if acl_index != INVALID_INDEX:
- return True
- return False
-
- def object_id(self):
- return ("macip-acl-if-list-%d" % (self._sw_if_index))