summaryrefslogtreecommitdiffstats
path: root/src/vat2/jsonconvert.c
AgeCommit message (Collapse)AuthorFilesLines
2020-12-11api: fromjson/tojson enum flag supportOle Troan1-10/+13
Represent enum flags as JSON arrays (as these can have multiple values). Add unit tests. Type: improvement Change-Id: I680c5b6f76ef6f05f360e2f3b9c4cbb927e15d7d Signed-off-by: Ole Troan <ot@cisco.com>
2020-11-25api: vat2 and json autogeneration for api messagesOle Troan1-0/+514
VAT2: A completely auto-generated replacement of VAT. Reads input message in JSON from stdin and outputs received messages in JSON. A VAT2 plugin is automatically built for a .api file. There no longer a need for a separate _test.c. Example: vat2 show_version {} { "_msgname": "show_version_reply", "retval": 0, "program": "vpe", "version": "21.01-rc0~411-gf6eb348a6", "build_date": "2020-11-19T09:49:25", "build_directory": "/vpp/autogen3" } vat2 sw_interface_dump '{"sw_if_index": -1, "name_filter_valid": 0, "name_filter": ""}' [{ "_msgname": "sw_interface_details", "sw_if_index": 0, "sup_sw_if_index": 0, "l2_address": "00:00:00:00:00:00", "flags": "Invalid ENUM", "type": "IF_API_TYPE_HARDWARE", "link_duplex": "LINK_DUPLEX_API_UNKNOWN", "link_speed": 0, "link_mtu": 0, "mtu": [0, 0, 0, 0], "sub_id": 0, "sub_number_of_tags": 0, "sub_outer_vlan_id": 0, "sub_inner_vlan_id": 0, "sub_if_flags": "Invalid ENUM", "vtr_op": 0, "vtr_push_dot1q": 0, "vtr_tag1": 0, "vtr_tag2": 0, "outer_tag": 0, "b_dmac": "00:00:00:00:00:00", "b_smac": "00:00:00:00:00:00", "b_vlanid": 0, "i_sid": 0, "interface_name": "local0", "interface_dev_type": "local", "tag": "" }] This is the first phase and vat2 is not integrated in packaging yet. Type: feature Signed-off-by: Ole Troan <ot@cisco.com> Change-Id: Ib45ddeafb180ea7da8c5dc274a9274d7a4edc876 Signed-off-by: Ole Troan <ot@cisco.com>
n157'>157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
from ipaddress import IPv4Address, AddressValueError
from vpp_object import VppObject
from vpp_papi import VppEnum


class AuthMethod:
    v = {"rsa-sig": 1, "shared-key": 2}

    @staticmethod
    def value(key):
        return AuthMethod.v[key]


class IDType:
    v = {"ip4-addr": 1, "fqdn": 2, "ip6-addr": 5}

    @staticmethod
    def value(key):
        return IDType.v[key]


class Profile(VppObject):
    """IKEv2 profile"""

    def __init__(self, test, profile_name):
        self.test = test
        self.vapi = test.vapi
        self.profile_name = profile_name
        self.udp_encap = False
        self.natt = True

    def disable_natt(self):
        self.natt = False

    def add_auth(self, method, data, is_hex=False):
        if isinstance(method, int):
            m = method
        elif isinstance(method, str):
            m = AuthMethod.value(method)
        else:
            raise Exception("unsupported type {}".format(method))
        self.auth = {"auth_method": m, "data": data, "is_hex": is_hex}

    def add_local_id(self, id_type, data):
        if isinstance(id_type, str):
            t = IDType.value(id_type)
        self.local_id = {"id_type": t, "data": data, "is_local": True}

    def add_remote_id(self, id_type, data):
        if isinstance(id_type, str):
            t = IDType.value(id_type)
        self.remote_id = {"id_type": t, "data": data, "is_local": False}

    def add_local_ts(
        self, start_addr, end_addr, start_port=0, end_port=0xFFFF, proto=0, is_ip4=True
    ):
        self.ts_is_ip4 = is_ip4
        self.local_ts = {
            "is_local": True,
            "protocol_id": proto,
            "start_port": start_port,
            "end_port": end_port,
            "start_addr": start_addr,
            "end_addr": end_addr,
        }

    def add_remote_ts(
        self, start_addr, end_addr, start_port=0, end_port=0xFFFF, proto=0
    ):
        try:
            IPv4Address(start_addr)
            is_ip4 = True
        except AddressValueError:
            is_ip4 = False
        self.ts_is_ip4 = is_ip4
        self.remote_ts = {
            "is_local": False,
            "protocol_id": proto,
            "start_port": start_port,
            "end_port": end_port,
            "start_addr": start_addr,
            "end_addr": end_addr,
        }

    def add_responder_hostname(self, hn):
        self.responder_hostname = hn

    def add_responder(self, responder):
        self.responder = responder

    def add_ike_transforms(self, tr):
        self.ike_transforms = tr

    def add_esp_transforms(self, tr):
        self.esp_transforms = tr

    def set_udp_encap(self, udp_encap):
        self.udp_encap = udp_encap

    def set_lifetime_data(self, data):
        self.lifetime_data = data

    def set_ipsec_over_udp_port(self, port):
        self.ipsec_udp_port = {"is_set": 1, "port": port}

    def set_tunnel_interface(self, sw_if_index):
        self.tun_itf = sw_if_index

    def object_id(self):
        return "ikev2-profile-%s" % self.profile_name

    def remove_vpp_config(self):
        self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=False)

    def add_vpp_config(self):
        self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=True)
        if hasattr(self, "auth"):
            self.vapi.ikev2_profile_set_auth(
                name=self.profile_name, data_len=len(self.auth["data"]), **self.auth
            )
        if hasattr(self, "local_id"):
            self.vapi.ikev2_profile_set_id(
                name=self.profile_name,
                data_len=len(self.local_id["data"]),
                **self.local_id,
            )
        if hasattr(self, "remote_id"):
            self.vapi.ikev2_profile_set_id(
                name=self.profile_name,
                data_len=len(self.remote_id["data"]),
                **self.remote_id,
            )
        if hasattr(self, "local_ts"):
            self.vapi.ikev2_profile_set_ts(name=self.profile_name, ts=self.local_ts)

        if hasattr(self, "remote_ts"):
            self.vapi.ikev2_profile_set_ts(name=self.profile_name, ts=self.remote_ts)

        if hasattr(self, "responder"):
            self.vapi.ikev2_set_responder(
                name=self.profile_name, responder=self.responder
            )

        if hasattr(self, "responder_hostname"):
            print(self.responder_hostname)
            self.vapi.ikev2_set_responder_hostname(
                name=self.profile_name, **self.responder_hostname
            )

        if hasattr(self, "ike_transforms"):
            self.vapi.ikev2_set_ike_transforms(
                name=self.profile_name, tr=self.ike_transforms
            )

        if hasattr(self, "esp_transforms"):
            self.vapi.ikev2_set_esp_transforms(
                name=self.profile_name, tr=self.esp_transforms
            )

        if self.udp_encap:
            self.vapi.ikev2_profile_set_udp_encap(name=self.profile_name)

        if hasattr(self, "lifetime_data"):
            self.vapi.ikev2_set_sa_lifetime(
                name=self.profile_name, **self.lifetime_data
            )

        if hasattr(self, "ipsec_udp_port"):
            self.vapi.ikev2_profile_set_ipsec_udp_port(
                name=self.profile_name, **self.ipsec_udp_port
            )
        if hasattr(self, "tun_itf"):
            self.vapi.ikev2_set_tunnel_interface(
                name=self.profile_name, sw_if_index=self.tun_itf
            )

        if not self.natt:
            self.vapi.ikev2_profile_disable_natt(name=self.profile_name)

    def query_vpp_config(self):
        res = self.vapi.ikev2_profile_dump()
        for r in res:
            if r.profile.name == self.profile_name:
                return r.profile
        return None