From 163b0ddd64020535a65e8923491d568726936129 Mon Sep 17 00:00:00 2001 From: Vratko Polak Date: Mon, 22 Apr 2024 17:59:42 +0200 Subject: style(ipsec): add type hints to IPsecUtil + Update docstrings to match. Change-Id: I20f8f0247a9cf38a2d9a98fa8905eb55d91817f7 Signed-off-by: Vratko Polak --- resources/libraries/python/IPsecUtil.py | 482 +++++++++++++++++--------------- 1 file changed, 253 insertions(+), 229 deletions(-) (limited to 'resources') diff --git a/resources/libraries/python/IPsecUtil.py b/resources/libraries/python/IPsecUtil.py index dd7bd068fa..8ecfbc3d98 100644 --- a/resources/libraries/python/IPsecUtil.py +++ b/resources/libraries/python/IPsecUtil.py @@ -15,10 +15,11 @@ """IPsec utilities library.""" from enum import Enum, IntEnum -from io import open -from ipaddress import ip_network, ip_address +from io import open, TextIOWrapper +from ipaddress import ip_network, ip_address, IPv4Address, IPv6Address from random import choice from string import ascii_letters +from typing import Iterable, List, Optional, Sequence, Tuple, Union from robot.libraries.BuiltIn import BuiltIn @@ -46,7 +47,7 @@ IPSEC_UDP_PORT_DEFAULT = 4500 IPSEC_REPLAY_WINDOW_DEFAULT = 64 -def gen_key(length): +def gen_key(length: int) -> bytes: """Generate random string as a key. :param length: Length of generated payload. @@ -66,14 +67,14 @@ class PolicyAction(Enum): DISCARD = ("discard", 1) PROTECT = ("protect", 3) - def __init__(self, policy_name, policy_int_repr): + def __init__(self, policy_name: str, policy_int_repr: int): self.policy_name = policy_name self.policy_int_repr = policy_int_repr - def __str__(self): + def __str__(self) -> str: return self.policy_name - def __int__(self): + def __int__(self) -> int: return self.policy_int_repr @@ -85,7 +86,9 @@ class CryptoAlg(Enum): AES_GCM_128 = ("aes-gcm-128", 7, "AES-GCM", 16) AES_GCM_256 = ("aes-gcm-256", 9, "AES-GCM", 32) - def __init__(self, alg_name, alg_int_repr, scapy_name, key_len): + def __init__( + self, alg_name: str, alg_int_repr: int, scapy_name: str, key_len: int + ): self.alg_name = alg_name self.alg_int_repr = alg_int_repr self.scapy_name = scapy_name @@ -98,7 +101,9 @@ class IntegAlg(Enum): SHA_256_128 = ("sha-256-128", 4, "SHA2-256-128", 32) SHA_512_256 = ("sha-512-256", 6, "SHA2-512-256", 64) - def __init__(self, alg_name, alg_int_repr, scapy_name, key_len): + def __init__( + self, alg_name: str, alg_int_repr: int, scapy_name: str, key_len: int + ): self.alg_name = alg_name self.alg_int_repr = alg_int_repr self.scapy_name = scapy_name @@ -160,7 +165,7 @@ class IPsecUtil: """IPsec utilities.""" @staticmethod - def policy_action_bypass(): + def policy_action_bypass() -> PolicyAction: """Return policy action bypass. :returns: PolicyAction enum BYPASS object. @@ -169,7 +174,7 @@ class IPsecUtil: return PolicyAction.BYPASS @staticmethod - def policy_action_discard(): + def policy_action_discard() -> PolicyAction: """Return policy action discard. :returns: PolicyAction enum DISCARD object. @@ -178,7 +183,7 @@ class IPsecUtil: return PolicyAction.DISCARD @staticmethod - def policy_action_protect(): + def policy_action_protect() -> PolicyAction: """Return policy action protect. :returns: PolicyAction enum PROTECT object. @@ -187,7 +192,7 @@ class IPsecUtil: return PolicyAction.PROTECT @staticmethod - def crypto_alg_aes_cbc_128(): + def crypto_alg_aes_cbc_128() -> CryptoAlg: """Return encryption algorithm aes-cbc-128. :returns: CryptoAlg enum AES_CBC_128 object. @@ -196,7 +201,7 @@ class IPsecUtil: return CryptoAlg.AES_CBC_128 @staticmethod - def crypto_alg_aes_cbc_256(): + def crypto_alg_aes_cbc_256() -> CryptoAlg: """Return encryption algorithm aes-cbc-256. :returns: CryptoAlg enum AES_CBC_256 object. @@ -205,7 +210,7 @@ class IPsecUtil: return CryptoAlg.AES_CBC_256 @staticmethod - def crypto_alg_aes_gcm_128(): + def crypto_alg_aes_gcm_128() -> CryptoAlg: """Return encryption algorithm aes-gcm-128. :returns: CryptoAlg enum AES_GCM_128 object. @@ -214,7 +219,7 @@ class IPsecUtil: return CryptoAlg.AES_GCM_128 @staticmethod - def crypto_alg_aes_gcm_256(): + def crypto_alg_aes_gcm_256() -> CryptoAlg: """Return encryption algorithm aes-gcm-256. :returns: CryptoAlg enum AES_GCM_128 object. @@ -223,7 +228,7 @@ class IPsecUtil: return CryptoAlg.AES_GCM_256 @staticmethod - def get_crypto_alg_key_len(crypto_alg): + def get_crypto_alg_key_len(crypto_alg: CryptoAlg) -> int: """Return encryption algorithm key length. :param crypto_alg: Encryption algorithm. @@ -234,7 +239,7 @@ class IPsecUtil: return crypto_alg.key_len @staticmethod - def get_crypto_alg_scapy_name(crypto_alg): + def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg) -> str: """Return encryption algorithm scapy name. :param crypto_alg: Encryption algorithm. @@ -245,7 +250,7 @@ class IPsecUtil: return crypto_alg.scapy_name @staticmethod - def integ_alg_sha_256_128(): + def integ_alg_sha_256_128() -> IntegAlg: """Return integrity algorithm SHA-256-128. :returns: IntegAlg enum SHA_256_128 object. @@ -254,7 +259,7 @@ class IPsecUtil: return IntegAlg.SHA_256_128 @staticmethod - def integ_alg_sha_512_256(): + def integ_alg_sha_512_256() -> IntegAlg: """Return integrity algorithm SHA-512-256. :returns: IntegAlg enum SHA_512_256 object. @@ -263,7 +268,7 @@ class IPsecUtil: return IntegAlg.SHA_512_256 @staticmethod - def get_integ_alg_key_len(integ_alg): + def get_integ_alg_key_len(integ_alg: Optional[IntegAlg]) -> int: """Return integrity algorithm key length. None argument is accepted, returning zero. @@ -276,7 +281,7 @@ class IPsecUtil: return 0 if integ_alg is None else integ_alg.key_len @staticmethod - def get_integ_alg_scapy_name(integ_alg): + def get_integ_alg_scapy_name(integ_alg: Optional[IntegAlg]) -> str: """Return integrity algorithm scapy name. :param integ_alg: Integrity algorithm. @@ -287,7 +292,7 @@ class IPsecUtil: return integ_alg.scapy_name @staticmethod - def ipsec_proto_esp(): + def ipsec_proto_esp() -> int: """Return IPSec protocol ESP. :returns: IPsecProto enum ESP object. @@ -296,7 +301,7 @@ class IPsecUtil: return int(IPsecProto.IPSEC_API_PROTO_ESP) @staticmethod - def ipsec_proto_ah(): + def ipsec_proto_ah() -> int: """Return IPSec protocol AH. :returns: IPsecProto enum AH object. @@ -305,7 +310,9 @@ class IPsecUtil: return int(IPsecProto.IPSEC_API_PROTO_AH) @staticmethod - def vpp_ipsec_select_backend(node, protocol, index=1): + def vpp_ipsec_select_backend( + node: dict, protocol: int, index: int = 1 + ) -> None: """Select IPsec backend. :param node: VPP node to select IPsec backend on. @@ -324,7 +331,7 @@ class IPsecUtil: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod - def vpp_ipsec_set_async_mode(node, async_enable=1): + def vpp_ipsec_set_async_mode(node: dict, async_enable: int = 1) -> None: """Set IPsec async mode on|off. Unconditionally, attempt to switch crypto dispatch into polling mode. @@ -354,8 +361,8 @@ class IPsecUtil: @staticmethod def vpp_ipsec_crypto_sw_scheduler_set_worker( - node, workers, crypto_enable=False - ): + node: dict, workers: Iterable[int], crypto_enable: bool = False + ) -> None: """Enable or disable crypto on specific vpp worker threads. :param node: VPP node to enable or disable crypto for worker threads. @@ -379,8 +386,8 @@ class IPsecUtil: @staticmethod def vpp_ipsec_crypto_sw_scheduler_set_worker_on_all_duts( - nodes, crypto_enable=False - ): + nodes: dict, crypto_enable: bool = False + ) -> None: """Enable or disable crypto on specific vpp worker threads. :param node: VPP node to enable or disable crypto for worker threads. @@ -410,16 +417,16 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_sad_entry( - node, - sad_id, - spi, - crypto_alg, - crypto_key, - integ_alg=None, - integ_key="", - tunnel_src=None, - tunnel_dst=None, - ): + node: dict, + sad_id: int, + spi: int, + crypto_alg: CryptoAlg, + crypto_key: str, + integ_alg: Optional[IntegAlg] = None, + integ_key: str = "", + tunnel_src: Optional[str] = None, + tunnel_dst: Optional[str] = None, + ) -> None: """Create Security Association Database entry on the VPP node. :param node: VPP node to add SAD entry on. @@ -440,8 +447,8 @@ class IPsecUtil: :type crypto_key: str :type integ_alg: Optional[IntegAlg] :type integ_key: str - :type tunnel_src: str - :type tunnel_dst: str + :type tunnel_src: Optional[str] + :type tunnel_dst: Optional[str] """ if isinstance(crypto_key, str): crypto_key = crypto_key.encode(encoding="utf-8") @@ -496,18 +503,18 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_sad_entries( - node, - n_entries, - sad_id, - spi, - crypto_alg, - crypto_key, - integ_alg=None, - integ_key="", - tunnel_src=None, - tunnel_dst=None, - tunnel_addr_incr=True, - ): + node: dict, + n_entries: int, + sad_id: int, + spi: int, + crypto_alg: CryptoAlg, + crypto_key: str, + integ_alg: Optional[IntegAlg] = None, + integ_key: str = "", + tunnel_src: Optional[str] = None, + tunnel_dst: Optional[str] = None, + tunnel_addr_incr: bool = True, + ) -> None: """Create multiple Security Association Database entries on VPP node. :param node: VPP node to add SAD entry on. @@ -534,8 +541,8 @@ class IPsecUtil: :type crypto_key: str :type integ_alg: Optional[IntegAlg] :type integ_key: str - :type tunnel_src: str - :type tunnel_dst: str + :type tunnel_src: Optional[str] + :type tunnel_dst: Optional[str] :type tunnel_addr_incr: bool """ if isinstance(crypto_key, str): @@ -616,15 +623,15 @@ class IPsecUtil: @staticmethod def vpp_ipsec_set_ip_route( - node, - n_tunnels, - tunnel_src, - traffic_addr, - tunnel_dst, - interface, - raddr_range, - dst_mac=None, - ): + node: dict, + n_tunnels: int, + tunnel_src: str, + traffic_addr: str, + tunnel_dst: str, + interface: str, + raddr_range: int, + dst_mac: Optional[str] = None, + ) -> None: """Set IP address and route on interface. :param node: VPP node to add config on. @@ -644,7 +651,7 @@ class IPsecUtil: :type tunnel_dst: str :type interface: str :type raddr_range: int - :type dst_mac: str + :type dst_mac: Optional[str] """ tunnel_src = ip_address(tunnel_src) tunnel_dst = ip_address(tunnel_dst) @@ -717,7 +724,7 @@ class IPsecUtil: papi_exec.get_replies(err_msg) @staticmethod - def vpp_ipsec_add_spd(node, spd_id): + def vpp_ipsec_add_spd(node: dict, spd_id: int) -> None: """Create Security Policy Database on the VPP node. :param node: VPP node to add SPD on. @@ -734,7 +741,9 @@ class IPsecUtil: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod - def vpp_ipsec_spd_add_if(node, spd_id, interface): + def vpp_ipsec_spd_add_if( + node: dict, spd_id: int, interface: Union[str, int] + ) -> None: """Add interface to the Security Policy Database. :param node: VPP node. @@ -759,16 +768,16 @@ class IPsecUtil: @staticmethod def vpp_ipsec_create_spds_match_nth_entry( - node, - dir1_interface, - dir2_interface, - entry_amount, - local_addr_range, - remote_addr_range, - action=PolicyAction.BYPASS, - inbound=False, - bidirectional=True, - ): + node: dict, + dir1_interface: Union[str, int], + dir2_interface: Union[str, int], + entry_amount: int, + local_addr_range: Union[str, IPv4Address, IPv6Address], + remote_addr_range: Union[str, IPv4Address, IPv6Address], + action: PolicyAction = PolicyAction.BYPASS, + inbound: bool = False, + bidirectional: bool = True, + ) -> None: """Create one matching SPD entry for inbound or outbound traffic on a DUT for each traffic direction and also create entry_amount - 1 non-matching SPD entries. Create a Security Policy Database on each @@ -798,14 +807,14 @@ class IPsecUtil: :param bidirectional: When True, will create SPDs in both directions of traffic. When False, only in one direction. :type node: dict - :type dir1_interface: Union[string, int] - :type dir2_interface: Union[string, int] + :type dir1_interface: Union[str, int] + :type dir2_interface: Union[str, int] :type entry_amount: int :type local_addr_range: - Union[string, ipaddress.IPv4Address, ipaddress.IPv6Address] + Union[str, IPv4Address, IPv6Address] :type remote_addr_range: - Union[string, ipaddress.IPv4Address, ipaddress.IPv6Address] - :type action: IPsecUtil.PolicyAction + Union[str, IPv4Address, IPv6Address] + :type action: PolicyAction :type inbound: bool :type bidirectional: bool :raises NotImplementedError: When the action is PolicyAction.PROTECT. @@ -901,19 +910,19 @@ class IPsecUtil: @staticmethod def _vpp_ipsec_add_spd_entry_internal( - executor, - spd_id, - priority, - action, - inbound=True, - sa_id=None, - proto=None, - laddr_range=None, - raddr_range=None, - lport_range=None, - rport_range=None, - is_ipv6=False, - ): + executor: PapiSocketExecutor, + spd_id: int, + priority: int, + action: PolicyAction, + inbound: bool = True, + sa_id: Optional[int] = None, + proto: Optional[int] = None, + laddr_range: Optional[str] = None, + raddr_range: Optional[str] = None, + lport_range: Optional[str] = None, + rport_range: Optional[str] = None, + is_ipv6: bool = False, + ) -> None: """Prepare to create Security Policy Database entry on the VPP node. This just adds one more command to the executor. @@ -943,14 +952,14 @@ class IPsecUtil: :type executor: PapiSocketExecutor :type spd_id: int :type priority: int - :type action: IPsecUtil.PolicyAction + :type action: PolicyAction :type inbound: bool - :type sa_id: int - :type proto: int - :type laddr_range: string - :type raddr_range: string - :type lport_range: string - :type rport_range: string + :type sa_id: Optional[int] + :type proto: Optional[int] + :type laddr_range: Optional[str] + :type raddr_range: Optional[str] + :type lport_range: Optional[str] + :type rport_range: Optional[str] :type is_ipv6: bool """ if laddr_range is None: @@ -1001,19 +1010,19 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_spd_entry( - node, - spd_id, - priority, - action, - inbound=True, - sa_id=None, - proto=None, - laddr_range=None, - raddr_range=None, - lport_range=None, - rport_range=None, - is_ipv6=False, - ): + node: dict, + spd_id: int, + priority: int, + action: PolicyAction, + inbound: bool = True, + sa_id: Optional[int] = None, + proto: Optional[int] = None, + laddr_range: Optional[str] = None, + raddr_range: Optional[str] = None, + lport_range: Optional[str] = None, + rport_range: Optional[str] = None, + is_ipv6: bool = False, + ) -> None: """Create Security Policy Database entry on the VPP node. :param node: VPP node to add SPD entry on. @@ -1039,14 +1048,14 @@ class IPsecUtil: :type node: dict :type spd_id: int :type priority: int - :type action: IPsecUtil.PolicyAction + :type action: PolicyAction :type inbound: bool - :type sa_id: int - :type proto: int - :type laddr_range: string - :type raddr_range: string - :type lport_range: string - :type rport_range: string + :type sa_id: Optional[int] + :type proto: Optional[int] + :type laddr_range: Optional[str] + :type raddr_range: Optional[str] + :type lport_range: Optional[str] + :type rport_range: Optional[str] :type is_ipv6: bool """ err_msg = ( @@ -1072,20 +1081,20 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_spd_entries( - node, - n_entries, - spd_id, - priority, - action, - inbound, - sa_id=None, - proto=None, - laddr_range=None, - raddr_range=None, - lport_range=None, - rport_range=None, - is_ipv6=False, - ): + node: dict, + n_entries: int, + spd_id: int, + priority: Optional[ObjIncrement], + action: PolicyAction, + inbound: bool, + sa_id: Optional[ObjIncrement] = None, + proto: Optional[int] = None, + laddr_range: Optional[NetworkIncrement] = None, + raddr_range: Optional[NetworkIncrement] = None, + lport_range: Optional[str] = None, + rport_range: Optional[str] = None, + is_ipv6: bool = False, + ) -> None: """Create multiple Security Policy Database entries on the VPP node. :param node: VPP node to add SPD entries on. @@ -1112,15 +1121,15 @@ class IPsecUtil: :type node: dict :type n_entries: int :type spd_id: int - :type priority: IPsecUtil.ObjIncrement - :type action: IPsecUtil.PolicyAction + :type priority: Optional[ObjIncrement] + :type action: PolicyAction :type inbound: bool - :type sa_id: IPsecUtil.ObjIncrement - :type proto: int - :type laddr_range: IPsecUtil.NetworkIncrement - :type raddr_range: IPsecUtil.NetworkIncrement - :type lport_range: string - :type rport_range: string + :type sa_id: Optional[ObjIncrement] + :type proto: Optional[int] + :type laddr_range: Optional[NetworkIncrement] + :type raddr_range: Optional[NetworkIncrement] + :type lport_range: Optional[str] + :type rport_range: Optional[str] :type is_ipv6: bool """ if laddr_range is None: @@ -1154,7 +1163,9 @@ class IPsecUtil: papi_exec.get_replies(err_msg) @staticmethod - def _ipsec_create_loopback_dut1_papi(nodes, tun_ips, if1_key, if2_key): + def _ipsec_create_loopback_dut1_papi( + nodes: dict, tun_ips: dict, if1_key: str, if2_key: str + ) -> int: """Create loopback interface and set IP address on VPP node 1 interface using PAPI. @@ -1169,6 +1180,8 @@ class IPsecUtil: :type tun_ips: dict :type if1_key: str :type if2_key: str + :returns: sw_if_idx Of the created loopback interface. + :rtype: int """ with PapiSocketExecutor(nodes["DUT1"]) as papi_exec: # Create loopback interface on DUT1, set it to up state @@ -1235,18 +1248,18 @@ class IPsecUtil: @staticmethod def _ipsec_create_tunnel_interfaces_dut1_papi( - nodes, - tun_ips, - if1_key, - if2_key, - n_tunnels, - crypto_alg, - integ_alg, - raddr_ip2, - addr_incr, - spi_d, - existing_tunnels=0, - ): + nodes: dict, + tun_ips: dict, + if1_key: str, + if2_key: str, + n_tunnels: int, + crypto_alg: CryptoAlg, + integ_alg: Optional[IntegAlg], + raddr_ip2: Union[IPv4Address, IPv6Address], + addr_incr: int, + spi_d: dict, + existing_tunnels: int = 0, + ) -> Tuple[List[bytes], List[bytes]]: """Create multiple IPsec tunnel interfaces on DUT1 node using PAPI. Generate random keys and return them (so DUT2 or TG can decrypt). @@ -1274,7 +1287,7 @@ class IPsecUtil: :type n_tunnels: int :type crypto_alg: CryptoAlg :type integ_alg: Optional[IntegAlg] - :type raddr_ip2: IPv4Address or IPv6Address + :type raddr_ip2: Union[IPv4Address, IPv6Address] :type addr_incr: int :type spi_d: dict :type existing_tunnels: int @@ -1480,19 +1493,19 @@ class IPsecUtil: @staticmethod def _ipsec_create_tunnel_interfaces_dut2_papi( - nodes, - tun_ips, - if2_key, - n_tunnels, - crypto_alg, - ckeys, - integ_alg, - ikeys, - raddr_ip1, - addr_incr, - spi_d, - existing_tunnels=0, - ): + nodes: dict, + tun_ips: dict, + if2_key: str, + n_tunnels: int, + crypto_alg: CryptoAlg, + ckeys: Sequence[bytes], + integ_alg: Optional[IntegAlg], + ikeys: Sequence[bytes], + raddr_ip1: Union[IPv4Address, IPv6Address], + addr_incr: int, + spi_d: dict, + existing_tunnels: int = 0, + ) -> None: """Create multiple IPsec tunnel interfaces on DUT2 node using PAPI. This method accesses keys generated by DUT1 method @@ -1509,6 +1522,8 @@ class IPsecUtil: :param ckeys: List of encryption keys. :param integ_alg: The integrity algorithm name. :param ikeys: List of integrity keys. + :param raddr_ip1: Policy selector remote IPv4/IPv6 start address for the + first tunnel in direction node1->node2. :param spi_d: Dictionary with SPIs for VPP node 1 and VPP node 2. :param addr_incr: IP / IPv6 address incremental step. :param existing_tunnels: Number of tunnel interfaces before creation. @@ -1521,6 +1536,7 @@ class IPsecUtil: :type ckeys: Sequence[bytes] :type integ_alg: Optional[IntegAlg] :type ikeys: Sequence[bytes] + :type raddr_ip1: Union[IPv4Address, IPv6Address] :type addr_incr: int :type spi_d: dict :type existing_tunnels: int @@ -1727,20 +1743,20 @@ class IPsecUtil: @staticmethod def vpp_ipsec_create_tunnel_interfaces( - nodes, - tun_if1_ip_addr, - tun_if2_ip_addr, - if1_key, - if2_key, - n_tunnels, - crypto_alg, - integ_alg, - raddr_ip1, - raddr_ip2, - raddr_range, - existing_tunnels=0, - return_keys=False, - ): + nodes: dict, + tun_if1_ip_addr: str, + tun_if2_ip_addr: str, + if1_key: str, + if2_key: str, + n_tunnels: int, + crypto_alg: CryptoAlg, + integ_alg: Optional[IntegAlg], + raddr_ip1: str, + raddr_ip2: str, + raddr_range: int, + existing_tunnels: int = 0, + return_keys: bool = False, + ) -> Optional[Tuple[List[bytes], List[bytes], int, int]]: """Create multiple IPsec tunnel interfaces between two VPP nodes. Some deployments (e.g. devicetest) need to know the generated keys. @@ -1775,14 +1791,14 @@ class IPsecUtil: :type if2_key: str :type n_tunnels: int :type crypto_alg: CryptoAlg - :type integ_alg: Optonal[IntegAlg] - :type raddr_ip1: string - :type raddr_ip2: string + :type integ_alg: Optional[IntegAlg] + :type raddr_ip1: str + :type raddr_ip2: str :type raddr_range: int :type existing_tunnels: int :type return_keys: bool :returns: Ckeys, ikeys, spi_1, spi_2. - :rtype: Optional[List[bytes], List[bytes], int, int] + :rtype: Optional[Tuple[List[bytes], List[bytes], int, int]] """ n_tunnels = int(n_tunnels) existing_tunnels = int(existing_tunnels) @@ -1832,13 +1848,17 @@ class IPsecUtil: return None @staticmethod - def _create_ipsec_script_files(dut, instances): + def _create_ipsec_script_files( + dut: str, instances: int + ) -> List[TextIOWrapper]: """Create script files for configuring IPsec in containers :param dut: DUT node on which to create the script files :param instances: number of containers on DUT node - :type dut: string + :type dut: str :type instances: int + :returns: Created opened file handles. + :rtype: List[TextIOWrapper] """ scripts = [] for cnf in range(0, instances): @@ -1849,14 +1869,16 @@ class IPsecUtil: return scripts @staticmethod - def _close_and_copy_ipsec_script_files(dut, nodes, instances, scripts): + def _close_and_copy_ipsec_script_files( + dut: str, nodes: dict, instances: int, scripts: Sequence[TextIOWrapper] + ) -> None: """Close created scripts and copy them to containers :param dut: DUT node on which to create the script files :param nodes: VPP nodes :param instances: number of containers on DUT node :param scripts: dictionary holding the script files - :type dut: string + :type dut: str :type nodes: dict :type instances: int :type scripts: dict @@ -1870,17 +1892,17 @@ class IPsecUtil: @staticmethod def vpp_ipsec_create_tunnel_interfaces_in_containers( - nodes, - if1_ip_addr, - if2_ip_addr, - n_tunnels, - crypto_alg, - integ_alg, - raddr_ip1, - raddr_ip2, - raddr_range, - n_instances, - ): + nodes: dict, + if1_ip_addr: str, + if2_ip_addr: str, + n_tunnels: int, + crypto_alg: CryptoAlg, + integ_alg: Optional[IntegAlg], + raddr_ip1: str, + raddr_ip2: str, + raddr_range: int, + n_instances: int, + ) -> None: """Create multiple IPsec tunnel interfaces between two VPP nodes. :param nodes: VPP nodes to create tunnel interfaces. @@ -1902,8 +1924,8 @@ class IPsecUtil: :type n_tunnels: int :type crypto_alg: CryptoAlg :type integ_alg: Optional[IntegAlg] - :type raddr_ip1: string - :type raddr_ip2: string + :type raddr_ip1: str + :type raddr_ip2: str :type raddr_range: int :type n_instances: int """ @@ -1987,19 +2009,19 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_multiple_tunnels( - nodes, - interface1, - interface2, - n_tunnels, - crypto_alg, - integ_alg, - tunnel_ip1, - tunnel_ip2, - raddr_ip1, - raddr_ip2, - raddr_range, - tunnel_addr_incr=True, - ): + nodes: dict, + interface1: Union[str, int], + interface2: Union[str, int], + n_tunnels: int, + crypto_alg: CryptoAlg, + integ_alg: Optional[IntegAlg], + tunnel_ip1: str, + tunnel_ip2: str, + raddr_ip1: str, + raddr_ip2: str, + raddr_range: int, + tunnel_addr_incr: bool = True, + ) -> None: """Create multiple IPsec tunnels between two VPP nodes. :param nodes: VPP nodes to create tunnels. @@ -2019,15 +2041,15 @@ class IPsecUtil: :param tunnel_addr_incr: Enable or disable tunnel IP address incremental step. :type nodes: dict - :type interface1: str or int - :type interface2: str or int + :type interface1: Union[str, int] + :type interface2: Union[str, int] :type n_tunnels: int :type crypto_alg: CryptoAlg :type integ_alg: Optional[IntegAlg] :type tunnel_ip1: str :type tunnel_ip2: str - :type raddr_ip1: string - :type raddr_ip2: string + :type raddr_ip1: str + :type raddr_ip2: str :type raddr_range: int :type tunnel_addr_incr: bool """ @@ -2243,7 +2265,7 @@ class IPsecUtil: ) @staticmethod - def vpp_ipsec_show_all(node): + def vpp_ipsec_show_all(node: dict) -> None: """Run "show ipsec all" debug CLI command. :param node: Node to run command on. @@ -2252,7 +2274,7 @@ class IPsecUtil: PapiSocketExecutor.run_cli_cmd(node, "show ipsec all") @staticmethod - def show_ipsec_security_association(node): + def show_ipsec_security_association(node: dict) -> None: """Show IPSec security association. :param node: DUT node. @@ -2262,7 +2284,9 @@ class IPsecUtil: PapiSocketExecutor.dump_and_log(node, [cmd]) @staticmethod - def vpp_ipsec_flow_enable_rss(node, proto, rss_type, function="default"): + def vpp_ipsec_flow_enable_rss( + node: dict, proto: str, rss_type: str, function: str = "default" + ) -> int: """Ipsec flow enable rss action. :param node: DUT node. @@ -2275,6 +2299,7 @@ class IPsecUtil: :type rss_type: str :type function: str :returns: flow_index. + :rtype: int """ # TODO: to be fixed to use full PAPI when it is ready in VPP cmd = ( @@ -2288,8 +2313,8 @@ class IPsecUtil: @staticmethod def vpp_create_ipsec_flows_on_dut( - node, n_flows, rx_queues, spi_start, interface - ): + node: dict, n_flows: int, rx_queues: int, spi_start: int, interface: str + ) -> None: """Create mutiple ipsec flows and enable flows onto interface. :param node: DUT node. @@ -2303,7 +2328,6 @@ class IPsecUtil: :type rx_queues: int :type spi_start: int :type interface: str - :returns: flow_index. """ for i in range(0, n_flows): -- cgit 1.2.3-korg