aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/IPsecUtil.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/IPsecUtil.py')
-rw-r--r--resources/libraries/python/IPsecUtil.py540
1 files changed, 312 insertions, 228 deletions
diff --git a/resources/libraries/python/IPsecUtil.py b/resources/libraries/python/IPsecUtil.py
index bb46b49b14..1abfee2cec 100644
--- a/resources/libraries/python/IPsecUtil.py
+++ b/resources/libraries/python/IPsecUtil.py
@@ -24,6 +24,7 @@ from typing import Iterable, List, Optional, Sequence, Tuple, Union
from robot.libraries.BuiltIn import BuiltIn
from resources.libraries.python.Constants import Constants
+from resources.libraries.python.enum_util import get_enum_instance
from resources.libraries.python.IncrementUtil import ObjIncrement
from resources.libraries.python.InterfaceUtil import (
InterfaceUtil,
@@ -60,31 +61,59 @@ def gen_key(length: int) -> bytes:
)
-class PolicyAction(Enum):
- """Policy actions."""
+# TODO: Introduce a metaclass that adds .find and .InputType automatically?
+class IpsecSpdAction(Enum):
+ """IPsec SPD actions.
- BYPASS = ("bypass", 0)
+ Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_spd_action.
+ """
+
+ BYPASS = NONE = ("bypass", 0)
DISCARD = ("discard", 1)
+ RESOLVE = ("resolve", 2)
PROTECT = ("protect", 3)
- def __init__(self, policy_name: str, policy_int_repr: int):
- self.policy_name = policy_name
- self.policy_int_repr = policy_int_repr
+ def __init__(self, action_name: str, action_int_repr: int):
+ self.action_name = action_name
+ self.action_int_repr = action_int_repr
def __str__(self) -> str:
- return self.policy_name
+ return self.action_name
def __int__(self) -> int:
- return self.policy_int_repr
+ return self.action_int_repr
class CryptoAlg(Enum):
- """Encryption algorithms."""
+ """Encryption algorithms.
+
+ API names and numeric enums from ipsec_types.api (enum ipsec_crypto_alg).
+
+ Lowercase names from ipsec_sa.h (foreach_ipsec_crypto_alg).
+
+ Scapy names are from:
+ https://github.com/secdev/scapy/blob/master/scapy/layers/ipsec.py
+
+ Key lengths from crypto.h
+ (foreach_crypto_cipher_alg and foreach_crypto_aead_alg).
+ """
+ NONE = ("none", 0, "none", 0)
AES_CBC_128 = ("aes-cbc-128", 1, "AES-CBC", 16)
+ AES_CBC_192 = ("aes-cbc-192", 2, "AES-CBC", 24)
AES_CBC_256 = ("aes-cbc-256", 3, "AES-CBC", 32)
+ AES_CTR_128 = ("aes-ctr-128", 4, "AES-CTR", 16)
+ AES_CTR_192 = ("aes-ctr-192", 5, "AES-CTR", 24)
+ AES_CTR_256 = ("aes-ctr-256", 6, "AES-CTR", 32)
AES_GCM_128 = ("aes-gcm-128", 7, "AES-GCM", 16)
+ AES_GCM_192 = ("aes-gcm-192", 8, "AES-GCM", 24)
AES_GCM_256 = ("aes-gcm-256", 9, "AES-GCM", 32)
+ DES_CBC = ("des-cbc", 10, "DES", 7)
+ _3DES_CBC = ("3des-cbc", 11, "3DES", 24)
+ CHACHA20_POLY1305 = ("chacha20-poly1305", 12, "CHACHA20-POLY1305", 32)
+ AES_NULL_GMAC_128 = ("aes-null-gmac-128", 13, "AES-NULL-GMAC", 16)
+ AES_NULL_GMAC_192 = ("aes-null-gmac-192", 14, "AES-NULL-GMAC", 24)
+ AES_NULL_GMAC_256 = ("aes-null-gmac-256", 15, "AES-NULL-GMAC", 32)
def __init__(
self, alg_name: str, alg_int_repr: int, scapy_name: str, key_len: int
@@ -94,11 +123,37 @@ class CryptoAlg(Enum):
self.scapy_name = scapy_name
self.key_len = key_len
+ # TODO: Investigate if __int__ works with PAPI. It was not enough for "if".
+ def __bool__(self):
+ """A shorthand to enable "if crypto_alg:" constructs."""
+ return self.alg_int_repr != 0
+
class IntegAlg(Enum):
- """Integrity algorithm."""
+ """Integrity algorithms.
+
+ API names and numeric enums from ipsec_types.api (enum ipsec_integ_alg).
+
+ Lowercase names from ipsec_sa.h (foreach_ipsec_integ_alg).
+
+ Scapy names are from:
+ https://github.com/secdev/scapy/blob/master/scapy/layers/ipsec.py
+ Among those, "AES-CMAC-96" may be a mismatch,
+ but there is no sha2-related item with "96" in it.
+
+ Key lengths seem to be given double of digest length
+ from crypto.h (foreach_crypto_link_async_alg),
+ but data there is not complete
+ (e.g. it does not distinguish sha-256-96 from sha-256-128).
+ The missing values are chosen based on last number (e.g. 192 / 4 = 48).
+ """
+ NONE = ("none", 0, "none", 0)
+ MD5_96 = ("md5-96", 1, "HMAC-MD5-96", 24)
+ SHA1_96 = ("sha1-96", 2, "HMAC-SHA1-96", 24)
+ SHA_256_96 = ("sha-256-96", 3, "AES-CMAC-96", 24)
SHA_256_128 = ("sha-256-128", 4, "SHA2-256-128", 32)
+ SHA_384_192 = ("sha-384-192", 5, "SHA2-384-192", 48)
SHA_512_256 = ("sha-512-256", 6, "SHA2-512-256", 64)
def __init__(
@@ -109,18 +164,44 @@ class IntegAlg(Enum):
self.scapy_name = scapy_name
self.key_len = key_len
+ def __bool__(self):
+ """A shorthand to enable "if integ_alg:" constructs."""
+ return self.alg_int_repr != 0
+
+# TODO: Base on Enum, so str values can be defined as in alg enums?
class IPsecProto(IntEnum):
- """IPsec protocol."""
+ """IPsec protocol.
+
+ Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_proto.
+ """
+
+ ESP = 50
+ AH = 51
+ NONE = 255
+
+ def __str__(self) -> str:
+ """Return string suitable for CLI commands.
+
+ None is not supported.
- IPSEC_API_PROTO_ESP = 50
- IPSEC_API_PROTO_AH = 51
+ :returns: Lowercase name of the proto.
+ :rtype: str
+ :raises: ValueError if the numeric value is not recognized.
+ """
+ num = int(self)
+ if num == 50:
+ return "esp"
+ if num == 51:
+ return "ah"
+ raise ValueError(f"String form not defined for IPsecProto {num}")
+# The rest of enums do not appear outside this file, so no no change needed yet.
class IPsecSadFlags(IntEnum):
"""IPsec Security Association Database flags."""
- IPSEC_API_SAD_FLAG_NONE = 0
+ IPSEC_API_SAD_FLAG_NONE = NONE = 0
# Enable extended sequence numbers
IPSEC_API_SAD_FLAG_USE_ESN = 0x01
# Enable Anti - replay
@@ -139,7 +220,7 @@ class IPsecSadFlags(IntEnum):
class TunnelEncpaDecapFlags(IntEnum):
"""Flags controlling tunnel behaviour."""
- TUNNEL_API_ENCAP_DECAP_FLAG_NONE = 0
+ TUNNEL_API_ENCAP_DECAP_FLAG_NONE = NONE = 0
# at encap, copy the DF bit of the payload into the tunnel header
TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF = 1
# at encap, set the DF bit in the tunnel header
@@ -156,177 +237,93 @@ class TunnelMode(IntEnum):
"""Tunnel modes."""
# point-to-point
- TUNNEL_API_MODE_P2P = 0
+ TUNNEL_API_MODE_P2P = NONE = 0
# multi-point
TUNNEL_API_MODE_MP = 1
-class IPsecUtil:
- """IPsec utilities."""
+# Derived types for type hints, based on capabilities of get_enum_instance.
+IpsecSpdAction.InputType = Union[IpsecSpdAction, str, None]
+CryptoAlg.InputType = Union[CryptoAlg, str, None]
+IntegAlg.InputType = Union[IntegAlg, str, None]
+IPsecProto.InputType = Union[IPsecProto, str, int, None]
+# TODO: Introduce a metaclass that adds .find and .InputType automatically?
- @staticmethod
- def policy_action_bypass() -> PolicyAction:
- """Return policy action bypass.
-
- :returns: PolicyAction enum BYPASS object.
- :rtype: PolicyAction
- """
- return PolicyAction.BYPASS
-
- @staticmethod
- def policy_action_discard() -> PolicyAction:
- """Return policy action discard.
-
- :returns: PolicyAction enum DISCARD object.
- :rtype: PolicyAction
- """
- return PolicyAction.DISCARD
-
- @staticmethod
- def policy_action_protect() -> PolicyAction:
- """Return policy action protect.
-
- :returns: PolicyAction enum PROTECT object.
- :rtype: PolicyAction
- """
- return PolicyAction.PROTECT
-
- @staticmethod
- def crypto_alg_aes_cbc_128() -> CryptoAlg:
- """Return encryption algorithm aes-cbc-128.
-
- :returns: CryptoAlg enum AES_CBC_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_CBC_128
-
- @staticmethod
- def crypto_alg_aes_cbc_256() -> CryptoAlg:
- """Return encryption algorithm aes-cbc-256.
-
- :returns: CryptoAlg enum AES_CBC_256 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_CBC_256
-
- @staticmethod
- def crypto_alg_aes_gcm_128() -> CryptoAlg:
- """Return encryption algorithm aes-gcm-128.
-
- :returns: CryptoAlg enum AES_GCM_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_GCM_128
- @staticmethod
- def crypto_alg_aes_gcm_256() -> CryptoAlg:
- """Return encryption algorithm aes-gcm-256.
+class IPsecUtil:
+ """IPsec utilities."""
- :returns: CryptoAlg enum AES_GCM_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_GCM_256
+ # The following 4 methods are Python one-liners,
+ # but they are useful when called as a Robot keyword.
@staticmethod
- def get_crypto_alg_key_len(crypto_alg: CryptoAlg) -> int:
+ def get_crypto_alg_key_len(crypto_alg: CryptoAlg.InputType) -> int:
"""Return encryption algorithm key length.
+ This is a Python one-liner, but useful when called as a Robot keyword.
+
:param crypto_alg: Encryption algorithm.
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:returns: Key length.
:rtype: int
"""
- return crypto_alg.key_len
+ return get_enum_instance(CryptoAlg, crypto_alg).key_len
@staticmethod
- def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg) -> str:
+ def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg.InputType) -> str:
"""Return encryption algorithm scapy name.
+ This is a Python one-liner, but useful when called as a Robot keyword.
+
:param crypto_alg: Encryption algorithm.
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:returns: Algorithm scapy name.
:rtype: str
"""
- return crypto_alg.scapy_name
+ return get_enum_instance(CryptoAlg, crypto_alg).scapy_name
+ # The below to keywords differ only by enum type conversion from str.
@staticmethod
- def integ_alg_sha_256_128() -> IntegAlg:
- """Return integrity algorithm SHA-256-128.
-
- :returns: IntegAlg enum SHA_256_128 object.
- :rtype: IntegAlg
- """
- return IntegAlg.SHA_256_128
-
- @staticmethod
- def integ_alg_sha_512_256() -> IntegAlg:
- """Return integrity algorithm SHA-512-256.
-
- :returns: IntegAlg enum SHA_512_256 object.
- :rtype: IntegAlg
- """
- return IntegAlg.SHA_512_256
-
- @staticmethod
- def get_integ_alg_key_len(integ_alg: Optional[IntegAlg]) -> int:
+ def get_integ_alg_key_len(integ_alg: IntegAlg.InputType) -> int:
"""Return integrity algorithm key length.
- None argument is accepted, returning zero.
-
:param integ_alg: Integrity algorithm.
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:returns: Key length.
:rtype: int
"""
- return 0 if integ_alg is None else integ_alg.key_len
+ return get_enum_instance(IntegAlg, integ_alg).key_len
@staticmethod
- def get_integ_alg_scapy_name(integ_alg: Optional[IntegAlg]) -> str:
+ def get_integ_alg_scapy_name(integ_alg: IntegAlg.InputType) -> str:
"""Return integrity algorithm scapy name.
:param integ_alg: Integrity algorithm.
- :type integ_alg: IntegAlg
+ :type integ_alg: IntegAlg.InputType
:returns: Algorithm scapy name.
:rtype: str
"""
- return integ_alg.scapy_name
-
- @staticmethod
- def ipsec_proto_esp() -> int:
- """Return IPSec protocol ESP.
-
- :returns: IPsecProto enum ESP object.
- :rtype: IPsecProto
- """
- return int(IPsecProto.IPSEC_API_PROTO_ESP)
-
- @staticmethod
- def ipsec_proto_ah() -> int:
- """Return IPSec protocol AH.
-
- :returns: IPsecProto enum AH object.
- :rtype: IPsecProto
- """
- return int(IPsecProto.IPSEC_API_PROTO_AH)
+ return get_enum_instance(IntegAlg, integ_alg).scapy_name
@staticmethod
def vpp_ipsec_select_backend(
- node: dict, protocol: int, index: int = 1
+ node: dict, proto: IPsecProto.InputType, index: int = 1
) -> None:
"""Select IPsec backend.
:param node: VPP node to select IPsec backend on.
- :param protocol: IPsec protocol.
+ :param proto: IPsec protocol.
:param index: Backend index.
:type node: dict
- :type protocol: IPsecProto
+ :type proto: IPsecProto.InputType
:type index: int
:raises RuntimeError: If failed to select IPsec backend or if no API
reply received.
"""
+ proto = get_enum_instance(IPsecProto, proto)
cmd = "ipsec_select_backend"
err_msg = f"Failed to select IPsec backend on host {node['host']}"
- args = dict(protocol=protocol, index=index)
+ args = dict(protocol=proto, index=index)
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
@@ -420,9 +417,9 @@ class IPsecUtil:
node: dict,
sad_id: int,
spi: int,
- crypto_alg: CryptoAlg,
- crypto_key: str,
- integ_alg: Optional[IntegAlg] = None,
+ crypto_alg: CryptoAlg.InputType = None,
+ crypto_key: str = "",
+ integ_alg: IntegAlg.InputType = None,
integ_key: str = "",
tunnel_src: Optional[str] = None,
tunnel_dst: Optional[str] = None,
@@ -443,13 +440,15 @@ class IPsecUtil:
:type node: dict
:type sad_id: int
:type spi: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type crypto_key: str
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type integ_key: str
:type tunnel_src: Optional[str]
:type tunnel_dst: Optional[str]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if isinstance(crypto_key, str):
crypto_key = crypto_key.encode(encoding="utf-8")
if isinstance(integ_key, str):
@@ -480,7 +479,7 @@ class IPsecUtil:
spi=int(spi),
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=ckey,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=ikey,
flags=flags,
tunnel=dict(
@@ -492,7 +491,7 @@ class IPsecUtil:
),
dscp=int(IpDscp.IP_API_DSCP_CS0),
),
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
udp_src_port=IPSEC_UDP_PORT_DEFAULT,
udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
@@ -507,9 +506,9 @@ class IPsecUtil:
n_entries: int,
sad_id: int,
spi: int,
- crypto_alg: CryptoAlg,
- crypto_key: str,
- integ_alg: Optional[IntegAlg] = None,
+ crypto_alg: CryptoAlg.InputType = None,
+ crypto_key: str = "",
+ integ_alg: IntegAlg.InputType = None,
integ_key: str = "",
tunnel_src: Optional[str] = None,
tunnel_dst: Optional[str] = None,
@@ -537,14 +536,16 @@ class IPsecUtil:
:type n_entries: int
:type sad_id: int
:type spi: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type crypto_key: str
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type integ_key: str
:type tunnel_src: Optional[str]
:type tunnel_dst: Optional[str]
:type tunnel_addr_incr: bool
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if isinstance(crypto_key, str):
crypto_key = crypto_key.encode(encoding="utf-8")
if isinstance(integ_key, str):
@@ -585,7 +586,7 @@ class IPsecUtil:
spi=int(spi),
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=ckey,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=ikey,
flags=flags,
tunnel=dict(
@@ -597,7 +598,7 @@ class IPsecUtil:
),
dscp=int(IpDscp.IP_API_DSCP_CS0),
),
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
udp_src_port=IPSEC_UDP_PORT_DEFAULT,
udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
@@ -774,7 +775,7 @@ class IPsecUtil:
entry_amount: int,
local_addr_range: Union[str, IPv4Address, IPv6Address],
remote_addr_range: Union[str, IPv4Address, IPv6Address],
- action: PolicyAction = PolicyAction.BYPASS,
+ action: IpsecSpdAction.InputType = IpsecSpdAction.BYPASS,
inbound: bool = False,
bidirectional: bool = True,
) -> None:
@@ -801,7 +802,7 @@ class IPsecUtil:
:param remote_addr_range: Matching remote address range in
direction 1 in format IP/prefix or IP/mask. If no mask is
provided, it's considered to be /32.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
:param bidirectional: When True, will create SPDs in both directions
@@ -814,14 +815,16 @@ class IPsecUtil:
Union[str, IPv4Address, IPv6Address]
:type remote_addr_range:
Union[str, IPv4Address, IPv6Address]
- :type action: PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type bidirectional: bool
- :raises NotImplementedError: When the action is PolicyAction.PROTECT.
+ :raises NotImplementedError: When the action is IpsecSpdAction.PROTECT.
"""
-
- if action == PolicyAction.PROTECT:
- raise NotImplementedError("Policy action PROTECT is not supported.")
+ action = get_enum_instance(IpsecSpdAction, action)
+ if action == IpsecSpdAction.PROTECT:
+ raise NotImplementedError(
+ "IPsec SPD action PROTECT is not supported."
+ )
spd_id_dir1 = 1
spd_id_dir2 = 2
@@ -913,10 +916,10 @@ class IPsecUtil:
executor: PapiSocketExecutor,
spd_id: int,
priority: int,
- action: PolicyAction,
+ action: IpsecSpdAction.InputType,
inbound: bool = True,
sa_id: Optional[int] = None,
- proto: Optional[int] = None,
+ proto: IPsecProto.InputType = None,
laddr_range: Optional[str] = None,
raddr_range: Optional[str] = None,
lport_range: Optional[str] = None,
@@ -932,10 +935,10 @@ class IPsecUtil:
:param executor: Open PAPI executor (async handling) to add commands to.
:param spd_id: SPD ID to add entry on.
:param priority: SPD entry priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
@@ -952,16 +955,18 @@ class IPsecUtil:
:type executor: PapiSocketExecutor
:type spd_id: int
:type priority: int
- :type action: PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type sa_id: Optional[int]
- :type proto: Optional[int]
+ :type proto: IPsecProto.InputType
:type laddr_range: Optional[str]
:type raddr_range: Optional[str]
:type lport_range: Optional[str]
:type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
if laddr_range is None:
laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0"
@@ -979,7 +984,7 @@ class IPsecUtil:
is_outbound=not inbound,
sa_id=int(sa_id) if sa_id else 0,
policy=int(action),
- protocol=255 if proto is None else int(proto),
+ protocol=proto,
remote_address_start=IPAddress.create_ip_address_object(
remote_net.network_address
),
@@ -1013,10 +1018,10 @@ class IPsecUtil:
node: dict,
spd_id: int,
priority: int,
- action: PolicyAction,
+ action: IpsecSpdAction.InputType,
inbound: bool = True,
sa_id: Optional[int] = None,
- proto: Optional[int] = None,
+ proto: IPsecProto.InputType = None,
laddr_range: Optional[str] = None,
raddr_range: Optional[str] = None,
lport_range: Optional[str] = None,
@@ -1028,10 +1033,10 @@ class IPsecUtil:
:param node: VPP node to add SPD entry on.
:param spd_id: SPD ID to add entry on.
:param priority: SPD entry priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
@@ -1048,16 +1053,18 @@ class IPsecUtil:
:type node: dict
:type spd_id: int
:type priority: int
- :type action: PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type sa_id: Optional[int]
- :type proto: Optional[int]
+ :type proto: IPsecProto.InputType
:type laddr_range: Optional[str]
:type raddr_range: Optional[str]
:type lport_range: Optional[str]
:type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
err_msg = (
"Failed to add entry to Security Policy Database"
f" {spd_id} on host {node['host']}"
@@ -1085,10 +1092,10 @@ class IPsecUtil:
n_entries: int,
spd_id: int,
priority: Optional[ObjIncrement],
- action: PolicyAction,
+ action: IpsecSpdAction.InputType,
inbound: bool,
sa_id: Optional[ObjIncrement] = None,
- proto: Optional[int] = None,
+ proto: IPsecProto.InputType = None,
laddr_range: Optional[NetworkIncrement] = None,
raddr_range: Optional[NetworkIncrement] = None,
lport_range: Optional[str] = None,
@@ -1101,10 +1108,10 @@ class IPsecUtil:
:param n_entries: Number of SPD entries to be added.
:param spd_id: SPD ID to add entries on.
:param priority: SPD entries priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
@@ -1122,16 +1129,18 @@ class IPsecUtil:
:type n_entries: int
:type spd_id: int
:type priority: Optional[ObjIncrement]
- :type action: PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type sa_id: Optional[ObjIncrement]
- :type proto: Optional[int]
+ :type proto: IPsecProto.InputType
:type laddr_range: Optional[NetworkIncrement]
:type raddr_range: Optional[NetworkIncrement]
:type lport_range: Optional[str]
:type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
if laddr_range is None:
laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0"
laddr_range = NetworkIncrement(ip_network(laddr_range), 0)
@@ -1253,12 +1262,14 @@ class IPsecUtil:
if1_key: str,
if2_key: str,
n_tunnels: int,
- crypto_alg: CryptoAlg,
- integ_alg: Optional[IntegAlg],
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
raddr_ip2: Union[IPv4Address, IPv6Address],
addr_incr: int,
spi_d: dict,
existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
) -> Tuple[List[bytes], List[bytes]]:
"""Create multiple IPsec tunnel interfaces on DUT1 node using PAPI.
@@ -1280,20 +1291,26 @@ class IPsecUtil:
:param addr_incr: IP / IPv6 address incremental step.
:param existing_tunnels: Number of tunnel interfaces before creation.
Useful mainly for reconf tests. Default 0.
+ :param udp_encap: Whether to apply UDP_ENCAP flag.
+ :param anti_replay: Whether to apply USE_ANTI_REPLAY flag.
:type nodes: dict
:type tun_ips: dict
:type if1_key: str
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
:type raddr_ip2: Union[IPv4Address, IPv6Address]
:type addr_incr: int
:type spi_d: dict
:type existing_tunnels: int
+ :type udp_encap: bool
+ :type anti_replay: bool
:returns: Generated ckeys and ikeys.
:rtype: List[bytes], List[bytes]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if not existing_tunnels:
loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi(
nodes, tun_ips, if1_key, if2_key
@@ -1362,13 +1379,17 @@ class IPsecUtil:
c_key = dict(length=0, data=None)
i_key = dict(length=0, data=None)
common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ if udp_encap:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_UDP_ENCAP
+ if anti_replay:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
sad_entry = dict(
sad_id=None,
spi=None,
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=c_key,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=i_key,
flags=common_flags,
tunnel=dict(
@@ -1387,12 +1408,8 @@ class IPsecUtil:
)
args = dict(entry=sad_entry)
for i in range(existing_tunnels, n_tunnels):
- ckeys.append(
- gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg))
- )
- ikeys.append(
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg))
- )
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
# SAD entry for outband / tx path
sad_entry["sad_id"] = i
sad_entry["spi"] = spi_d["spi_1"] + i
@@ -1497,14 +1514,16 @@ class IPsecUtil:
tun_ips: dict,
if2_key: str,
n_tunnels: int,
- crypto_alg: CryptoAlg,
+ crypto_alg: CryptoAlg.InputType,
ckeys: Sequence[bytes],
- integ_alg: Optional[IntegAlg],
+ integ_alg: IntegAlg.InputType,
ikeys: Sequence[bytes],
raddr_ip1: Union[IPv4Address, IPv6Address],
addr_incr: int,
spi_d: dict,
existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
) -> None:
"""Create multiple IPsec tunnel interfaces on DUT2 node using PAPI.
@@ -1528,19 +1547,25 @@ class IPsecUtil:
:param addr_incr: IP / IPv6 address incremental step.
:param existing_tunnels: Number of tunnel interfaces before creation.
Useful mainly for reconf tests. Default 0.
+ :param udp_encap: Whether to apply UDP_ENCAP flag.
+ :param anti_replay: Whether to apply USE_ANTI_REPLAY flag.
:type nodes: dict
:type tun_ips: dict
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type ckeys: Sequence[bytes]
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type ikeys: Sequence[bytes]
:type raddr_ip1: Union[IPv4Address, IPv6Address]
:type addr_incr: int
:type spi_d: dict
:type existing_tunnels: int
+ :type udp_encap: bool
+ :type anti_replay: bool
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec:
if not existing_tunnels:
# Set IP address on VPP node 2 interface
@@ -1602,13 +1627,17 @@ class IPsecUtil:
c_key = dict(length=0, data=None)
i_key = dict(length=0, data=None)
common_flags = IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE
+ if udp_encap:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_UDP_ENCAP
+ if anti_replay:
+ common_flags |= IPsecSadFlags.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
sad_entry = dict(
sad_id=None,
spi=None,
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=c_key,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=i_key,
flags=common_flags,
tunnel=dict(
@@ -1627,12 +1656,8 @@ class IPsecUtil:
)
args = dict(entry=sad_entry)
for i in range(existing_tunnels, n_tunnels):
- ckeys.append(
- gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg))
- )
- ikeys.append(
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg))
- )
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
# SAD entry for outband / tx path
sad_entry["sad_id"] = 100000 + i
sad_entry["spi"] = spi_d["spi_2"] + i
@@ -1749,12 +1774,14 @@ class IPsecUtil:
if1_key: str,
if2_key: str,
n_tunnels: int,
- crypto_alg: CryptoAlg,
- integ_alg: Optional[IntegAlg],
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
raddr_ip1: str,
raddr_ip2: str,
raddr_range: int,
existing_tunnels: int = 0,
+ udp_encap: bool = False,
+ anti_replay: bool = False,
return_keys: bool = False,
) -> Optional[Tuple[List[bytes], List[bytes], int, int]]:
"""Create multiple IPsec tunnel interfaces between two VPP nodes.
@@ -1784,22 +1811,28 @@ class IPsecUtil:
:param existing_tunnels: Number of tunnel interfaces before creation.
Useful mainly for reconf tests. Default 0.
:param return_keys: Whether generated keys should be returned.
+ :param udp_encap: Whether to apply UDP_ENCAP flag.
+ :param anti_replay: Whether to apply USE_ANTI_REPLAY flag.
:type nodes: dict
:type tun_if1_ip_addr: str
:type tun_if2_ip_addr: str
:type if1_key: str
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
:type raddr_ip1: str
:type raddr_ip2: str
:type raddr_range: int
:type existing_tunnels: int
:type return_keys: bool
+ :type udp_encap: bool
+ :type anti_replay: bool
:returns: Ckeys, ikeys, spi_1, spi_2.
:rtype: Optional[Tuple[List[bytes], List[bytes], int, int]]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
n_tunnels = int(n_tunnels)
existing_tunnels = int(existing_tunnels)
spi_d = dict(spi_1=100000, spi_2=200000)
@@ -1826,6 +1859,8 @@ class IPsecUtil:
addr_incr,
spi_d,
existing_tunnels,
+ udp_encap,
+ anti_replay,
)
if "DUT2" in nodes.keys():
IPsecUtil._ipsec_create_tunnel_interfaces_dut2_papi(
@@ -1841,6 +1876,8 @@ class IPsecUtil:
addr_incr,
spi_d,
existing_tunnels,
+ udp_encap,
+ anti_replay,
)
if return_keys:
@@ -1848,13 +1885,56 @@ class IPsecUtil:
return None
@staticmethod
+ def _create_ipsec_script_files(
+ dut: str, instances: int
+ ) -> List[TextIOWrapper]:
+ """Create script files for configuring IPsec in containers
+
+ :param dut: DUT node on which to create the script files
+ :param instances: number of containers on DUT node
+ :type dut: str
+ :type instances: int
+ :returns: Created opened file handles.
+ :rtype: List[TextIOWrapper]
+ """
+ scripts = []
+ for cnf in range(0, instances):
+ script_filename = (
+ f"/tmp/ipsec_create_tunnel_cnf_{dut}_{cnf + 1}.config"
+ )
+ scripts.append(open(script_filename, "w", encoding="utf-8"))
+ return scripts
+
+ @staticmethod
+ def _close_and_copy_ipsec_script_files(
+ dut: str, nodes: dict, instances: int, scripts: Sequence[TextIOWrapper]
+ ) -> None:
+ """Close created scripts and copy them to containers
+
+ :param dut: DUT node on which to create the script files
+ :param nodes: VPP nodes
+ :param instances: number of containers on DUT node
+ :param scripts: dictionary holding the script files
+ :type dut: str
+ :type nodes: dict
+ :type instances: int
+ :type scripts: dict
+ """
+ for cnf in range(0, instances):
+ scripts[cnf].close()
+ script_filename = (
+ f"/tmp/ipsec_create_tunnel_cnf_{dut}_{cnf + 1}.config"
+ )
+ scp_node(nodes[dut], script_filename, script_filename)
+
+ @staticmethod
def vpp_ipsec_add_multiple_tunnels(
nodes: dict,
interface1: Union[str, int],
interface2: Union[str, int],
n_tunnels: int,
- crypto_alg: CryptoAlg,
- integ_alg: Optional[IntegAlg],
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
tunnel_ip1: str,
tunnel_ip2: str,
raddr_ip1: str,
@@ -1884,8 +1964,8 @@ class IPsecUtil:
:type interface1: Union[str, int]
:type interface2: Union[str, int]
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
:type tunnel_ip1: str
:type tunnel_ip2: str
:type raddr_ip1: str
@@ -1893,6 +1973,9 @@ class IPsecUtil:
:type raddr_range: int
:type tunnel_addr_incr: bool
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+
spd_id = 1
p_hi = 100
p_lo = 10
@@ -1901,15 +1984,8 @@ class IPsecUtil:
spi_1 = 300000
spi_2 = 400000
- crypto_key = gen_key(
- IPsecUtil.get_crypto_alg_key_len(crypto_alg)
- ).decode()
- integ_key = (
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)).decode()
- if integ_alg
- else ""
- )
-
+ crypto_key = gen_key(crypto_alg.key_len).decode()
+ integ_key = gen_key(integ_alg.key_len).decode()
rmac = (
Topology.get_interface_mac(nodes["DUT2"], interface2)
if "DUT2" in nodes.keys()
@@ -1946,9 +2022,9 @@ class IPsecUtil:
nodes["DUT1"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=False,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut1_local_outbound_range,
raddr_range=dut1_remote_outbound_range,
)
@@ -1956,9 +2032,9 @@ class IPsecUtil:
nodes["DUT1"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=True,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut1_remote_outbound_range,
raddr_range=dut1_local_outbound_range,
)
@@ -1982,7 +2058,7 @@ class IPsecUtil:
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=False,
sa_id=ObjIncrement(sa_id_1, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip2)),
@@ -2006,7 +2082,7 @@ class IPsecUtil:
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=True,
sa_id=ObjIncrement(sa_id_2, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip1)),
@@ -2039,9 +2115,9 @@ class IPsecUtil:
nodes["DUT2"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=False,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut2_remote_outbound_range,
raddr_range=dut2_local_outbound_range,
)
@@ -2049,9 +2125,9 @@ class IPsecUtil:
nodes["DUT2"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=True,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut2_local_outbound_range,
raddr_range=dut2_remote_outbound_range,
)
@@ -2074,7 +2150,7 @@ class IPsecUtil:
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=True,
sa_id=ObjIncrement(sa_id_1, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip2)),
@@ -2098,7 +2174,7 @@ class IPsecUtil:
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=False,
sa_id=ObjIncrement(sa_id_2, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip1)),
@@ -2125,7 +2201,10 @@ class IPsecUtil:
@staticmethod
def vpp_ipsec_flow_enable_rss(
- node: dict, proto: str, rss_type: str, function: str = "default"
+ node: dict,
+ proto: str = "IPSEC_ESP",
+ rss_type: str = "esp",
+ function: str = "default",
) -> int:
"""Ipsec flow enable rss action.
@@ -2133,14 +2212,19 @@ class IPsecUtil:
:param proto: The flow protocol.
:param rss_type: RSS type.
:param function: RSS function.
-
:type node: dict
- :type proto: str
+ :type proto: IPsecProto.InputType
:type rss_type: str
:type function: str
:returns: flow_index.
:rtype: int
"""
+ # The proto argument does not correspond to IPsecProto.
+ # The allowed values come from src/vnet/ip/protocols.def
+ # and we do not have a good enum for that yet.
+ # FlowUtil.FlowType and FlowUtil.FlowProto are close,
+ # but not exactly the same.
+
# TODO: to be fixed to use full PAPI when it is ready in VPP
cmd = (
f"test flow add src-ip any proto {proto} rss function"