aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/IPsecUtil.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/IPsecUtil.py')
-rw-r--r--resources/libraries/python/IPsecUtil.py420
1 files changed, 194 insertions, 226 deletions
diff --git a/resources/libraries/python/IPsecUtil.py b/resources/libraries/python/IPsecUtil.py
index 19995e547d..59374ab73f 100644
--- a/resources/libraries/python/IPsecUtil.py
+++ b/resources/libraries/python/IPsecUtil.py
@@ -24,6 +24,7 @@ from typing import Iterable, List, Optional, Sequence, Tuple, Union
from robot.libraries.BuiltIn import BuiltIn
from resources.libraries.python.Constants import Constants
+from resources.libraries.python.enum_util import get_enum_instance
from resources.libraries.python.IncrementUtil import ObjIncrement
from resources.libraries.python.InterfaceUtil import (
InterfaceUtil,
@@ -60,27 +61,33 @@ def gen_key(length: int) -> bytes:
)
-class PolicyAction(Enum):
- """Policy actions."""
+# TODO: Introduce a metaclass that adds .find and .InputType automatically?
+class IpsecSpdAction(Enum):
+ """IPsec SPD actions.
- BYPASS = ("bypass", 0)
+ Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_spd_action.
+ """
+
+ BYPASS = NONE = ("bypass", 0)
DISCARD = ("discard", 1)
+ RESOLVE = ("resolve", 2)
PROTECT = ("protect", 3)
- def __init__(self, policy_name: str, policy_int_repr: int):
- self.policy_name = policy_name
- self.policy_int_repr = policy_int_repr
+ def __init__(self, action_name: str, action_int_repr: int):
+ self.action_name = action_name
+ self.action_int_repr = action_int_repr
def __str__(self) -> str:
- return self.policy_name
+ return self.action_name
def __int__(self) -> int:
- return self.policy_int_repr
+ return self.action_int_repr
class CryptoAlg(Enum):
"""Encryption algorithms."""
+ NONE = ("none", 0, "none", 0)
AES_CBC_128 = ("aes-cbc-128", 1, "AES-CBC", 16)
AES_CBC_256 = ("aes-cbc-256", 3, "AES-CBC", 32)
AES_GCM_128 = ("aes-gcm-128", 7, "AES-GCM", 16)
@@ -94,10 +101,16 @@ class CryptoAlg(Enum):
self.scapy_name = scapy_name
self.key_len = key_len
+ # TODO: Investigate if __int__ works with PAPI. It was not enough for "if".
+ def __bool__(self):
+ """A shorthand to enable "if crypto_alg:" constructs."""
+ return self.alg_int_repr != 0
+
class IntegAlg(Enum):
"""Integrity algorithm."""
+ NONE = ("none", 0, "none", 0)
SHA_256_128 = ("sha-256-128", 4, "SHA2-256-128", 32)
SHA_512_256 = ("sha-512-256", 6, "SHA2-512-256", 64)
@@ -109,18 +122,44 @@ class IntegAlg(Enum):
self.scapy_name = scapy_name
self.key_len = key_len
+ def __bool__(self):
+ """A shorthand to enable "if integ_alg:" constructs."""
+ return self.alg_int_repr != 0
+
+# TODO: Base on Enum, so str values can be defined as in alg enums?
class IPsecProto(IntEnum):
- """IPsec protocol."""
+ """IPsec protocol.
+
+ Mirroring VPP: src/vnet/ipsec/ipsec_types.api enum ipsec_proto.
+ """
+
+ ESP = 50
+ AH = 51
+ NONE = 255
+
+ def __str__(self) -> str:
+ """Return string suitable for CLI commands.
- IPSEC_API_PROTO_ESP = 50
- IPSEC_API_PROTO_AH = 51
+ None is not supported.
+
+ :returns: Lowercase name of the proto.
+ :rtype: str
+ :raises: ValueError if the numeric value is not recognized.
+ """
+ num = int(self)
+ if num == 50:
+ return "esp"
+ if num == 51:
+ return "ah"
+ raise ValueError(f"String form not defined for IPsecProto {num}")
+# The rest of enums do not appear outside this file, so no no change needed yet.
class IPsecSadFlags(IntEnum):
"""IPsec Security Association Database flags."""
- IPSEC_API_SAD_FLAG_NONE = 0
+ IPSEC_API_SAD_FLAG_NONE = NONE = 0
# Enable extended sequence numbers
IPSEC_API_SAD_FLAG_USE_ESN = 0x01
# Enable Anti - replay
@@ -139,7 +178,7 @@ class IPsecSadFlags(IntEnum):
class TunnelEncpaDecapFlags(IntEnum):
"""Flags controlling tunnel behaviour."""
- TUNNEL_API_ENCAP_DECAP_FLAG_NONE = 0
+ TUNNEL_API_ENCAP_DECAP_FLAG_NONE = NONE = 0
# at encap, copy the DF bit of the payload into the tunnel header
TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF = 1
# at encap, set the DF bit in the tunnel header
@@ -156,177 +195,93 @@ class TunnelMode(IntEnum):
"""Tunnel modes."""
# point-to-point
- TUNNEL_API_MODE_P2P = 0
+ TUNNEL_API_MODE_P2P = NONE = 0
# multi-point
TUNNEL_API_MODE_MP = 1
-class IPsecUtil:
- """IPsec utilities."""
-
- @staticmethod
- def policy_action_bypass() -> PolicyAction:
- """Return policy action bypass.
-
- :returns: PolicyAction enum BYPASS object.
- :rtype: PolicyAction
- """
- return PolicyAction.BYPASS
-
- @staticmethod
- def policy_action_discard() -> PolicyAction:
- """Return policy action discard.
-
- :returns: PolicyAction enum DISCARD object.
- :rtype: PolicyAction
- """
- return PolicyAction.DISCARD
-
- @staticmethod
- def policy_action_protect() -> PolicyAction:
- """Return policy action protect.
-
- :returns: PolicyAction enum PROTECT object.
- :rtype: PolicyAction
- """
- return PolicyAction.PROTECT
-
- @staticmethod
- def crypto_alg_aes_cbc_128() -> CryptoAlg:
- """Return encryption algorithm aes-cbc-128.
-
- :returns: CryptoAlg enum AES_CBC_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_CBC_128
-
- @staticmethod
- def crypto_alg_aes_cbc_256() -> CryptoAlg:
- """Return encryption algorithm aes-cbc-256.
-
- :returns: CryptoAlg enum AES_CBC_256 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_CBC_256
-
- @staticmethod
- def crypto_alg_aes_gcm_128() -> CryptoAlg:
- """Return encryption algorithm aes-gcm-128.
+# Derived types for type hints, based on capabilities of get_enum_instance.
+IpsecSpdAction.InputType = Union[IpsecSpdAction, str, None]
+CryptoAlg.InputType = Union[CryptoAlg, str, None]
+IntegAlg.InputType = Union[IntegAlg, str, None]
+IPsecProto.InputType = Union[IPsecProto, str, int, None]
+# TODO: Introduce a metaclass that adds .find and .InputType automatically?
- :returns: CryptoAlg enum AES_GCM_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_GCM_128
- @staticmethod
- def crypto_alg_aes_gcm_256() -> CryptoAlg:
- """Return encryption algorithm aes-gcm-256.
+class IPsecUtil:
+ """IPsec utilities."""
- :returns: CryptoAlg enum AES_GCM_128 object.
- :rtype: CryptoAlg
- """
- return CryptoAlg.AES_GCM_256
+ # The following 4 methods are Python one-liners,
+ # but they are useful when called as a Robot keyword.
@staticmethod
- def get_crypto_alg_key_len(crypto_alg: CryptoAlg) -> int:
+ def get_crypto_alg_key_len(crypto_alg: CryptoAlg.InputType) -> int:
"""Return encryption algorithm key length.
+ This is a Python one-liner, but useful when called as a Robot keyword.
+
:param crypto_alg: Encryption algorithm.
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:returns: Key length.
:rtype: int
"""
- return crypto_alg.key_len
+ return get_enum_instance(CryptoAlg, crypto_alg).key_len
@staticmethod
- def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg) -> str:
+ def get_crypto_alg_scapy_name(crypto_alg: CryptoAlg.InputType) -> str:
"""Return encryption algorithm scapy name.
+ This is a Python one-liner, but useful when called as a Robot keyword.
+
:param crypto_alg: Encryption algorithm.
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:returns: Algorithm scapy name.
:rtype: str
"""
- return crypto_alg.scapy_name
-
- @staticmethod
- def integ_alg_sha_256_128() -> IntegAlg:
- """Return integrity algorithm SHA-256-128.
-
- :returns: IntegAlg enum SHA_256_128 object.
- :rtype: IntegAlg
- """
- return IntegAlg.SHA_256_128
+ return get_enum_instance(CryptoAlg, crypto_alg).scapy_name
+ # The below to keywords differ only by enum type conversion from str.
@staticmethod
- def integ_alg_sha_512_256() -> IntegAlg:
- """Return integrity algorithm SHA-512-256.
-
- :returns: IntegAlg enum SHA_512_256 object.
- :rtype: IntegAlg
- """
- return IntegAlg.SHA_512_256
-
- @staticmethod
- def get_integ_alg_key_len(integ_alg: Optional[IntegAlg]) -> int:
+ def get_integ_alg_key_len(integ_alg: IntegAlg.InputType) -> int:
"""Return integrity algorithm key length.
- None argument is accepted, returning zero.
-
:param integ_alg: Integrity algorithm.
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:returns: Key length.
:rtype: int
"""
- return 0 if integ_alg is None else integ_alg.key_len
+ return get_enum_instance(IntegAlg, integ_alg).key_len
@staticmethod
- def get_integ_alg_scapy_name(integ_alg: Optional[IntegAlg]) -> str:
+ def get_integ_alg_scapy_name(integ_alg: IntegAlg.InputType) -> str:
"""Return integrity algorithm scapy name.
:param integ_alg: Integrity algorithm.
- :type integ_alg: IntegAlg
+ :type integ_alg: IntegAlg.InputType
:returns: Algorithm scapy name.
:rtype: str
"""
- return integ_alg.scapy_name
-
- @staticmethod
- def ipsec_proto_esp() -> int:
- """Return IPSec protocol ESP.
-
- :returns: IPsecProto enum ESP object.
- :rtype: IPsecProto
- """
- return int(IPsecProto.IPSEC_API_PROTO_ESP)
-
- @staticmethod
- def ipsec_proto_ah() -> int:
- """Return IPSec protocol AH.
-
- :returns: IPsecProto enum AH object.
- :rtype: IPsecProto
- """
- return int(IPsecProto.IPSEC_API_PROTO_AH)
+ return get_enum_instance(IntegAlg, integ_alg).scapy_name
@staticmethod
def vpp_ipsec_select_backend(
- node: dict, protocol: int, index: int = 1
+ node: dict, proto: IPsecProto.InputType, index: int = 1
) -> None:
"""Select IPsec backend.
:param node: VPP node to select IPsec backend on.
- :param protocol: IPsec protocol.
+ :param proto: IPsec protocol.
:param index: Backend index.
:type node: dict
- :type protocol: IPsecProto
+ :type proto: IPsecProto.InputType
:type index: int
:raises RuntimeError: If failed to select IPsec backend or if no API
reply received.
"""
+ proto = get_enum_instance(IPsecProto, proto)
cmd = "ipsec_select_backend"
err_msg = f"Failed to select IPsec backend on host {node['host']}"
- args = dict(protocol=protocol, index=index)
+ args = dict(protocol=proto, index=index)
with PapiSocketExecutor(node) as papi_exec:
papi_exec.add(cmd, **args).get_reply(err_msg)
@@ -420,9 +375,9 @@ class IPsecUtil:
node: dict,
sad_id: int,
spi: int,
- crypto_alg: CryptoAlg,
- crypto_key: str,
- integ_alg: Optional[IntegAlg] = None,
+ crypto_alg: CryptoAlg.InputType = None,
+ crypto_key: str = "",
+ integ_alg: IntegAlg.InputType = None,
integ_key: str = "",
tunnel_src: Optional[str] = None,
tunnel_dst: Optional[str] = None,
@@ -443,13 +398,15 @@ class IPsecUtil:
:type node: dict
:type sad_id: int
:type spi: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type crypto_key: str
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type integ_key: str
:type tunnel_src: Optional[str]
:type tunnel_dst: Optional[str]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if isinstance(crypto_key, str):
crypto_key = crypto_key.encode(encoding="utf-8")
if isinstance(integ_key, str):
@@ -480,7 +437,7 @@ class IPsecUtil:
spi=int(spi),
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=ckey,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=ikey,
flags=flags,
tunnel=dict(
@@ -492,7 +449,7 @@ class IPsecUtil:
),
dscp=int(IpDscp.IP_API_DSCP_CS0),
),
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
udp_src_port=IPSEC_UDP_PORT_DEFAULT,
udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
@@ -507,9 +464,9 @@ class IPsecUtil:
n_entries: int,
sad_id: int,
spi: int,
- crypto_alg: CryptoAlg,
- crypto_key: str,
- integ_alg: Optional[IntegAlg] = None,
+ crypto_alg: CryptoAlg.InputType = None,
+ crypto_key: str = "",
+ integ_alg: IntegAlg.InputType = None,
integ_key: str = "",
tunnel_src: Optional[str] = None,
tunnel_dst: Optional[str] = None,
@@ -537,14 +494,16 @@ class IPsecUtil:
:type n_entries: int
:type sad_id: int
:type spi: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type crypto_key: str
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type integ_key: str
:type tunnel_src: Optional[str]
:type tunnel_dst: Optional[str]
:type tunnel_addr_incr: bool
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if isinstance(crypto_key, str):
crypto_key = crypto_key.encode(encoding="utf-8")
if isinstance(integ_key, str):
@@ -585,7 +544,7 @@ class IPsecUtil:
spi=int(spi),
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=ckey,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=ikey,
flags=flags,
tunnel=dict(
@@ -597,7 +556,7 @@ class IPsecUtil:
),
dscp=int(IpDscp.IP_API_DSCP_CS0),
),
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
udp_src_port=IPSEC_UDP_PORT_DEFAULT,
udp_dst_port=IPSEC_UDP_PORT_DEFAULT,
anti_replay_window_size=IPSEC_REPLAY_WINDOW_DEFAULT,
@@ -774,7 +733,7 @@ class IPsecUtil:
entry_amount: int,
local_addr_range: Union[str, IPv4Address, IPv6Address],
remote_addr_range: Union[str, IPv4Address, IPv6Address],
- action: PolicyAction = PolicyAction.BYPASS,
+ action: IpsecSpdAction.InputType = IpsecSpdAction.BYPASS,
inbound: bool = False,
bidirectional: bool = True,
) -> None:
@@ -801,7 +760,7 @@ class IPsecUtil:
:param remote_addr_range: Matching remote address range in
direction 1 in format IP/prefix or IP/mask. If no mask is
provided, it's considered to be /32.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
:param bidirectional: When True, will create SPDs in both directions
@@ -814,14 +773,16 @@ class IPsecUtil:
Union[str, IPv4Address, IPv6Address]
:type remote_addr_range:
Union[str, IPv4Address, IPv6Address]
- :type action: PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type bidirectional: bool
- :raises NotImplementedError: When the action is PolicyAction.PROTECT.
+ :raises NotImplementedError: When the action is IpsecSpdAction.PROTECT.
"""
-
- if action == PolicyAction.PROTECT:
- raise NotImplementedError("Policy action PROTECT is not supported.")
+ action = get_enum_instance(IpsecSpdAction, action)
+ if action == IpsecSpdAction.PROTECT:
+ raise NotImplementedError(
+ "IPsec SPD action PROTECT is not supported."
+ )
spd_id_dir1 = 1
spd_id_dir2 = 2
@@ -913,10 +874,10 @@ class IPsecUtil:
executor: PapiSocketExecutor,
spd_id: int,
priority: int,
- action: PolicyAction,
+ action: IpsecSpdAction.InputType,
inbound: bool = True,
sa_id: Optional[int] = None,
- proto: Optional[int] = None,
+ proto: IPsecProto.InputType = None,
laddr_range: Optional[str] = None,
raddr_range: Optional[str] = None,
lport_range: Optional[str] = None,
@@ -932,10 +893,10 @@ class IPsecUtil:
:param executor: Open PAPI executor (async handling) to add commands to.
:param spd_id: SPD ID to add entry on.
:param priority: SPD entry priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
@@ -952,16 +913,18 @@ class IPsecUtil:
:type executor: PapiSocketExecutor
:type spd_id: int
:type priority: int
- :type action: PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type sa_id: Optional[int]
- :type proto: Optional[int]
+ :type proto: IPsecProto.InputType
:type laddr_range: Optional[str]
:type raddr_range: Optional[str]
:type lport_range: Optional[str]
:type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
if laddr_range is None:
laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0"
@@ -979,7 +942,7 @@ class IPsecUtil:
is_outbound=not inbound,
sa_id=int(sa_id) if sa_id else 0,
policy=int(action),
- protocol=255 if proto is None else int(proto),
+ protocol=proto,
remote_address_start=IPAddress.create_ip_address_object(
remote_net.network_address
),
@@ -1013,10 +976,10 @@ class IPsecUtil:
node: dict,
spd_id: int,
priority: int,
- action: PolicyAction,
+ action: IpsecSpdAction.InputType,
inbound: bool = True,
sa_id: Optional[int] = None,
- proto: Optional[int] = None,
+ proto: IPsecProto.InputType = None,
laddr_range: Optional[str] = None,
raddr_range: Optional[str] = None,
lport_range: Optional[str] = None,
@@ -1028,10 +991,10 @@ class IPsecUtil:
:param node: VPP node to add SPD entry on.
:param spd_id: SPD ID to add entry on.
:param priority: SPD entry priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
@@ -1048,16 +1011,18 @@ class IPsecUtil:
:type node: dict
:type spd_id: int
:type priority: int
- :type action: PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type sa_id: Optional[int]
- :type proto: Optional[int]
+ :type proto: IPsecProto.InputType
:type laddr_range: Optional[str]
:type raddr_range: Optional[str]
:type lport_range: Optional[str]
:type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
err_msg = (
"Failed to add entry to Security Policy Database"
f" {spd_id} on host {node['host']}"
@@ -1085,10 +1050,10 @@ class IPsecUtil:
n_entries: int,
spd_id: int,
priority: Optional[ObjIncrement],
- action: PolicyAction,
+ action: IpsecSpdAction.InputType,
inbound: bool,
sa_id: Optional[ObjIncrement] = None,
- proto: Optional[int] = None,
+ proto: IPsecProto.InputType = None,
laddr_range: Optional[NetworkIncrement] = None,
raddr_range: Optional[NetworkIncrement] = None,
lport_range: Optional[str] = None,
@@ -1101,10 +1066,10 @@ class IPsecUtil:
:param n_entries: Number of SPD entries to be added.
:param spd_id: SPD ID to add entries on.
:param priority: SPD entries priority, higher number = higher priority.
- :param action: Policy action.
+ :param action: IPsec SPD action.
:param inbound: If True policy is for inbound traffic, otherwise
outbound.
- :param sa_id: SAD entry ID for action PolicyAction.PROTECT.
+ :param sa_id: SAD entry ID for action IpsecSpdAction.PROTECT.
:param proto: Policy selector next layer protocol number.
:param laddr_range: Policy selector local IPv4 or IPv6 address range
in format IP/prefix or IP/mask. If no mask is provided,
@@ -1122,16 +1087,18 @@ class IPsecUtil:
:type n_entries: int
:type spd_id: int
:type priority: Optional[ObjIncrement]
- :type action: PolicyAction
+ :type action: IpsecSpdAction.InputType
:type inbound: bool
:type sa_id: Optional[ObjIncrement]
- :type proto: Optional[int]
+ :type proto: IPsecProto.InputType
:type laddr_range: Optional[NetworkIncrement]
:type raddr_range: Optional[NetworkIncrement]
:type lport_range: Optional[str]
:type rport_range: Optional[str]
:type is_ipv6: bool
"""
+ action = get_enum_instance(IpsecSpdAction, action)
+ proto = get_enum_instance(IPsecProto, proto)
if laddr_range is None:
laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0"
laddr_range = NetworkIncrement(ip_network(laddr_range), 0)
@@ -1253,8 +1220,8 @@ class IPsecUtil:
if1_key: str,
if2_key: str,
n_tunnels: int,
- crypto_alg: CryptoAlg,
- integ_alg: Optional[IntegAlg],
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
raddr_ip2: Union[IPv4Address, IPv6Address],
addr_incr: int,
spi_d: dict,
@@ -1285,8 +1252,8 @@ class IPsecUtil:
:type if1_key: str
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
:type raddr_ip2: Union[IPv4Address, IPv6Address]
:type addr_incr: int
:type spi_d: dict
@@ -1294,6 +1261,8 @@ class IPsecUtil:
:returns: Generated ckeys and ikeys.
:rtype: List[bytes], List[bytes]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
if not existing_tunnels:
loop_sw_if_idx = IPsecUtil._ipsec_create_loopback_dut1_papi(
nodes, tun_ips, if1_key, if2_key
@@ -1365,10 +1334,10 @@ class IPsecUtil:
sad_entry = dict(
sad_id=None,
spi=None,
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=c_key,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=i_key,
flags=common_flags,
tunnel=dict(
@@ -1387,12 +1356,8 @@ class IPsecUtil:
)
args = dict(entry=sad_entry)
for i in range(existing_tunnels, n_tunnels):
- ckeys.append(
- gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg))
- )
- ikeys.append(
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg))
- )
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
# SAD entry for outband / tx path
sad_entry["sad_id"] = i
sad_entry["spi"] = spi_d["spi_1"] + i
@@ -1497,9 +1462,9 @@ class IPsecUtil:
tun_ips: dict,
if2_key: str,
n_tunnels: int,
- crypto_alg: CryptoAlg,
+ crypto_alg: CryptoAlg.InputType,
ckeys: Sequence[bytes],
- integ_alg: Optional[IntegAlg],
+ integ_alg: IntegAlg.InputType,
ikeys: Sequence[bytes],
raddr_ip1: Union[IPv4Address, IPv6Address],
addr_incr: int,
@@ -1532,15 +1497,17 @@ class IPsecUtil:
:type tun_ips: dict
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
+ :type crypto_alg: CryptoAlg.InputType
:type ckeys: Sequence[bytes]
- :type integ_alg: Optional[IntegAlg]
+ :type integ_alg: IntegAlg.InputType
:type ikeys: Sequence[bytes]
:type raddr_ip1: Union[IPv4Address, IPv6Address]
:type addr_incr: int
:type spi_d: dict
:type existing_tunnels: int
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec:
if not existing_tunnels:
# Set IP address on VPP node 2 interface
@@ -1605,10 +1572,10 @@ class IPsecUtil:
sad_entry = dict(
sad_id=None,
spi=None,
- protocol=int(IPsecProto.IPSEC_API_PROTO_ESP),
+ protocol=IPsecProto.ESP,
crypto_algorithm=crypto_alg.alg_int_repr,
crypto_key=c_key,
- integrity_algorithm=integ_alg.alg_int_repr if integ_alg else 0,
+ integrity_algorithm=integ_alg.alg_int_repr,
integrity_key=i_key,
flags=common_flags,
tunnel=dict(
@@ -1627,12 +1594,8 @@ class IPsecUtil:
)
args = dict(entry=sad_entry)
for i in range(existing_tunnels, n_tunnels):
- ckeys.append(
- gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg))
- )
- ikeys.append(
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg))
- )
+ ckeys.append(gen_key(crypto_alg.key_len))
+ ikeys.append(gen_key(integ_alg.key_len))
# SAD entry for outband / tx path
sad_entry["sad_id"] = 100000 + i
sad_entry["spi"] = spi_d["spi_2"] + i
@@ -1749,8 +1712,8 @@ class IPsecUtil:
if1_key: str,
if2_key: str,
n_tunnels: int,
- crypto_alg: CryptoAlg,
- integ_alg: Optional[IntegAlg],
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
raddr_ip1: str,
raddr_ip2: str,
raddr_range: int,
@@ -1790,8 +1753,8 @@ class IPsecUtil:
:type if1_key: str
:type if2_key: str
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
:type raddr_ip1: str
:type raddr_ip2: str
:type raddr_range: int
@@ -1800,6 +1763,8 @@ class IPsecUtil:
:returns: Ckeys, ikeys, spi_1, spi_2.
:rtype: Optional[Tuple[List[bytes], List[bytes], int, int]]
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
n_tunnels = int(n_tunnels)
existing_tunnels = int(existing_tunnels)
spi_d = dict(spi_1=100000, spi_2=200000)
@@ -1896,8 +1861,8 @@ class IPsecUtil:
interface1: Union[str, int],
interface2: Union[str, int],
n_tunnels: int,
- crypto_alg: CryptoAlg,
- integ_alg: Optional[IntegAlg],
+ crypto_alg: CryptoAlg.InputType,
+ integ_alg: IntegAlg.InputType,
tunnel_ip1: str,
tunnel_ip2: str,
raddr_ip1: str,
@@ -1927,8 +1892,8 @@ class IPsecUtil:
:type interface1: Union[str, int]
:type interface2: Union[str, int]
:type n_tunnels: int
- :type crypto_alg: CryptoAlg
- :type integ_alg: Optional[IntegAlg]
+ :type crypto_alg: CryptoAlg.InputType
+ :type integ_alg: IntegAlg.InputType
:type tunnel_ip1: str
:type tunnel_ip2: str
:type raddr_ip1: str
@@ -1936,6 +1901,9 @@ class IPsecUtil:
:type raddr_range: int
:type tunnel_addr_incr: bool
"""
+ crypto_alg = get_enum_instance(CryptoAlg, crypto_alg)
+ integ_alg = get_enum_instance(IntegAlg, integ_alg)
+
spd_id = 1
p_hi = 100
p_lo = 10
@@ -1944,15 +1912,8 @@ class IPsecUtil:
spi_1 = 300000
spi_2 = 400000
- crypto_key = gen_key(
- IPsecUtil.get_crypto_alg_key_len(crypto_alg)
- ).decode()
- integ_key = (
- gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)).decode()
- if integ_alg
- else ""
- )
-
+ crypto_key = gen_key(crypto_alg.key_len).decode()
+ integ_key = gen_key(integ_alg.key_len).decode()
rmac = (
Topology.get_interface_mac(nodes["DUT2"], interface2)
if "DUT2" in nodes.keys()
@@ -1989,9 +1950,9 @@ class IPsecUtil:
nodes["DUT1"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=False,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut1_local_outbound_range,
raddr_range=dut1_remote_outbound_range,
)
@@ -1999,9 +1960,9 @@ class IPsecUtil:
nodes["DUT1"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=True,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut1_remote_outbound_range,
raddr_range=dut1_local_outbound_range,
)
@@ -2025,7 +1986,7 @@ class IPsecUtil:
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=False,
sa_id=ObjIncrement(sa_id_1, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip2)),
@@ -2049,7 +2010,7 @@ class IPsecUtil:
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=True,
sa_id=ObjIncrement(sa_id_2, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip1)),
@@ -2082,9 +2043,9 @@ class IPsecUtil:
nodes["DUT2"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=False,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut2_remote_outbound_range,
raddr_range=dut2_local_outbound_range,
)
@@ -2092,9 +2053,9 @@ class IPsecUtil:
nodes["DUT2"],
spd_id,
p_hi,
- PolicyAction.BYPASS,
+ IpsecSpdAction.BYPASS,
inbound=True,
- proto=50,
+ proto=IPsecProto.ESP,
laddr_range=dut2_local_outbound_range,
raddr_range=dut2_remote_outbound_range,
)
@@ -2117,7 +2078,7 @@ class IPsecUtil:
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=True,
sa_id=ObjIncrement(sa_id_1, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip2)),
@@ -2141,7 +2102,7 @@ class IPsecUtil:
n_tunnels,
spd_id,
priority=ObjIncrement(p_lo, 0),
- action=PolicyAction.PROTECT,
+ action=IpsecSpdAction.PROTECT,
inbound=False,
sa_id=ObjIncrement(sa_id_2, 1),
raddr_range=NetworkIncrement(ip_network(raddr_ip1)),
@@ -2168,7 +2129,10 @@ class IPsecUtil:
@staticmethod
def vpp_ipsec_flow_enable_rss(
- node: dict, proto: str, rss_type: str, function: str = "default"
+ node: dict,
+ proto: str = "IPSEC_ESP",
+ rss_type: str = "esp",
+ function: str = "default",
) -> int:
"""Ipsec flow enable rss action.
@@ -2176,14 +2140,18 @@ class IPsecUtil:
:param proto: The flow protocol.
:param rss_type: RSS type.
:param function: RSS function.
-
:type node: dict
- :type proto: str
+ :type proto: IPsecProto.InputType
:type rss_type: str
:type function: str
:returns: flow_index.
:rtype: int
"""
+ # The proto argument does not correspond to IPsecProto.
+ # The allowed values come from src/vnet/ip/protocols.def
+ # and we do not have a good enum for that yet.
+ # FlowUti. and FlowUtil. are close but not exactly the same.
+
# TODO: to be fixed to use full PAPI when it is ready in VPP
cmd = (
f"test flow add src-ip any proto {proto} rss function"