aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_l2tp.py
blob: c57b5912de1851a0273c900d067e895ed4c98fe1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#!/usr/bin/env python3

import unittest

from scapy.layers.l2 import Ether
from scapy.layers.inet6 import IPv6

from framework import VppTestCase


class TestL2tp(VppTestCase):
    """ L2TP Test Case """

    @classmethod
    def setUpClass(cls):
        super(TestL2tp, cls).setUpClass()

        cls.create_pg_interfaces(range(1))
        cls.pg0.admin_up()
        cls.pg0.config_ip6()

    def test_l2tp_decap_local(self):
        """ L2TP don't accept packets unless configured """

        pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6, nh=115))

        self.pg0.add_stream(pkt)
        self.pg_start()

        # l2tp should not accept packets
        err = self.statistics.get_counter(
            '/err/l2tp-decap-local/l2tpv3 session not found')[0]
        self.assertEqual(err, 0)
        err_count = err

        self.vapi.l2tpv3_create_tunnel(client_address=self.pg0.local_ip6,
                                       our_address=self.pg0.remote_ip6)

        self.pg0.add_stream(pkt)
        self.pg_start()

        # l2tp accepts packets
        err = self.statistics.get_counter(
            '/err/l2tp-decap-local/l2tpv3 session not found')[0]
        self.assertEqual(err, 1)
        err_count = err
'>332 333 334 335 336 337 338 339 340 341 342 343
# Copyright (c) 2019 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""GBP utilities library."""

from enum import IntEnum
from ipaddress import ip_address

from resources.libraries.python.IPUtil import IPUtil
from resources.libraries.python.L2Util import L2Util
from resources.libraries.python.PapiExecutor import PapiSocketExecutor


class GBPEndpointFlags(IntEnum):
    """GBP Endpoint Flags."""
    GBP_API_ENDPOINT_FLAG_NONE = 0
    GBP_API_ENDPOINT_FLAG_BOUNCE = 1
    GBP_API_ENDPOINT_FLAG_REMOTE = 2
    GBP_API_ENDPOINT_FLAG_LEARNT = 4
    GBP_API_ENDPOINT_FLAG_EXTERNAL = 8


class GBPBridgeDomainFlags(IntEnum):
    """GBP Bridge Domain Flags."""
    GBP_BD_API_FLAG_NONE = 0
    GBP_BD_API_FLAG_DO_NOT_LEARN = 1
    GBP_BD_API_FLAG_UU_FWD_DROP = 2
    GBP_BD_API_FLAG_MCAST_DROP = 4
    GBP_BD_API_FLAG_UCAST_ARP = 8


class GBPSubnetType(IntEnum):
    """GBP Subnet Type."""
    GBP_API_SUBNET_TRANSPORT = 1
    GBP_API_SUBNET_STITCHED_INTERNAL = 2
    GBP_API_SUBNET_STITCHED_EXTERNAL = 3
    GBP_API_SUBNET_L3_OUT = 4
    GBP_API_SUBNET_ANON_L3_OUT = 5


class GBPExtItfFlags(IntEnum):
    """GBP External Interface Flags."""
    GBP_API_EXT_ITF_F_NONE = 0
    GBP_API_EXT_ITF_F_ANON = 1


class GBPRuleAction(IntEnum):
    """GBP Rule Action."""
    GBP_API_RULE_PERMIT = 1
    GBP_API_RULE_DENY = 2
    GBP_API_RULE_REDIRECT = 3


class GBP(object):
    """GBP utilities."""

    @staticmethod
    def gbp_route_domain_add(
            node, rd_id=1, ip4_table_id=1, ip6_table_id=0,
            ip4_uu_sw_if_index=0xffffffff, ip6_uu_sw_if_index=0xffffffff):
        """Add GBP route domain.

        :param node: Node to add GBP route domain on.
        :param rd_id: GBP route domain ID.
        :param ip4_table_id: IPv4 table.
        :param ip6_table_id: IPv6 table.
        :param ip4_uu_sw_if_index: IPv4 unicast interface index.
        :param ip6_uu_sw_if_index: IPv6 unicast interface index.
        :type node: dict
        :type rd_id: int
        :type ip4_table_id: int
        :type ip6_table_id: int
        :type ip4_uu_sw_if_index: int
        :type ip6_uu_sw_if_index: int
        """
        cmd = 'gbp_route_domain_add'
        err_msg = 'Failed to add GBP route domain on {node}!'\
                  .format(node=node['host'])

        args_in = dict(
            rd=dict(
                rd_id=rd_id,
                ip4_table_id=ip4_table_id,
                ip6_table_id=ip6_table_id,
                ip4_uu_sw_if_index=ip4_uu_sw_if_index,
                ip6_uu_sw_if_index=ip6_uu_sw_if_index
            )
        )

        with PapiSocketExecutor(node) as papi_exec:
            papi_exec.add(cmd, **args_in).get_reply(err_msg)

    @staticmethod
    def gbp_bridge_domain_add(
            node, bvi_sw_if_index, bd_id=1, rd_id=1,
            uu_fwd_sw_if_index=0xffffffff, bm_flood_sw_if_index=0xffffffff):
        """Add GBP bridge domain.

        :param node: Node to add GBP bridge domain on.
        :param bvi_sw_if_index: SW index of BVI/loopback interface.
        :param bd_id: GBP bridge domain ID.
        :param rd_id: GBP route domain ID.
        :param uu_fwd_sw_if_index: Unicast forward interface index.
        :param bm_flood_sw_if_index: Bcast/Mcast flood interface index.
        :type node: dict
        :type bvi_sw_if_index: int
        :type bd_id: int
        :type rd_id: int
        :type uu_fwd_sw_if_index: int
        :type bm_flood_sw_if_index: int
        """
        cmd = 'gbp_bridge_domain_add'
        err_msg = 'Failed to add GBP bridge domain on {node}!'\
                  .format(node=node['host'])

        args_in = dict(
            bd=dict(
                flags=getattr(GBPBridgeDomainFlags,
                              'GBP_BD_API_FLAG_NONE').value,
                bvi_sw_if_index=bvi_sw_if_index,
                uu_fwd_sw_if_index=uu_fwd_sw_if_index,
                bm_flood_sw_if_index=bm_flood_sw_if_index,
                bd_id=bd_id,
                rd_id=rd_id
            )
        )

        with PapiSocketExecutor(node) as papi_exec:
            papi_exec.add(cmd, **args_in).get_reply(err_msg)

    @staticmethod
    def gbp_endpoint_group_add(
            node, sclass, bd_id=1, rd_id=1, vnid=1,
            uplink_sw_if_index=0xffffffff, remote_ep_timeout=0xffffffff):
        """Add GBP endpoint group.

        :param node: Node to add GBP endpoint group on.
        :param sclass: Source CLASS.
        :param bd_id: GBP bridge domain ID.
        :param rd_id: GBP route domain ID.
        :param uplink_sw_if_index: Uplink interface index.
        :param remote_ep_timeout: Remote endpoint interface index.
        :param vnid: VNID.
        :type node: dict
        :type sclass: int
        :type bd_id: int
        :type rd_id: int
        :type vnid: int
        :type uplink_sw_if_index: int
        :type remote_ep_timeout: int
        """
        cmd = 'gbp_endpoint_group_add'
        err_msg = 'Failed to add GBP endpoint group on {node}!'\
                  .format(node=node['host'])

        args_in = dict(
            epg=dict(
                uplink_sw_if_index=uplink_sw_if_index,
                bd_id=bd_id,
                rd_id=rd_id,
                vnid=vnid,
                sclass=sclass,
                retention=dict(
                    remote_ep_timeout=remote_ep_timeout
                )
            )
        )

        with PapiSocketExecutor(node) as papi_exec:
            papi_exec.add(cmd, **args_in).get_reply(err_msg)

    @staticmethod
    def gbp_endpoint_add(node, sw_if_index, ip_addr, mac_addr, sclass):
        """Add GBP endpoint.

        :param node: Node to add GBP endpoint on.
        :param sw_if_index: SW index of interface.
        :param ip_addr: GBP route domain ID.
        :param mac_addr: MAC address.
        :param sclass: Source CLASS.
        :type node: dict
        :type sw_if_index: int
        :type ip_addr: str
        :type mac_addr: str
        :type sclass: int
        """
        cmd = 'gbp_endpoint_add'
        err_msg = 'Failed to add GBP endpoint on {node}!'\
                  .format(node=node['host'])

        ips = list()
        ips.append(IPUtil.create_ip_address_object(
            ip_address(unicode(ip_addr))))
        tun_src = IPUtil.create_ip_address_object(
            ip_address(unicode('0.0.0.0')))
        tun_dst = IPUtil.create_ip_address_object(
            ip_address(unicode('0.0.0.0')))

        args_in = dict(
            endpoint=dict(
                sw_if_index=sw_if_index,
                ips=ips,
                n_ips=len(ips),
                mac=L2Util.mac_to_bin(mac_addr),
                sclass=sclass,
                flags=getattr(GBPEndpointFlags,
                              'GBP_API_ENDPOINT_FLAG_EXTERNAL').value,
                tun=dict(
                    src=tun_src,
                    dst=tun_dst
                )
            )
        )

        with PapiSocketExecutor(node) as papi_exec:
            papi_exec.add(cmd, **args_in).get_reply(err_msg)

    @staticmethod
    def gbp_ext_itf_add_del(node, sw_if_index, bd_id=1, rd_id=1):
        """Add external interface to GBP.

        :param node: Node to add external GBP interface on.
        :param sw_if_index: SW index of interface.
        :param bd_id: GBP bridge domain ID.
        :param rd_id: GBP route domain ID.
        :type node: dict
        :type sw_if_index: int
        :type bd_id: int
        :type rd_id: int
        """
        cmd = 'gbp_ext_itf_add_del'
        err_msg = 'Failed to add external GBP interface on {node}!'\
                  .format(node=node['host'])

        args_in = dict(
            is_add=1,
            ext_itf=dict(
                sw_if_index=sw_if_index,
                bd_id=bd_id,
                rd_id=rd_id,
                flags=getattr(GBPExtItfFlags,
                              'GBP_API_EXT_ITF_F_NONE').value
            )
        )

        with PapiSocketExecutor(node) as papi_exec:
            papi_exec.add(cmd, **args_in).get_reply(err_msg)

    @staticmethod
    def gbp_subnet_add_del(
            node, address, subnet_length, sclass, rd_id=1,
            sw_if_index=0xffffffff):
        """Add external interface to GBP.

        :param node: Node to add GBP subnet on.
        :param address: IPv4 adddress.
        :param subnet_length: IPv4 address subnet.
        :param sclass: Source CLASS.
        :param rd_id: GBP route domain ID.
        :param sw_if_index: Interface index.
        :type node: dict
        :type address: int
        :type subnet_length: int
        :type sclass: int
        :type rd_id: int
        :type sw_if_index: int
        """
        cmd = 'gbp_subnet_add_del'
        err_msg = 'Failed to add GBP subnet on {node}!'\
                  .format(node=node['host'])

        args_in = dict(
            is_add=1,
            subnet=dict(
                type=getattr(GBPSubnetType,
                             'GBP_API_SUBNET_L3_OUT').value,
                sw_if_index=sw_if_index,
                sclass=sclass,
                prefix=dict(
                    address=IPUtil.create_ip_address_object(
                        ip_address(unicode(address))),
                    len=int(subnet_length)
                ),
                rd_id=rd_id
            )
        )

        with PapiSocketExecutor(node) as papi_exec:
            papi_exec.add(cmd, **args_in).get_reply(err_msg)

    @staticmethod
    def gbp_contract_add_del(node, sclass, dclass, acl_index=0):
        """Add GBP contract.

        :param node: Node to add GBP contract on.
        :param sclass: Source CLASS.
        :param dclass: Destination CLASS.
        :param acl_index: Index of ACL rule.
        :type node: dict
        :type sclass: int
        :type dclass: int
        :type acl_index: int
        """
        cmd = 'gbp_contract_add_del'
        err_msg = 'Failed to add GBP contract on {node}!'\
                  .format(node=node['host'])

        rule_permit = dict(
            action=getattr(GBPRuleAction,
                           'GBP_API_RULE_PERMIT').value,
            nh_set=dict(
                hash_mode=list(),
                n_nhs=8,
                nhs=[dict()]*8,
            )
        )
        rules = [rule_permit, rule_permit]

        args_in = dict(
            is_add=1,
            contract=dict(
                acl_index=acl_index,
                sclass=sclass,
                dclass=dclass,
                n_rules=len(rules),
                rules=rules,
                n_ether_types=16,
                allowed_ethertypes=[0x800, 0x86dd] + [0]*14
            )
        )

        with PapiSocketExecutor(node) as papi_exec:
            papi_exec.add(cmd, **args_in).get_reply(err_msg)