# Copyright (c) 2020 Cisco and/or its affiliates. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """IPsec utilities library.""" import os from enum import Enum, IntEnum from io import open from random import choice from string import ascii_letters from ipaddress import ip_network, ip_address from resources.libraries.python.Constants import Constants from resources.libraries.python.InterfaceUtil import InterfaceUtil, \ InterfaceStatusFlags from resources.libraries.python.IPAddress import IPAddress from resources.libraries.python.IPUtil import IPUtil, IpDscp, MPLS_LABEL_INVALID from resources.libraries.python.PapiExecutor import PapiSocketExecutor from resources.libraries.python.ssh import scp_node from resources.libraries.python.topology import Topology from resources.libraries.python.VatExecutor import VatExecutor IPSEC_UDP_PORT_NONE = 0xffff def gen_key(length): """Generate random string as a key. :param length: Length of generated payload. :type length: int :returns: The generated payload. 
:rtype: bytes """ return u"".join( choice(ascii_letters) for _ in range(length) ).encode(encoding=u"utf-8") class PolicyAction(Enum): """Policy actions.""" BYPASS = (u"bypass", 0) DISCARD = (u"discard", 1) PROTECT = (u"protect", 3) def __init__(self, policy_name, policy_int_repr): self.policy_name = policy_name self.policy_int_repr = policy_int_repr class CryptoAlg(Enum): """Encryption algorithms.""" AES_CBC_128 = (u"aes-cbc-128", 1, u"AES-CBC", 16) AES_CBC_256 = (u"aes-cbc-256", 3, u"AES-CBC", 32) AES_GCM_128 = (u"aes-gcm-128", 7, u"AES-GCM", 16) AES_GCM_256 = (u"aes-gcm-256", 9, u"AES-GCM", 32) def __init__(self, alg_name, alg_int_repr, scapy_name, key_len): self.alg_name = alg_name self.alg_int_repr = alg_int_repr self.scapy_name = scapy_name self.key_len = key_len class IntegAlg(Enum): """Integrity algorithm.""" SHA_256_128 = (u"sha-256-128", 4, u"SHA2-256-128", 32) SHA_512_256 = (u"sha-512-256", 6, u"SHA2-512-256", 64) def __init__(self, alg_name, alg_int_repr, scapy_name, key_len): self.alg_name = alg_name self.alg_int_repr = alg_int_repr self.scapy_name = scapy_name self.key_len = key_len class IPsecProto(IntEnum): """IPsec protocol.""" IPSEC_API_PROTO_ESP = 50 IPSEC_API_PROTO_AH = 51 class IPsecSadFlags(IntEnum): """IPsec Security Association Database flags.""" IPSEC_API_SAD_FLAG_NONE = 0, # Enable extended sequence numbers IPSEC_API_SAD_FLAG_USE_ESN = 0x01, # Enable Anti - replay IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY = 0x02, # IPsec tunnel mode if non-zero, else transport mode IPSEC_API_SAD_FLAG_IS_TUNNEL = 0x04, # IPsec tunnel mode is IPv6 if non-zero, else IPv4 tunnel # only valid if is_tunnel is non-zero IPSEC_API_SAD_FLAG_IS_TUNNEL_V6 = 0x08, # Enable UDP encapsulation for NAT traversal IPSEC_API_SAD_FLAG_UDP_ENCAP = 0x10, # IPsec SA is or inbound traffic IPSEC_API_SAD_FLAG_IS_INBOUND = 0x40 class TunnelEncpaDecapFlags(IntEnum): """Flags controlling tunnel behaviour.""" TUNNEL_API_ENCAP_DECAP_FLAG_NONE = 0 # at encap, copy the DF bit of the payload into 
the tunnel header TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF = 1 # at encap, set the DF bit in the tunnel header TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_SET_DF = 2 # at encap, copy the DSCP bits of the payload into the tunnel header TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP = 4 # at encap, copy the ECN bit of the payload into the tunnel header TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN = 8 # at decap, copy the ECN bit of the tunnel header into the payload TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_SET_ECN = 16 class TunnelMode(IntEnum): """Tunnel modes.""" # point-to-point TUNNEL_API_MODE_P2P = 0 # multi-point TUNNEL_API_MODE_MP = 1 class IPsecUtil: """IPsec utilities.""" @staticmethod def policy_action_bypass(): """Return policy action bypass. :returns: PolicyAction enum BYPASS object. :rtype: PolicyAction """ return PolicyAction.BYPASS @staticmethod def policy_action_discard(): """Return policy action discard. :returns: PolicyAction enum DISCARD object. :rtype: PolicyAction """ return PolicyAction.DISCARD @staticmethod def policy_action_protect(): """Return policy action protect. :returns: PolicyAction enum PROTECT object. :rtype: PolicyAction """ return PolicyAction.PROTECT @staticmethod def crypto_alg_aes_cbc_128(): """Return encryption algorithm aes-cbc-128. :returns: CryptoAlg enum AES_CBC_128 object. :rtype: CryptoAlg """ return CryptoAlg.AES_CBC_128 @staticmethod def crypto_alg_aes_cbc_256(): """Return encryption algorithm aes-cbc-256. :returns: CryptoAlg enum AES_CBC_256 object. :rtype: CryptoAlg """ return CryptoAlg.AES_CBC_256 @staticmethod def crypto_alg_aes_gcm_128(): """Return encryption algorithm aes-gcm-128. :returns: CryptoAlg enum AES_GCM_128 object. :rtype: CryptoAlg """ return CryptoAlg.AES_GCM_128 @staticmethod def crypto_alg_aes_gcm_256(): """Return encryption algorithm aes-gcm-256. :returns: CryptoAlg enum AES_GCM_128 object. 
:rtype: CryptoAlg """ return CryptoAlg.AES_GCM_256 @staticmethod def get_crypto_alg_key_len(crypto_alg): """Return encryption algorithm key length. :param crypto_alg: Encryption algorithm. :type crypto_alg: CryptoAlg :returns: Key length. :rtype: int """ return crypto_alg.key_len @staticmethod def get_crypto_alg_scapy_name(crypto_alg): """Return encryption algorithm scapy name. :param crypto_alg: Encryption algorithm. :type crypto_alg: CryptoAlg :returns: Algorithm scapy name. :rtype: str """ return crypto_alg.scapy_name @staticmethod def integ_alg_sha_256_128(): """Return integrity algorithm SHA-256-128. :returns: IntegAlg enum SHA_256_128 object. :rtype: IntegAlg """ return IntegAlg.SHA_256_128 @staticmethod def integ_alg_sha_512_256(): """Return integrity algorithm SHA-512-256. :returns: IntegAlg enum SHA_512_256 object. :rtype: IntegAlg """ return IntegAlg.SHA_512_256 @staticmethod def get_integ_alg_key_len(integ_alg): """Return integrity algorithm key length. :param integ_alg: Integrity algorithm. :type integ_alg: IntegAlg :returns: Key length. :rtype: int """ return integ_alg.key_len @staticmethod def get_integ_alg_scapy_name(integ_alg): """Return integrity algorithm scapy name. :param integ_alg: Integrity algorithm. :type integ_alg: IntegAlg :returns: Algorithm scapy name. :rtype: str """ return integ_alg.scapy_name @staticmethod def ipsec_proto_esp(): """Return IPSec protocol ESP. :returns: IPsecProto enum ESP object. :rtype: IPsecProto """ return int(IPsecProto.IPSEC_API_PROTO_ESP) @staticmethod def ipsec_proto_ah(): """Return IPSec protocol AH. :returns: IPsecProto enum AH object. :rtype: IPsecProto """ return int(IPsecProto.IPSEC_API_PROTO_AH) @staticmethod def vpp_ipsec_select_backend(node, protocol, index=1): """Select IPsec backend. :param node: VPP node to select IPsec backend on. :param protocol: IPsec protocol. :param index: Backend index. 
:type node: dict :type protocol: IPsecProto :type index: int :raises RuntimeError: If failed to select IPsec backend or if no API reply received. """ cmd = u"ipsec_select_backend" err_msg = f"Failed to select IPsec backend on host {node[u'host']}" args = dict( protocol=protocol, index=index ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def vpp_ipsec_set_async_mode(node, async_enable=1): """Set IPsec async mode on|off. :param node: VPP node to set IPsec async mode. :param async_enable: Async mode on or off. :type node: dict :type async_enable: int :raises RuntimeError: If failed to set IPsec async mode or if no API reply received. """ cmd = u"ipsec_set_async_mode" err_msg = f"Failed to set IPsec async mode on host {node[u'host']}" args = dict( async_enable=async_enable ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def vpp_ipsec_add_sad_entry( node, sad_id, spi, crypto_alg, crypto_key, integ_alg=None, integ_key=u"", tunnel_src=None, tunnel_dst=None): """Create Security Association Database entry on the VPP node. :param node: VPP node to add SAD entry on. :param sad_id: SAD entry ID. :param spi: Security Parameter Index of this SAD entry. :param crypto_alg: The encryption algorithm name. :param crypto_key: The encryption key string. :param integ_alg: The integrity algorithm name. :param integ_key: The integrity key string. :param tunnel_src: Tunnel header source IPv4 or IPv6 address. If not specified ESP transport mode is used. :param tunnel_dst: Tunnel header destination IPv4 or IPv6 address. If not specified ESP transport mode is used. 
:type node: dict :type sad_id: int :type spi: int :type crypto_alg: CryptoAlg :type crypto_key: str :type integ_alg: IntegAlg :type integ_key: str :type tunnel_src: str :type tunnel_dst: str """ if isinstance(crypto_key, str): crypto_key = crypto_key.encode(encoding=u"utf-8") if isinstance(integ_key, str): integ_key = integ_key.encode(encoding=u"utf-8") ckey = dict( length=len(crypto_key), data=crypto_key ) ikey = dict( length=len(integ_key), data=integ_key if integ_key else 0 ) flags = int(IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE) if tunnel_src and tunnel_dst: flags = flags | int(IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_TUNNEL) src_addr = ip_address(tunnel_src) dst_addr = ip_address(tunnel_dst) if src_addr.version == 6: flags = \ flags | int(IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_TUNNEL_V6) else: src_addr = u"" dst_addr = u"" cmd = u"ipsec_sad_entry_add_del" err_msg = f"Failed to add Security Association Database entry " \ f"on host {node[u'host']}" sad_entry = dict( sad_id=int(sad_id), spi=int(spi), crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=ckey, integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, integrity_key=ikey, flags=flags, tunnel_src=str(src_addr), tunnel_dst=str(dst_addr), protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), udp_src_port=4500, # default value in api udp_dst_port=4500 # default value in api ) args = dict( is_add=True, entry=sad_entry ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def vpp_ipsec_add_sad_entries( node, n_entries, sad_id, spi, crypto_alg, crypto_key, integ_alg=None, integ_key=u"", tunnel_src=None, tunnel_dst=None): """Create multiple Security Association Database entries on VPP node. :param node: VPP node to add SAD entry on. :param n_entries: Number of SAD entries to be created. :param sad_id: First SAD entry ID. All subsequent SAD entries will have id incremented by 1. :param spi: Security Parameter Index of first SAD entry. 
All subsequent SAD entries will have spi incremented by 1. :param crypto_alg: The encryption algorithm name. :param crypto_key: The encryption key string. :param integ_alg: The integrity algorithm name. :param integ_key: The integrity key string. :param tunnel_src: Tunnel header source IPv4 or IPv6 address. If not specified ESP transport mode is used. :param tunnel_dst: Tunnel header destination IPv4 or IPv6 address. If not specified ESP transport mode is used. :type node: dict :type n_entries: int :type sad_id: int :type spi: int :type crypto_alg: CryptoAlg :type crypto_key: str :type integ_alg: IntegAlg :type integ_key: str :type tunnel_src: str :type tunnel_dst: str """ if isinstance(crypto_key, str): crypto_key = crypto_key.encode(encoding=u"utf-8") if isinstance(integ_key, str): integ_key = integ_key.encode(encoding=u"utf-8") if tunnel_src and tunnel_dst: src_addr = ip_address(tunnel_src) dst_addr = ip_address(tunnel_dst) else: src_addr = u"" dst_addr = u"" addr_incr = 1 << (128 - 96) if src_addr.version == 6 \ else 1 << (32 - 24) if int(n_entries) > 10: tmp_filename = f"/tmp/ipsec_sad_{sad_id}_add_del_entry.script" with open(tmp_filename, 'w') as tmp_file: for i in range(n_entries): integ = f"integ-alg {integ_alg.alg_name} " \ f"integ-key {integ_key.hex()}" \ if integ_alg else u"" tunnel = f"tunnel-src {src_addr + i * addr_incr} " \ f"tunnel-dst {dst_addr + i * addr_incr}" \ if tunnel_src and tunnel_dst else u"" conf = f"exec ipsec sa add {sad_id + i} esp spi {spi + i} "\ f"crypto-alg {crypto_alg.alg_name} " \ f"crypto-key {crypto_key.hex()} " \ f"{integ} {tunnel}\n" tmp_file.write(conf) vat = VatExecutor() vat.execute_script( tmp_filename, node, timeout=300, json_out=False, copy_on_execute=True ) os.remove(tmp_filename) return ckey = dict( length=len(crypto_key), data=crypto_key ) ikey = dict( length=len(integ_key), data=integ_key if integ_key else 0 ) flags = int(IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE) if tunnel_src and tunnel_dst: flags = flags | 
int(IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_TUNNEL) if src_addr.version == 6: flags = flags | int( IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_TUNNEL_V6 ) cmd = u"ipsec_sad_entry_add_del" err_msg = f"Failed to add Security Association Database entry " \ f"on host {node[u'host']}" sad_entry = dict( sad_id=int(sad_id), spi=int(spi), crypto_algorithm=crypto_alg.alg_int_repr, crypto_key=ckey, integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0, integrity_key=ikey, flags=flags, tunnel_src=str(src_addr), tunnel_dst=str(dst_addr), protocol=int(IPsecProto.IPSEC_API_PROTO_ESP), udp_src_port=4500, # default value in api udp_dst_port=4500 # default value in api ) args = dict( is_add=True, entry=sad_entry ) with PapiSocketExecutor(node) as papi_exec: for i in range(n_entries): args[u"entry"][u"sad_id"] = int(sad_id) + i args[u"entry"][u"spi"] = int(spi) + i args[u"entry"][u"tunnel_src"] = str(src_addr + i * addr_incr) \ if tunnel_src and tunnel_dst else src_addr args[u"entry"][u"tunnel_dst"] = str(dst_addr + i * addr_incr) \ if tunnel_src and tunnel_dst else dst_addr history = bool(not 1 < i < n_entries - 2) papi_exec.add(cmd, history=history, **args) papi_exec.get_replies(err_msg) @staticmethod def vpp_ipsec_set_ip_route( node, n_tunnels, tunnel_src, traffic_addr, tunnel_dst, interface, raddr_range): """Set IP address and route on interface. :param node: VPP node to add config on. :param n_tunnels: Number of tunnels to create. :param tunnel_src: Tunnel header source IPv4 or IPv6 address. :param traffic_addr: Traffic destination IP address to route. :param tunnel_dst: Tunnel header destination IPv4 or IPv6 address. :param interface: Interface key on node 1. :param raddr_range: Mask specifying range of Policy selector Remote IP addresses. Valid values are from 1 to 32 in case of IPv4 and to 128 in case of IPv6. 
:type node: dict :type n_tunnels: int :type tunnel_src: str :type traffic_addr: str :type tunnel_dst: str :type interface: str :type raddr_range: int """ tunnel_src = ip_address(tunnel_src) tunnel_dst = ip_address(tunnel_dst) traffic_addr = ip_address(traffic_addr) addr_incr = 1 << (128 - raddr_range) if tunnel_src.version == 6 \ else 1 << (32 - raddr_range) if int(n_tunnels) > 10: tmp_filename = u"/tmp/ipsec_set_ip.script" with open(tmp_filename, 'w') as tmp_file: if_name = Topology.get_interface_name(node, interface) for i in range(n_tunnels): conf = f"exec set interface ip address {if_name} " \ f"{tunnel_src + i * addr_incr}/{raddr_range}\n" \ f"exec ip route add {traffic_addr + i}/" \ f"{128 if traffic_addr.version == 6 else 32} " \ f"via {tunnel_dst + i * addr_incr} {if_name}\n" tmp_file.write(conf) VatExecutor().execute_script( tmp_filename, node, timeout=300, json_out=False, copy_on_execute=True ) os.remove(tmp_filename) return cmd1 = u"sw_interface_add_del_address" args1 = dict( sw_if_index=InterfaceUtil.get_interface_index(node, interface), is_add=True, del_all=False, prefix=None ) cmd2 = u"ip_route_add_del" args2 = dict( is_add=1, is_multipath=0, route=None ) err_msg = f"Failed to configure IP addresses and IP routes " \ f"on interface {interface} on host {node[u'host']}" with PapiSocketExecutor(node) as papi_exec: for i in range(n_tunnels): args1[u"prefix"] = IPUtil.create_prefix_object( tunnel_src + i * addr_incr, raddr_range ) args2[u"route"] = IPUtil.compose_vpp_route_structure( node, traffic_addr + i, prefix_len=128 if traffic_addr.version == 6 else 32, interface=interface, gateway=tunnel_dst + i * addr_incr ) history = bool(not 1 < i < n_tunnels - 2) papi_exec.add(cmd1, history=history, **args1).\ add(cmd2, history=history, **args2) papi_exec.get_replies(err_msg) @staticmethod def vpp_ipsec_add_spd(node, spd_id): """Create Security Policy Database on the VPP node. :param node: VPP node to add SPD on. :param spd_id: SPD ID. 
:type node: dict :type spd_id: int """ cmd = u"ipsec_spd_add_del" err_msg = f"Failed to add Security Policy Database " \ f"on host {node[u'host']}" args = dict( is_add=True, spd_id=int(spd_id) ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def vpp_ipsec_spd_add_if(node, spd_id, interface): """Add interface to the Security Policy Database. :param node: VPP node. :param spd_id: SPD ID to add interface on. :param interface: Interface name or sw_if_index. :type node: dict :type spd_id: int :type interface: str or int """ cmd = u"ipsec_interface_add_del_spd" err_msg = f"Failed to add interface {interface} to Security Policy " \ f"Database {spd_id} on host {node[u'host']}" args = dict( is_add=True, sw_if_index=InterfaceUtil.get_interface_index(node, interface), spd_id=int(spd_id) ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def vpp_ipsec_policy_add( node, spd_id, priority, action, inbound=True, sa_id=None, laddr_range=None, raddr_range=None, proto=None, lport_range=None, rport_range=None, is_ipv6=False): """Create Security Policy Database entry on the VPP node. :param node: VPP node to add SPD entry on. :param spd_id: SPD ID to add entry on. :param priority: SPD entry priority, higher number = higher priority. :param action: Policy action. :param inbound: If True policy is for inbound traffic, otherwise outbound. :param sa_id: SAD entry ID for protect action. :param laddr_range: Policy selector local IPv4 or IPv6 address range in format IP/prefix or IP/mask. If no mask is provided, it's considered to be /32. :param raddr_range: Policy selector remote IPv4 or IPv6 address range in format IP/prefix or IP/mask. If no mask is provided, it's considered to be /32. :param proto: Policy selector next layer protocol number. :param lport_range: Policy selector local TCP/UDP port range in format -. 
:param rport_range: Policy selector remote TCP/UDP port range in format -. :param is_ipv6: True in case of IPv6 policy when IPv6 address range is not defined so it will default to address ::/0, otherwise False. :type node: dict :type spd_id: int :type priority: int :type action: PolicyAction :type inbound: bool :type sa_id: int :type laddr_range: string :type raddr_range: string :type proto: int :type lport_range: string :type rport_range: string :type is_ipv6: bool """ if laddr_range is None: laddr_range = u"::/0" if is_ipv6 else u"0.0.0.0/0" if raddr_range is None: raddr_range = u"::/0" if is_ipv6 else u"0.0.0.0/0" cmd = u"ipsec_spd_entry_add_del" err_msg = f"Failed to add entry to Security Policy Database {spd_id} " \ f"on host {node[u'host']}" spd_entry = dict( spd_id=int(spd_id), priority=int(priority), is_outbound=not inbound, sa_id=int(sa_id) if sa_id else 0, policy=action.policy_int_repr, protocol=int(proto) if proto else 0, remote_address_start=IPAddress.create_ip_address_object( ip_network(raddr_range, strict=False).network_address ), remote_address_stop=IPAddress.create_ip_address_object( ip_network(raddr_range, strict=False).broadcast_address ), local_address_start=IPAddress.create_ip_address_object( ip_network(laddr_range, strict=False).network_address ), local_address_stop=IPAddress.create_ip_address_object( ip_network(laddr_range, strict=False).broadcast_address ), remote_port_start=int(rport_range.split(u"-")[0]) if rport_range else 0, remote_port_stop=int(rport_range.split(u"-")[1]) if rport_range else 65535, local_port_start=int(lport_range.split(u"-")[0]) if lport_range else 0, local_port_stop=int(lport_range.split(u"-")[1]) if rport_range else 65535 ) args = dict( is_add=True, entry=spd_entry ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def vpp_ipsec_spd_add_entries( node, n_entries, spd_id, priority, inbound, sa_id, raddr_ip, raddr_range=0): """Create multiple Security Policy Database 
entries on the VPP node. :param node: VPP node to add SPD entries on. :param n_entries: Number of SPD entries to be added. :param spd_id: SPD ID to add entries on. :param priority: SPD entries priority, higher number = higher priority. :param inbound: If True policy is for inbound traffic, otherwise outbound. :param sa_id: SAD entry ID for first entry. Each subsequent entry will
# Copyright (c) 2020 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Special test configurations library."""

from ipaddress import ip_address, AddressValueError
from robot.api import logger

from resources.libraries.python.Constants import Constants
from resources.libraries.python.InterfaceUtil import InterfaceUtil, \
    InterfaceStatusFlags
from resources.libraries.python.IPAddress import IPAddress
from resources.libraries.python.IPUtil import IPUtil
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
from resources.libraries.python.topology import Topology
from resources.libraries.python.VatExecutor import VatExecutor


class TestConfig:
    """Contains special test configurations implemented in python for faster
    execution."""

    @staticmethod
    def vpp_create_multiple_vxlan_ipv4_tunnels(
            node, node_vxlan_if, node_vlan_if, op_node, op_node_if,
            n_tunnels, vni_start, src_ip_start, dst_ip_start, ip_step,
            bd_id_start):
        """Build VXLAN tunnels paired with VLAN sub-interfaces on VPP node.

        Runs three steps in order: create the VXLAN tunnel interfaces and
        VLAN sub-interfaces, register them in topology and bring them up,
        and finally place every VXLAN/VLAN pair into its own bridge-domain.

        :param node: VPP node to create VXLAN tunnel interfaces.
        :param node_vxlan_if: VPP node interface key to create VXLAN tunnel
            interfaces.
        :param node_vlan_if: VPP node interface key to create VLAN
            sub-interface.
        :param op_node: Opposite VPP node for VXLAN tunnel interfaces.
        :param op_node_if: Opposite VPP node interface key for VXLAN tunnel
            interfaces.
        :param n_tunnels: Number of tunnel interfaces to create.
        :param vni_start: VNI start ID.
        :param src_ip_start: VXLAN tunnel source IP address start.
        :param dst_ip_start: VXLAN tunnel destination IP address start.
        :param ip_step: IP address incremental step.
        :param bd_id_start: Bridge-domain ID start.
        :type node: dict
        :type node_vxlan_if: str
        :type node_vlan_if: str
        :type op_node: dict
        :type op_node_if: str
        :type n_tunnels: int
        :type vni_start: int
        :type src_ip_start: str
        :type dst_ip_start: str
        :type ip_step: int
        :type bd_id_start: int
        """
        # Step 1: addresses, VXLAN tunnels and VLAN sub-interfaces. The
        # returned count is what the following steps operate on.
        created_count = TestConfig.vpp_create_vxlan_and_vlan_interfaces(
            node=node,
            node_vxlan_if=node_vxlan_if,
            node_vlan_if=node_vlan_if,
            vxlan_count=n_tunnels,
            vni_start=vni_start,
            src_ip_start=src_ip_start,
            dst_ip_start=dst_ip_start,
            ip_step=ip_step
        )

        # Step 2: record the new interfaces in topology and set them up.
        TestConfig.vpp_put_vxlan_and_vlan_interfaces_up(
            node=node, vxlan_count=created_count, node_vlan_if=node_vlan_if
        )

        # Step 3: bridge domains, ARPs and routes.
        TestConfig.vpp_put_vxlan_and_vlan_interfaces_to_bridge_domain(
            node=node,
            node_vxlan_if=node_vxlan_if,
            vxlan_count=created_count,
            op_node=op_node,
            op_node_if=op_node_if,
            dst_ip_start=dst_ip_start,
            ip_step=ip_step,
            bd_id_start=bd_id_start
        )

    @staticmethod
    def vpp_create_vxlan_and_vlan_interfaces(
            node, node_vxlan_if, node_vlan_if, vxlan_count, vni_start,
            src_ip_start, dst_ip_start, ip_step):
        """
        Configure IPs, create VXLAN interfaces and VLAN sub-interfaces on VPP
        node.

        For more than 10 tunnels a VAT script is generated and executed,
        otherwise the commands are sent over PAPI. If stepping the source or
        destination address raises AddressValueError, a warning is logged
        and only the interfaces prepared so far are created.

        :param node: VPP node.
        :param node_vxlan_if: VPP node interface key to create VXLAN tunnel
            interfaces.
        :param node_vlan_if: VPP node interface key to create VLAN
            sub-interface.
        :param vxlan_count: Number of tunnel interfaces to create.
        :param vni_start: VNI start ID.
        :param src_ip_start: VXLAN tunnel source IP address start.
        :param dst_ip_start: VXLAN tunnel destination IP address start.
        :param ip_step: IP address incremental step.
        :type node: dict
        :type node_vxlan_if: str
        :type node_vlan_if: str
        :type vxlan_count: int
        :type vni_start: int
        :type src_ip_start: str
        :type dst_ip_start: str
        :type ip_step: int
        :returns: Number of created VXLAN interfaces.
        :rtype: int
        """
        src_ip_start = ip_address(src_ip_start)
        dst_ip_start = ip_address(dst_ip_start)
        # Coerce once so both the script and the PAPI path agree.
        vni_start = int(vni_start)
        # Adding an integer keeps the address family, so the host prefix
        # length is the same for every generated source address.
        prefix_len = 128 if src_ip_start.version == 6 else 32

        def nth_addresses(i):
            """Return i-th (src, dst) pair or None when out of range."""
            try:
                return src_ip_start + i * ip_step, dst_ip_start + i * ip_step
            except AddressValueError:
                logger.warn(
                    u"Can't do more iterations - IP address limit "
                    u"has been reached."
                )
                return None

        if vxlan_count > 10:
            # Interface indices do not change inside the loop.
            vxlan_sw_if_index = Topology.get_interface_sw_index(
                node, node_vxlan_if
            )
            vlan_sw_if_index = Topology.get_interface_sw_index(
                node, node_vlan_if
            )
            commands = list()
            for i in range(0, vxlan_count):
                addresses = nth_addresses(i)
                if addresses is None:
                    vxlan_count = i
                    break
                src_ip, dst_ip = addresses
                commands.append(
                    f"sw_interface_add_del_address sw_if_index "
                    f"{vxlan_sw_if_index} "
                    f"{src_ip}/{prefix_len}\n"
                )
                commands.append(
                    f"vxlan_add_del_tunnel src {src_ip} dst {dst_ip} "
                    f"vni {vni_start + i}\n"
                )
                commands.append(
                    f"create_vlan_subif sw_if_index "
                    f"{vlan_sw_if_index} "
                    f"vlan {i + 1}\n"
                )
            VatExecutor().write_and_execute_script(
                node, u"/tmp/create_vxlan_interfaces.config", commands
            )
            return vxlan_count

        cmd1 = u"sw_interface_add_del_address"
        args1 = dict(
            sw_if_index=InterfaceUtil.get_interface_index(node, node_vxlan_if),
            is_add=True,
            del_all=False,
            prefix=None
        )
        cmd2 = u"vxlan_add_del_tunnel"
        args2 = dict(
            is_add=True,
            instance=Constants.BITWISE_NON_ZERO,
            src_address=None,
            dst_address=None,
            mcast_sw_if_index=Constants.BITWISE_NON_ZERO,
            encap_vrf_id=0,
            decap_next_index=Constants.BITWISE_NON_ZERO,
            vni=None
        )
        cmd3 = u"create_vlan_subif"
        args3 = dict(
            sw_if_index=InterfaceUtil.get_interface_index(
                node, node_vlan_if),
            vlan_id=None
        )

        with PapiSocketExecutor(node) as papi_exec:
            for i in range(0, vxlan_count):
                addresses = nth_addresses(i)
                if addresses is None:
                    vxlan_count = i
                    break
                src_ip, dst_ip = addresses
                args1[u"prefix"] = IPUtil.create_prefix_object(
                    src_ip, prefix_len
                )
                args2[u"src_address"] = IPAddress.create_ip_address_object(
                    src_ip
                )
                args2[u"dst_address"] = IPAddress.create_ip_address_object(
                    dst_ip
                )
                args2[u"vni"] = vni_start + i
                args3[u"vlan_id"] = i + 1
                # Keep PAPI history only for the first and last commands.
                history = bool(not 1 < i < vxlan_count - 1)
                papi_exec.add(cmd1, history=history, **args1).\
                    add(cmd2, history=history, **args2).\
                    add(cmd3, history=history, **args3)
            papi_exec.get_replies()

        return vxlan_count

    @staticmethod
    def vpp_put_vxlan_and_vlan_interfaces_up(node, vxlan_count, node_vlan_if):
        """
        Update topology with VXLAN interfaces and VLAN sub-interfaces data
        and put interfaces up.

        For every tunnel index a new "vxlan_tunnel" and a new "vlan_subif"
        port is added to topology and filled with the interface name and the
        sw_if_index found in the VPP interface dump (None when the name is
        not present in the dump). For more than 10 tunnels the interfaces are
        set up by a generated VAT script, otherwise over PAPI.

        :param node: VPP node.
        :param vxlan_count: Number of tunnel interfaces.
        :param node_vlan_if: VPP node interface key where VLAN sub-interfaces
            have been created.
        :type node: dict
        :type vxlan_count: int
        :type node_vlan_if: str
        """
        if_data = InterfaceUtil.vpp_get_interface_data(node)
        # Build the name -> sw_if_index map once instead of scanning the dump
        # for every tunnel. setdefault keeps the first occurrence of a name,
        # which is what a front-to-back scan would pick.
        sw_if_index_by_name = dict()
        for data in if_data:
            sw_if_index_by_name.setdefault(
                data[u"interface_name"], data[u"sw_if_index"]
            )
        vlan_parent_name = Topology.get_interface_name(node, node_vlan_if)

        def register_pair(i):
            """Add i-th VXLAN/VLAN ports to topology, return their indices."""
            vxlan_subif_key = Topology.add_new_port(node, u"vxlan_tunnel")
            vxlan_subif_name = f"vxlan_tunnel{i}"
            vlan_subif_key = Topology.add_new_port(node, u"vlan_subif")
            vlan_subif_name = f"{vlan_parent_name}.{i + 1}"
            vxlan_subif_idx = sw_if_index_by_name.get(vxlan_subif_name)
            vlan_idx = sw_if_index_by_name.get(vlan_subif_name)
            Topology.update_interface_sw_if_index(
                node, vxlan_subif_key, vxlan_subif_idx
            )
            Topology.update_interface_name(
                node, vxlan_subif_key, vxlan_subif_name
            )
            Topology.update_interface_sw_if_index(
                node, vlan_subif_key, vlan_idx
            )
            Topology.update_interface_name(
                node, vlan_subif_key, vlan_subif_name
            )
            return vxlan_subif_idx, vlan_idx

        if vxlan_count > 10:
            commands = list()
            for i in range(0, vxlan_count):
                vxlan_subif_idx, vlan_idx = register_pair(i)
                commands.append(
                    f"sw_interface_set_flags sw_if_index {vxlan_subif_idx} "
                    f"admin-up link-up\n"
                )
                commands.append(
                    f"sw_interface_set_flags sw_if_index {vlan_idx} admin-up "
                    f"link-up\n"
                )
            VatExecutor().write_and_execute_script(
                node, u"/tmp/put_subinterfaces_up.config", commands
            )
            return

        cmd = u"sw_interface_set_flags"
        admin_up = InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value

        with PapiSocketExecutor(node) as papi_exec:
            for i in range(0, vxlan_count):
                vxlan_subif_idx, vlan_idx = register_pair(i)
                # Keep PAPI history only for the first and last commands.
                history = bool(not 1 < i < vxlan_count - 1)
                # Each interface is set up exactly once; queueing the same
                # pair again would double the messages and bypass the
                # history limit above.
                papi_exec.add(
                    cmd, history=history, sw_if_index=vxlan_subif_idx,
                    flags=admin_up
                ).add(
                    cmd, history=history, sw_if_index=vlan_idx,
                    flags=admin_up
                )
            papi_exec.get_replies()

    @staticmethod
    def vpp_put_vxlan_and_vlan_interfaces_to_bridge_domain(
            node, node_vxlan_if, vxlan_count, op_node, op_node_if, dst_ip_start,
            ip_step, bd_id_start):
        """
        Configure ARPs and routes for VXLAN interfaces and put each pair of
        VXLAN tunnel interface and VLAN sub-interface to separate bridge-domain.

        :param node: VPP node.
        :param node_vxlan_if: VPP node interface key where VXLAN tunnel
            interfaces have been created.
        :param vxlan_count: Number of tunnel interfaces.
        :param op_node: Opposite VPP node for VXLAN tunnel interfaces.
        :param op_node_if: Opposite VPP node interface key for VXLAN tunnel
            interfaces.
        :param dst_ip_start: VXLAN tunnel destination IP address start.
        :param ip_step: IP address incremental step.
        :param bd_id_start: Bridge-domain ID start.
        :type node: dict
        :type node_vxlan_if: str
        :type vxlan_count: int
        :type op_node: dict
        :type op_node_if:
        :type dst_ip_start: str
        :type ip_step: int
        :type bd_id_start: int
        """
        dst_ip_start = ip_address(dst_ip_start)

        if vxlan_count > 1: