# path: root/test/vpp_ipip_tun_interface.py
# blob: 50dcb29b7d570b411b4b9f3dcbf1e2b4bfd274b1 (plain)
from vpp_tunnel_interface import VppTunnelInterface
from vpp_papi import VppEnum


class VppIpIpTunInterface(VppTunnelInterface):
    """
    IP-in-IP tunnel interface object for the VPP test framework.

    Stores the tunnel parameters given at construction and creates,
    deletes and looks up the matching tunnel through ``test.vapi``.
    """

    def __init__(
        self, test, parent_if, src, dst, table_id=0, dscp=0x0, flags=0, mode=None
    ):
        super().__init__(test, parent_if)
        self.src, self.dst = src, dst
        self.table_id = table_id
        self.dscp = dscp
        self.flags = flags
        # Any falsy mode (including the default None) selects P2P.
        self.mode = (
            mode if mode else VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_P2P
        )

    def add_vpp_config(self):
        """Create the tunnel, record its indices, register it; return self."""
        tunnel = {
            "src": self.src,
            "dst": self.dst,
            "table_id": self.table_id,
            "flags": self.flags,
            "dscp": self.dscp,
            "instance": 0xFFFFFFFF,
            "mode": self.mode,
        }
        reply = self.test.vapi.ipip_add_tunnel(tunnel=tunnel)
        self.set_sw_if_index(reply.sw_if_index)
        # Read the tunnel back to learn the instance number; object_id()
        # depends on it.
        details = self.test.vapi.ipip_tunnel_dump(sw_if_index=self.sw_if_index)
        self.instance = details[0].tunnel.instance
        self.test.registry.register(self, self.test.logger)
        return self

    def remove_vpp_config(self):
        """Delete the tunnel identified by this object's sw_if_index."""
        self.test.vapi.ipip_del_tunnel(sw_if_index=self._sw_if_index)

    def query_vpp_config(self):
        """Return True when a dump of all tunnels contains this sw_if_index."""
        return any(
            entry.tunnel.sw_if_index == self._sw_if_index
            for entry in self.test.vapi.ipip_tunnel_dump(sw_if_index=0xFFFFFFFF)
        )

    def __str__(self):
        return self.object_id()

    def object_id(self):
        """Return ``ipip<instance>``; needs add_vpp_config() to have run."""
        return "ipip%d" % self.instance

    @property
    def remote_ip(self):
        """Tunnel destination address."""
        return self.dst

    @property
    def local_ip(self):
        """Tunnel source address."""
        return self.src
# NOTE(review): web-viewer stylesheet residue was removed here. The
# definitions below appear to belong to a separate module (ACL test
# helpers) -- confirm against the original repository layout.
from ipaddress import IPv4Network

from vpp_object import VppObject
from vpp_papi import VppEnum
from vpp_ip import INVALID_INDEX
from vpp_papi_provider import UnexpectedApiReturnValueError


class VppAclPlugin(VppObject):
    """
    ACL plugin-wide settings.

    The only setting handled here is the per-interface counters switch,
    which is pushed to VPP whenever ``enable_intf_counters`` is assigned
    (including once from the constructor).
    """

    def __init__(self, test, enable_intf_counters=False):
        """
        :param test: test case object providing ``vapi``
        :param enable_intf_counters: initial state of the counters switch
        """
        self._test = test
        # Goes through the property setter, i.e. issues the API call.
        self.enable_intf_counters = enable_intf_counters

    @property
    def enable_intf_counters(self):
        """Last value successfully sent to VPP."""
        return self._enable_intf_counters

    @enable_intf_counters.setter
    def enable_intf_counters(self, enable):
        # The API handle lives on the test case; this class has no ``vapi``
        # attribute of its own.
        self._test.vapi.acl_stats_intf_counters_enable(enable=enable)
        # Remember the value only after the call succeeded so the getter
        # reflects what VPP was actually told.
        self._enable_intf_counters = enable

    def add_vpp_config(self):
        pass

    def remove_vpp_config(self):
        pass

    def query_vpp_config(self):
        pass

    def object_id(self):
        """Return an identifier string for this object."""
        # Nothing in this class assigns ``_sw_if_index``; fall back to a
        # plain identifier instead of raising AttributeError, but keep the
        # original format if some caller did attach an index.
        sw_if_index = getattr(self, "_sw_if_index", None)
        if sw_if_index is None:
            return "acl-plugin"
        return "acl-plugin-%d" % sw_if_index


class AclRule:
    """
    ACL Rule

    Holds the permit/deny action, source and destination prefixes, the IP
    protocol number and four bounds: ``sport_from``/``sport_to`` and
    ``dport_from``/``dport_to``.  As the keys produced by :meth:`encode`
    show, the source bounds double as the ICMP type range and the
    destination bounds as the ICMP code range.

    ``ports`` selects how the bounds are derived:

    * ``PORTS_ALL`` - the full range (0-255 for protocol 1 or 58,
      0-65535 otherwise);
    * ``PORTS_RANGE`` / ``PORTS_RANGE_2`` - one of the two preset ranges
      defined as class attributes, chosen by protocol;
    * any other value - used verbatim for all four bounds.

    Note that because ``PORTS_RANGE`` is 0 and ``PORTS_RANGE_2`` is 1, the
    literal port numbers 0 and 1 cannot be requested through ``ports``;
    pass them via the explicit ``sport_*``/``dport_*`` arguments instead.
    """

    # port ranges
    PORTS_ALL = -1
    PORTS_RANGE = 0
    PORTS_RANGE_2 = 1
    udp_sport_from = 10
    udp_sport_to = udp_sport_from + 5
    udp_dport_from = 20000
    udp_dport_to = udp_dport_from + 5000
    tcp_sport_from = 30
    tcp_sport_to = tcp_sport_from + 5
    tcp_dport_from = 40000
    tcp_dport_to = tcp_dport_from + 5000

    udp_sport_from_2 = 90
    udp_sport_to_2 = udp_sport_from_2 + 5
    udp_dport_from_2 = 30000
    udp_dport_to_2 = udp_dport_from_2 + 5000
    tcp_sport_from_2 = 130
    tcp_sport_to_2 = tcp_sport_from_2 + 5
    tcp_dport_from_2 = 20000
    tcp_dport_to_2 = tcp_dport_from_2 + 5000

    icmp4_type = 8  # echo request
    icmp4_code = 3
    icmp6_type = 128  # echo request
    icmp6_code = 3

    icmp4_type_2 = 8
    icmp4_code_from_2 = 5
    icmp4_code_to_2 = 20
    icmp6_type_2 = 128
    icmp6_code_from_2 = 8
    icmp6_code_to_2 = 42

    def __init__(self, is_permit, src_prefix=IPv4Network('0.0.0.0/0'),
                 dst_prefix=IPv4Network('0.0.0.0/0'),
                 proto=0, ports=PORTS_ALL, sport_from=None, sport_to=None,
                 dport_from=None, dport_to=None):
        """
        :param is_permit: rule action, passed through to the API as is
        :param src_prefix: source prefix (default: match everything)
        :param dst_prefix: destination prefix (default: match everything)
        :param proto: IP protocol number
        :param ports: range selector or literal port, see class docstring
        :param sport_from: explicit override of the lower source bound
        :param sport_to: explicit override of the upper source bound
        :param dport_from: explicit override of the lower destination bound
        :param dport_to: explicit override of the upper destination bound
        """
        self.is_permit = is_permit
        self.src_prefix = src_prefix
        self.dst_prefix = dst_prefix
        self._proto = proto
        self._ports = ports
        # assign ports by range
        self.update_ports()
        # assign specified ports; test against None rather than truthiness
        # so that an explicit bound of 0 is honoured (and survives
        # __copy__, which always passes all four bounds explicitly).
        if sport_from is not None:
            self.sport_from = sport_from
        if sport_to is not None:
            self.sport_to = sport_to
        if dport_from is not None:
            self.dport_from = dport_from
        if dport_to is not None:
            self.dport_to = dport_to

    def __copy__(self):
        """Return a new rule with the same action, prefixes and bounds."""
        return AclRule(self.is_permit, self.src_prefix, self.dst_prefix,
                       self._proto, self._ports, self.sport_from,
                       self.sport_to, self.dport_from, self.dport_to)

    def _preset_bounds(self, second):
        """
        Return (sport_from, sport_to, dport_from, dport_to) for the current
        protocol from the first (``second`` false) or second preset range,
        or None when the protocol is not ICMP, ICMP6, TCP or UDP.
        """
        protos = VppEnum.vl_api_ip_proto_t
        if self._proto == protos.IP_API_PROTO_ICMP:
            if second:
                return (self.icmp4_type_2, self.icmp4_type_2,
                        self.icmp4_code_from_2, self.icmp4_code_to_2)
            return (self.icmp4_type, self.icmp4_type,
                    self.icmp4_code, self.icmp4_code)
        if self._proto == protos.IP_API_PROTO_ICMP6:
            if second:
                return (self.icmp6_type_2, self.icmp6_type_2,
                        self.icmp6_code_from_2, self.icmp6_code_to_2)
            return (self.icmp6_type, self.icmp6_type,
                    self.icmp6_code, self.icmp6_code)
        if self._proto == protos.IP_API_PROTO_TCP:
            if second:
                return (self.tcp_sport_from_2, self.tcp_sport_to_2,
                        self.tcp_dport_from_2, self.tcp_dport_to_2)
            return (self.tcp_sport_from, self.tcp_sport_to,
                    self.tcp_dport_from, self.tcp_dport_to)
        if self._proto == protos.IP_API_PROTO_UDP:
            if second:
                return (self.udp_sport_from_2, self.udp_sport_to_2,
                        self.udp_dport_from_2, self.udp_dport_to_2)
            return (self.udp_sport_from, self.udp_sport_to,
                    self.udp_dport_from, self.udp_dport_to)
        return None

    def update_ports(self):
        """
        Recompute the four bounds from ``_ports`` and ``_proto``.

        For a preset range with a protocol that has no preset, the bounds
        are left untouched (they stay unset if never assigned before).
        """
        if self._ports == self.PORTS_ALL:
            # Protocols 1 (ICMP) and 58 (ICMPv6) carry type/code in these
            # fields, hence the 255 upper bound.
            upper = 255 if self._proto in (1, 58) else 65535
            bounds = (0, upper, 0, upper)
        elif self._ports == self.PORTS_RANGE:
            bounds = self._preset_bounds(second=False)
        elif self._ports == self.PORTS_RANGE_2:
            bounds = self._preset_bounds(second=True)
        else:
            # A literal port: match exactly that port on both sides.
            bounds = (self._ports,) * 4
        if bounds is not None:
            (self.sport_from, self.sport_to,
             self.dport_from, self.dport_to) = bounds

    @property
    def proto(self):
        return self._proto

    @proto.setter
    def proto(self, proto):
        # Changing the protocol re-derives the bounds, discarding any
        # explicit overrides.
        self._proto = proto
        self.update_ports()

    @property
    def ports(self):
        return self._ports

    @ports.setter
    def ports(self, ports):
        # Changing the selector re-derives the bounds, discarding any
        # explicit overrides.
        self._ports = ports
        self.update_ports()

    def encode(self):
        """Return the rule as the dict passed to the ACL API calls."""
        return {'is_permit': self.is_permit, 'proto': self.proto,
                'srcport_or_icmptype_first': self.sport_from,
                'srcport_or_icmptype_last': self.sport_to,
                'src_prefix': self.src_prefix,
                'dstport_or_icmpcode_first': self.dport_from,
                'dstport_or_icmpcode_last': self.dport_to,
                'dst_prefix': self.dst_prefix}


class VppAcl(VppObject):
    """
    VPP ACL

    Wraps a list of rule objects (each providing ``encode()``) together
    with the ACL index assigned by VPP and an optional tag.
    """

    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
        self._test = test
        self._rules = rules
        self._acl_index = acl_index
        self.tag = tag

    @property
    def rules(self):
        """The rule objects currently held by this ACL."""
        return self._rules

    @property
    def acl_index(self):
        """Index of the ACL; updated from the reply of add_vpp_config()."""
        return self._acl_index

    @property
    def count(self):
        """Number of rules."""
        return len(self._rules)

    def encode_rules(self):
        """Return the encoded form of every rule, in order."""
        return [entry.encode() for entry in self._rules]

    def add_vpp_config(self, expect_error=False):
        """
        Add the ACL, or replace it when an index is already known.

        :param expect_error: when true the API call is expected to be
            rejected; the test is failed if the outcome does not match
        :return: self on success, None when the call was rejected
        """
        try:
            reply = self._test.vapi.acl_add_replace(
                acl_index=self._acl_index,
                tag=self.tag,
                count=self.count,
                r=self.encode_rules(),
            )
            self._acl_index = reply.acl_index
            self._test.registry.register(self, self._test.logger)
            if expect_error:
                # The call succeeded although a rejection was expected.
                self._test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if not expect_error:
                self._test.fail("Unexpected api reply")
        return None

    def modify_vpp_config(self, rules):
        """Swap in a new rule list and push it to VPP."""
        self._rules = rules
        self.add_vpp_config()

    def remove_vpp_config(self, expect_error=False):
        """
        Delete the ACL.

        :param expect_error: when true the deletion is expected to be
            rejected; the test is failed if the outcome does not match
        """
        try:
            self._test.vapi.acl_del(acl_index=self._acl_index)
            if expect_error:
                self._test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if not expect_error:
                self._test.fail("Unexpected api reply")

    def dump(self):
        """Return the API dump for this ACL index."""
        return self._test.vapi.acl_dump(acl_index=self._acl_index)

    def query_vpp_config(self):
        """Return True when the dump contains an entry with this index."""
        return any(
            entry.acl_index == self._acl_index for entry in self.dump()
        )

    def object_id(self):
        return "acl-%s-%d" % (self.tag, self._acl_index)


class VppEtypeWhitelist(VppObject):
    """
    VPP Etype Whitelist

    Holds the whitelist and ``n_input`` value that are passed to the
    ``acl_interface_set_etype_whitelist`` API call for one interface.
    """

    def __init__(self, test, sw_if_index, whitelist, n_input=0):
        self._test = test
        self._sw_if_index = sw_if_index
        self.whitelist = whitelist
        self.n_input = n_input

    @property
    def sw_if_index(self):
        """Index of the interface the whitelist applies to."""
        return self._sw_if_index

    @property
    def count(self):
        """Number of whitelist entries."""
        return len(self.whitelist)

    def add_vpp_config(self):
        """Apply the whitelist to the interface and register; return self."""
        self._test.vapi.acl_interface_set_etype_whitelist(
            sw_if_index=self._sw_if_index,
            count=self.count,
            n_input=self.n_input,
            whitelist=self.whitelist,
        )
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Clear the whitelist by setting an empty one."""
        self._test.vapi.acl_interface_set_etype_whitelist(
            sw_if_index=self._sw_if_index,
            count=0,
            n_input=0,
            whitelist=[],
        )

    def query_vpp_config(self):
        """Issue the whitelist dump; always returns False."""
        # NOTE(review): the dump result is discarded and False is returned
        # unconditionally -- confirm whether that is intentional.
        self._test.vapi.acl_interface_etype_whitelist_dump(
            sw_if_index=self._sw_if_index
        )
        return False

    def object_id(self):
        return "acl-etype_wl-%d" % self._sw_if_index


class VppAclInterface(VppObject):
    """
    VPP ACL Interface

    Binds a list of ACL objects (each exposing ``acl_index``) to one
    interface through ``acl_interface_set_acl_list``.
    """

    def __init__(self, test, sw_if_index, acls, n_input=0):
        self._test = test
        self._sw_if_index = sw_if_index
        self.acls = acls
        self.n_input = n_input

    @property
    def sw_if_index(self):
        """Index of the interface the ACL list applies to."""
        return self._sw_if_index

    @property
    def count(self):
        """Number of ACLs in the list."""
        return len(self.acls)

    def encode_acls(self):
        """Return the indices of the held ACLs, in order."""
        return [item.acl_index for item in self.acls]

    def add_vpp_config(self, expect_error=False):
        """
        Set the ACL list on the interface.

        :param expect_error: when true the API call is expected to be
            rejected; the test is failed if the outcome does not match
        :return: self on success, None when the call was rejected
        """
        try:
            self._test.vapi.acl_interface_set_acl_list(
                sw_if_index=self._sw_if_index,
                n_input=self.n_input,
                count=self.count,
                acls=self.encode_acls(),
            )
            self._test.registry.register(self, self._test.logger)
            if expect_error:
                # The call succeeded although a rejection was expected.
                self._test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if not expect_error:
                self._test.fail("Unexpected api reply")
        return None

    def remove_vpp_config(self, expect_error=False):
        """
        Clear the interface's ACL list by setting an empty one.

        :param expect_error: when true the API call is expected to be
            rejected; the test is failed if the outcome does not match
        """
        try:
            self._test.vapi.acl_interface_set_acl_list(
                sw_if_index=self._sw_if_index,
                n_input=0,
                count=0,
                acls=[],
            )
            if expect_error:
                self._test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if not expect_error:
                self._test.fail("Unexpected api reply")

    def query_vpp_config(self):
        """Return True when any dumped entry reports a non-zero count."""
        entries = self._test.vapi.acl_interface_list_dump(
            sw_if_index=self._sw_if_index
        )
        return any(entry.count > 0 for entry in entries)

    def object_id(self):
        return "acl-if-list-%d" % self._sw_if_index


class MacipRule:
    """
    Mac Ip rule

    Plain holder for an action, a source MAC with its mask and a source
    prefix; :meth:`encode` turns it into the dict handed to the API.
    """

    def __init__(self, is_permit, src_mac=0, src_mac_mask=0,
                 src_prefix=IPv4Network('0.0.0.0/0')):
        self.is_permit = is_permit
        self.src_mac, self.src_mac_mask = src_mac, src_mac_mask
        self.src_prefix = src_prefix

    def encode(self):
        """Return the rule fields as a dict keyed by field name."""
        return dict(
            is_permit=self.is_permit,
            src_mac=self.src_mac,
            src_mac_mask=self.src_mac_mask,
            src_prefix=self.src_prefix,
        )


class VppMacipAcl(VppObject):
    """
    Vpp Mac Ip ACL

    Wraps a list of rule objects (each providing ``encode()``) together
    with the MACIP ACL index assigned by VPP and an optional tag.
    """

    def __init__(self, test, rules, acl_index=INVALID_INDEX, tag=None):
        self._test = test
        self._rules = rules
        self._acl_index = acl_index
        self.tag = tag

    @property
    def acl_index(self):
        """Index of the ACL; updated from the reply of add_vpp_config()."""
        return self._acl_index

    @property
    def rules(self):
        """The rule objects currently held by this ACL."""
        return self._rules

    @property
    def count(self):
        """Number of rules."""
        return len(self._rules)

    def encode_rules(self):
        """Return the encoded form of every rule, in order."""
        return [entry.encode() for entry in self._rules]

    def add_vpp_config(self, expect_error=False):
        """
        Add the MACIP ACL, or replace it when an index is already known.

        :param expect_error: when true the API call is expected to be
            rejected; the test is failed if the outcome does not match
        :return: self on success, None when the call was rejected
        """
        try:
            reply = self._test.vapi.macip_acl_add_replace(
                acl_index=self._acl_index,
                tag=self.tag,
                count=self.count,
                r=self.encode_rules(),
            )
            self._acl_index = reply.acl_index
            self._test.registry.register(self, self._test.logger)
            if expect_error:
                # The call succeeded although a rejection was expected.
                self._test.fail("Unexpected api reply")
            return self
        except UnexpectedApiReturnValueError:
            if not expect_error:
                self._test.fail("Unexpected api reply")
        return None

    def modify_vpp_config(self, rules):
        """Swap in a new rule list and push it to VPP."""
        self._rules = rules
        self.add_vpp_config()

    def remove_vpp_config(self, expect_error=False):
        """
        Delete the MACIP ACL.

        :param expect_error: when true the deletion is expected to be
            rejected; the test is failed if the outcome does not match
        """
        try:
            self._test.vapi.macip_acl_del(acl_index=self._acl_index)
            if expect_error:
                self._test.fail("Unexpected api reply")
        except UnexpectedApiReturnValueError:
            if not expect_error:
                self._test.fail("Unexpected api reply")

    def dump(self):
        """Return the API dump for this MACIP ACL index."""
        return self._test.vapi.macip_acl_dump(acl_index=self._acl_index)

    def query_vpp_config(self):
        """Return True when the dump contains an entry with this index."""
        return any(
            entry.acl_index == self._acl_index for entry in self.dump()
        )

    def object_id(self):
        return "macip-acl-%s-%d" % (self.tag, self._acl_index)


class VppMacipAclInterface(VppObject):
    """
    VPP Mac Ip ACL Interface

    Applies a list of MACIP ACL objects (each exposing ``acl_index``) to
    one interface, one ``macip_acl_interface_add_del`` call per ACL.
    """

    def __init__(self, test, sw_if_index, acls):
        """
        :param test: test case object providing ``vapi``, ``registry``
            and ``logger``
        :param sw_if_index: index of the interface to configure
        :param acls: iterable of objects exposing ``acl_index``
        """
        self._test = test
        self._sw_if_index = sw_if_index
        self.acls = acls

    @property
    def sw_if_index(self):
        """Index of the interface the ACLs apply to."""
        return self._sw_if_index

    @property
    def count(self):
        """Number of ACLs held."""
        return len(self.acls)

    def add_vpp_config(self):
        """
        Add every held ACL to the interface and register this object.

        :return: self, matching the other ``add_vpp_config`` methods in
            this file so calls can be chained
        """
        for acl in self.acls:
            self._test.vapi.macip_acl_interface_add_del(
                is_add=True, sw_if_index=self._sw_if_index,
                acl_index=acl.acl_index)
        self._test.registry.register(self, self._test.logger)
        return self

    def remove_vpp_config(self):
        """Remove every held ACL from the interface."""
        for acl in self.acls:
            self._test.vapi.macip_acl_interface_add_del(
                is_add=False, sw_if_index=self._sw_if_index,
                acl_index=acl.acl_index)

    def dump(self):
        """Return the API dump of MACIP ACLs for this interface."""
        return self._test.vapi.macip_acl_interface_list_dump(
            sw_if_index=self._sw_if_index)

    def query_vpp_config(self):
        """Return True when the dump lists any index other than INVALID_INDEX."""
        return any(
            acl_index != INVALID_INDEX
            for acl_list in self.dump()
            for acl_index in acl_list.acls
        )

    def object_id(self):
        return "macip-acl-if-list-%d" % self._sw_if_index