#!/usr/bin/env python3 from __future__ import print_function """ACL plugin - MACIP tests """ import binascii import ipaddress import random from socket import inet_ntop, inet_pton, AF_INET, AF_INET6 from struct import pack, unpack import re import unittest from ipaddress import ip_network, IPv4Network, IPv6Network import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 from framework import VppTestCase, VppTestRunner from vpp_lo_interface import VppLoInterface from vpp_l2 import L2_PORT_TYPE from vpp_sub_interface import ( L2_VTR_OP, VppSubInterface, VppDot1QSubint, VppDot1ADSubint, ) from vpp_acl import ( AclRule, VppAcl, VppAclInterface, VppEtypeWhitelist, VppMacipAclInterface, VppMacipAcl, MacipRule, ) from vpp_papi import MACAddress class MethodHolder(VppTestCase): DEBUG = False BRIDGED = True ROUTED = False IS_IP4 = False IS_IP6 = True DOT1AD = "dot1ad" DOT1Q = "dot1q" PERMIT_TAGS = True DENY_TAGS = False # rule types DENY = 0 PERMIT = 1 # ACL types EXACT_IP = 1 SUBNET_IP = 2 WILD_IP = 3 EXACT_MAC = 1 WILD_MAC = 2 OUI_MAC = 3 ACLS = [] @classmethod def setUpClass(cls): """ Perform standard class setup (defined by class method setUpClass in class VppTestCase) before running the test case, set test case related variables and configure VPP. """ super(MethodHolder, cls).setUpClass() cls.pg_if_packet_sizes = [64, 512, 1518, 9018] # packet sizes cls.bd_id = 111 cls.remote_hosts_count = 200 try: # create 4 pg interfaces, 1 loopback interface cls.create_pg_interfaces(range(4)) cls.create_loopback_interfaces(1) # create 2 subinterfaces cls.subifs = [ VppDot1QSubint(cls, cls.pg1, 10), VppDot1ADSubint(cls, cls.pg2, 20, 300, 400), VppDot1QSubint(cls, cls.pg3, 30), VppDot1ADSubint(cls, cls.pg3, 40, 600, 700), ] cls.subifs[0].set_vtr(L2_VTR_OP.L2_POP_1, inner=10, push1q=1) cls.subifs[1].set_vtr(L2_VTR_OP.L2_POP_2, outer=300, inner=400, push1q=1) cls.subifs[2].set_vtr(L2_VTR_OP.L2_POP_1, inner=30, push1q=1) cls.subifs[3].set_vtr(L2_VTR_OP.L2_POP_2, outer=600, inner=700, push1q=1) cls.interfaces = list(cls.pg_interfaces) cls.interfaces.extend(cls.lo_interfaces) cls.interfaces.extend(cls.subifs) for i in cls.interfaces: i.admin_up() # Create BD with MAC learning enabled and put interfaces to this BD cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.loop0.sw_if_index, bd_id=cls.bd_id, port_type=L2_PORT_TYPE.BVI, ) cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.pg0.sw_if_index, bd_id=cls.bd_id ) cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.bd_id ) cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.subifs[0].sw_if_index, bd_id=cls.bd_id ) cls.vapi.sw_interface_set_l2_bridge( rx_sw_if_index=cls.subifs[1].sw_if_index, bd_id=cls.bd_id ) # Configure IPv4/6 addresses on loop interface and routed interface cls.loop0.config_ip4() cls.loop0.config_ip6() cls.pg2.config_ip4() cls.pg2.config_ip6() cls.pg3.config_ip4() cls.pg3.config_ip6() # Configure MAC address binding to IPv4 neighbors on loop0 cls.loop0.generate_remote_hosts(cls.remote_hosts_count) # Modify host mac addresses to have different OUI parts for i in range(2, cls.remote_hosts_count + 2): mac = cls.loop0.remote_hosts[i - 2]._mac.split(":") mac[2] = format(int(mac[2], 16) + i, "02x") cls.loop0.remote_hosts[i - 2]._mac = ":".join(mac) cls.loop0.configure_ipv4_neighbors() cls.loop0.configure_ipv6_neighbors() # configure MAC address on pg3 cls.pg3.resolve_arp() cls.pg3.resolve_ndp() # configure MAC address on subifs for i in cls.subifs: i.config_ip4() i.resolve_arp() i.config_ip6() # configure MAC address on pg2 cls.pg2.resolve_arp() cls.pg2.resolve_ndp() # Loopback BVI interface has remote hosts # one half of hosts are behind pg0 second behind pg1,pg2,pg3 subifs cls.pg0.remote_hosts = cls.loop0.remote_hosts[:100] cls.subifs[0].remote_hosts = cls.loop0.remote_hosts[100:125] cls.subifs[1].remote_hosts = cls.loop0.remote_hosts[125:150] cls.subifs[2].remote_hosts = cls.loop0.remote_hosts[150:175] cls.subifs[3].remote_hosts = cls.loop0.remote_hosts[175:] except Exception: super(MethodHolder, cls).tearDownClass() raise @classmethod def tearDownClass(cls): super(MethodHolder, cls).tearDownClass() def setUp(self): super(MethodHolder, self).setUp() self.reset_packet_infos() def show_commands_at_teardown(self): self.logger.info(self.vapi.ppcli("show interface address")) self.logger.info(self.vapi.ppcli("show hardware")) self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl")) self.logger.info(self.vapi.ppcli("sh acl-plugin macip interface")) self.logger.info(self.vapi.ppcli("sh classify tables verbose")) self.logger.info(self.vapi.ppcli("sh acl-plugin acl")) self.logger.info(self.vapi.ppcli("sh acl-plugin interface")) self.logger.info(self.vapi.ppcli("sh acl-plugin tables")) # print(self.vapi.ppcli("show interface address")) # print(self.vapi.ppcli("show hardware")) # print(self.vapi.ppcli("sh acl-plugin macip interface")) # print(self.vapi.ppcli("sh acl-plugin macip acl")) def macip_acl_dump_debug(self): acls = self.vapi.macip_acl_dump() if self.DEBUG: for acl in acls: # print("ACL #"+str(acl.acl_index)) for r in acl.r: rule = "ACTION" if r.is_permit == 1: rule = "PERMIT" elif r.is_permit == 0: rule = "DENY " """ print(" IP6" if r.is_ipv6 else " IP4", rule, binascii.hexlify(r.src_mac), binascii.hexlify(r.src_mac_mask), unpack('<16B', r.src_ip_addr), r.src_ip_prefix_len) """ return acls def create_rules( self, mac_type=EXACT_MAC, ip_type=EXACT_IP, acl_count=1, rules_count=None ): acls = [] if rules_count is None: rules_count = [1] src_mac = int("220000dead00", 16) for acl in range(2, (acl_count + 1) * 2): rules = [] host = random.choice(self.loop0.remote_hosts) is_ip6 = acl % 2 ip4 = host.ip4.split(".") ip6 = list(unpack("<16B", inet_pton(AF_INET6, host.ip6))) if ip_type == self.EXACT_IP: prefix_len4 = 32 prefix_len6 = 128 elif ip_type == self.WILD_IP: ip4 = [0, 0, 0, 0] ip6 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] prefix_len4 = 0 prefix_len6 = 0 rules_count[int((acl / 2) - 1)] = 1 else: prefix_len4 = 24 prefix_len6 = 64 if mac_type == self.EXACT_MAC: mask = "ff:ff:ff:ff:ff:ff" elif mac_type == self.WILD_MAC: mask = "00:00:00:00:00:00" elif mac_type == self.OUI_MAC: mask = "ff:ff:ff:00:00:00" else: mask = """Stream profile for T-rex traffic generator.

Stream profile:
 - Two streams sent in directions 0 --> 1 and 1 --> 0 at the same time.
 - Packet: ETH / IP /
 - Direction 0 --> 1:
   - Source IP address range:
   - Destination IP address range: -
 - Direction 1 --> 0:
   - Source IP address range:
   - Destination IP address range: -

from trex.stl.api import *
from profile_trex_stateless_base_class import TrafficStreamsBaseClass

class TrafficStreams(TrafficStreamsBaseClass):
    """Stream profile."""

    def __init__(self):
        """Initialization and setting of streams' parameters."""

        super(TrafficStreamsBaseClass, self).__init__()

        self.p1_dst_start_mac = u"02:02:00:00:12:00"

        self.p2_dst_start_mac = u"02:02:00:00:02:00"

        # IPs used in packet headers.
        self.p1_src_start_ip = u""
        self.p1_dst_start_ip = u""
        self.p1_dst_end_ip = u""

        self.p2_src_start_ip = u""
        self.p2_dst_start_ip = u""
        self.p2_dst_end_ip = u""

    def define_packets(self):
        """Defines the packets to be sent from the traffic generator.

        Packet definition: | ETH | IP |

        :returns: Packets to be sent from the traffic generator.
        :rtype: tuple

        # Direction 0 --> 1
        base_pkt_a = (
            ) /
        # Direction 1 --> 0
        base_pkt_b = (
            ) /

        # Direction 0 --> 1
        vm1 = STLScVmRaw(
        # Direction 1 --> 0
        vm2 = STLScVmRaw(

        return base_pkt_a, base_pkt_b, vm1, vm2

def register():
    """Register this traffic profile to T-rex.

    Do not change this function.

    :return: Traffic streams.
    :rtype: Object
    return TrafficStreams()
self.assertEqual(reply.acls[sw_if_index1], 0) self.assertEqual(reply.acls[sw_if_index2], 1) self.assertEqual(reply.acls[sw_if_index3], 1) self.logger.info("MACIP ACL on multiple interfaces:") self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl")) self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 1234")) self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 1")) self.logger.info(self.vapi.ppcli("sh acl-plugin macip acl index 0")) self.logger.info(self.vapi.ppcli("sh acl-plugin macip interface")) intf[2].remove_vpp_config() intf[1].remove_vpp_config() reply = self.vapi.macip_acl_interface_get() self.assertEqual(reply.count, intf_count + 3) self.assertEqual(reply.acls[sw_if_index0], 4294967295) self.assertEqual(reply.acls[sw_if_index1], 4294967295) self.assertEqual(reply.acls[sw_if_index2], 4294967295) self.assertEqual(reply.acls[sw_if_index3], 1) intf[3].remove_vpp_config() reply = self.vapi.macip_acl_interface_get() self.assertEqual(len([x for x in reply.acls if x != 4294967295]), 0) class TestACL_dot1q_bridged(MethodHolder): """ACL on dot1q bridged subinterfaces Tests""" @classmethod def setUpClass(cls): super(TestACL_dot1q_bridged, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestACL_dot1q_bridged, cls).tearDownClass() def test_acl_bridged_ip4_subif_dot1q(self): """IP4 ACL SubIf Dot1Q bridged traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.BRIDGED, self.IS_IP4, 9, tags=self.DOT1Q, isMACIP=False, ) def test_acl_bridged_ip6_subif_dot1q(self): """IP6 ACL SubIf Dot1Q bridged traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.BRIDGED, self.IS_IP6, 9, tags=self.DOT1Q, isMACIP=False, ) class TestACL_dot1ad_bridged(MethodHolder): """ACL on dot1ad bridged subinterfaces Tests""" @classmethod def setUpClass(cls): super(TestACL_dot1ad_bridged, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestACL_dot1ad_bridged, cls).tearDownClass() def test_acl_bridged_ip4_subif_dot1ad(self): """IP4 ACL SubIf Dot1AD bridged traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.BRIDGED, self.IS_IP4, 9, tags=self.DOT1AD, isMACIP=False, ) def test_acl_bridged_ip6_subif_dot1ad(self): """IP6 ACL SubIf Dot1AD bridged traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.BRIDGED, self.IS_IP6, 9, tags=self.DOT1AD, isMACIP=False, ) class TestACL_dot1q_routed(MethodHolder): """ACL on dot1q routed subinterfaces Tests""" @classmethod def setUpClass(cls): super(TestACL_dot1q_routed, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestACL_dot1q_routed, cls).tearDownClass() def test_acl_routed_ip4_subif_dot1q(self): """IP4 ACL SubIf Dot1Q routed traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.ROUTED, self.IS_IP4, 9, tags=self.DOT1Q, isMACIP=False, ) def test_acl_routed_ip6_subif_dot1q(self): """IP6 ACL SubIf Dot1Q routed traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.ROUTED, self.IS_IP6, 9, tags=self.DOT1Q, isMACIP=False, ) def test_acl_routed_ip4_subif_dot1q_deny_by_tags(self): """IP4 ACL SubIf wrong tags Dot1Q routed traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.ROUTED, self.IS_IP4, 9, True, tags=self.DOT1Q, isMACIP=False, permit_tags=self.DENY_TAGS, ) def test_acl_routed_ip6_subif_dot1q_deny_by_tags(self): """IP6 ACL SubIf wrong tags Dot1Q routed traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.ROUTED, self.IS_IP6, 9, True, tags=self.DOT1Q, isMACIP=False, permit_tags=self.DENY_TAGS, ) class TestACL_dot1ad_routed(MethodHolder): """ACL on dot1ad routed subinterfaces Tests""" @classmethod def setUpClass(cls): super(TestACL_dot1ad_routed, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestACL_dot1ad_routed, cls).tearDownClass() def test_acl_routed_ip6_subif_dot1ad(self): """IP6 ACL SubIf Dot1AD routed traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.ROUTED, self.IS_IP6, 9, tags=self.DOT1AD, isMACIP=False, ) def test_acl_routed_ip4_subif_dot1ad(self): """IP4 ACL SubIf Dot1AD routed traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.ROUTED, self.IS_IP4, 9, tags=self.DOT1AD, isMACIP=False, ) def test_acl_routed_ip6_subif_dot1ad_deny_by_tags(self): """IP6 ACL SubIf wrong tags Dot1AD routed traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.ROUTED, self.IS_IP6, 9, True, tags=self.DOT1AD, isMACIP=False, permit_tags=self.DENY_TAGS, ) def test_acl_routed_ip4_subif_dot1ad_deny_by_tags(self): """IP4 ACL SubIf wrong tags Dot1AD routed traffic""" self.run_traffic( self.EXACT_MAC, self.EXACT_IP, self.ROUTED, self.IS_IP4, 9, True, tags=self.DOT1AD, isMACIP=False, permit_tags=self.DENY_TAGS, ) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)