diff options
-rw-r--r-- | resources/libraries/python/IPsecUtil.py | 1179 |
1 files changed, 714 insertions, 465 deletions
diff --git a/resources/libraries/python/IPsecUtil.py b/resources/libraries/python/IPsecUtil.py index 214764d233..7c80ba983e 100644 --- a/resources/libraries/python/IPsecUtil.py +++ b/resources/libraries/python/IPsecUtil.py @@ -24,11 +24,17 @@ from robot.libraries.BuiltIn import BuiltIn from resources.libraries.python.Constants import Constants from resources.libraries.python.IncrementUtil import ObjIncrement -from resources.libraries.python.InterfaceUtil import InterfaceUtil, \ - InterfaceStatusFlags +from resources.libraries.python.InterfaceUtil import ( + InterfaceUtil, + InterfaceStatusFlags, +) from resources.libraries.python.IPAddress import IPAddress -from resources.libraries.python.IPUtil import IPUtil, IpDscp, \ - MPLS_LABEL_INVALID, NetworkIncrement +from resources.libraries.python.IPUtil import ( + IPUtil, + IpDscp, + MPLS_LABEL_INVALID, + NetworkIncrement, +) from resources.libraries.python.PapiExecutor import PapiSocketExecutor from resources.libraries.python.ssh import scp_node from resources.libraries.python.topology import Topology, NodeType @@ -48,16 +54,17 @@ def gen_key(length): :returns: The generated payload. :rtype: bytes """ - return u"".join( - choice(ascii_letters) for _ in range(length) - ).encode(encoding=u"utf-8") + return "".join(choice(ascii_letters) for _ in range(length)).encode( + encoding="utf-8" + ) class PolicyAction(Enum): """Policy actions.""" - BYPASS = (u"bypass", 0) - DISCARD = (u"discard", 1) - PROTECT = (u"protect", 3) + + BYPASS = ("bypass", 0) + DISCARD = ("discard", 1) + PROTECT = ("protect", 3) def __init__(self, policy_name, policy_int_repr): self.policy_name = policy_name @@ -72,10 +79,11 @@ class PolicyAction(Enum): class CryptoAlg(Enum): """Encryption algorithms.""" - AES_CBC_128 = (u"aes-cbc-128", 1, u"AES-CBC", 16) - AES_CBC_256 = (u"aes-cbc-256", 3, u"AES-CBC", 32) - AES_GCM_128 = (u"aes-gcm-128", 7, u"AES-GCM", 16) - AES_GCM_256 = (u"aes-gcm-256", 9, u"AES-GCM", 32) + + AES_CBC_128 = ("aes-cbc-128", 1, "AES-CBC", 16) + AES_CBC_256 = ("aes-cbc-256", 3, "AES-CBC", 32) + AES_GCM_128 = ("aes-gcm-128", 7, "AES-GCM", 16) + AES_GCM_256 = ("aes-gcm-256", 9, "AES-GCM", 32) def __init__(self, alg_name, alg_int_repr, scapy_name, key_len): self.alg_name = alg_name @@ -86,8 +94,9 @@ class CryptoAlg(Enum): class IntegAlg(Enum): """Integrity algorithm.""" - SHA_256_128 = (u"sha-256-128", 4, u"SHA2-256-128", 32) - SHA_512_256 = (u"sha-512-256", 6, u"SHA2-512-256", 64) + + SHA_256_128 = ("sha-256-128", 4, "SHA2-256-128", 32) + SHA_512_256 = ("sha-512-256", 6, "SHA2-512-256", 64) def __init__(self, alg_name, alg_int_repr, scapy_name, key_len): self.alg_name = alg_name @@ -98,12 +107,14 @@ class IntegAlg(Enum): class IPsecProto(IntEnum): """IPsec protocol.""" + IPSEC_API_PROTO_ESP = 50 IPSEC_API_PROTO_AH = 51 class IPsecSadFlags(IntEnum): """IPsec Security Association Database flags.""" + IPSEC_API_SAD_FLAG_NONE = 0 # Enable extended sequence numbers IPSEC_API_SAD_FLAG_USE_ESN = 0x01 @@ -122,6 +133,7 @@ class IPsecSadFlags(IntEnum): class TunnelEncpaDecapFlags(IntEnum): """Flags controlling tunnel behaviour.""" + TUNNEL_API_ENCAP_DECAP_FLAG_NONE = 0 # at encap, copy the DF bit of the payload into the tunnel header TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF = 1 @@ -137,6 +149,7 @@ class TunnelEncpaDecapFlags(IntEnum): class TunnelMode(IntEnum): """Tunnel modes.""" + # point-to-point TUNNEL_API_MODE_P2P = 0 # multi-point @@ -304,12 +317,9 @@ class IPsecUtil: :raises RuntimeError: If failed to select IPsec backend or if no API reply received. """ - cmd = u"ipsec_select_backend" + cmd = "ipsec_select_backend" err_msg = f"Failed to select IPsec backend on host {node[u'host']}" - args = dict( - protocol=protocol, - index=index - ) + args = dict(protocol=protocol, index=index) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @@ -327,11 +337,9 @@ class IPsecUtil: reply received. """ with PapiSocketExecutor(node) as papi_exec: - cmd = u"ipsec_set_async_mode" + cmd = "ipsec_set_async_mode" err_msg = f"Failed to set IPsec async mode on host {node[u'host']}" - args = dict( - async_enable=async_enable - ) + args = dict(async_enable=async_enable) papi_exec.add(cmd, **args).get_reply(err_msg) cmd = "crypto_set_async_dispatch_v2" err_msg = "Failed to set dispatch mode." @@ -346,7 +354,8 @@ class IPsecUtil: @staticmethod def vpp_ipsec_crypto_sw_scheduler_set_worker( - node, workers, crypto_enable=False): + node, workers, crypto_enable=False + ): """Enable or disable crypto on specific vpp worker threads. :param node: VPP node to enable or disable crypto for worker threads. @@ -359,19 +368,19 @@ class IPsecUtil: thread or if no API reply received. """ for worker in workers: - cmd = u"crypto_sw_scheduler_set_worker" - err_msg = f"Failed to disable/enable crypto for worker thread " \ + cmd = "crypto_sw_scheduler_set_worker" + err_msg = ( + f"Failed to disable/enable crypto for worker thread " f"on host {node[u'host']}" - args = dict( - worker_index=worker - 1, - crypto_enable=crypto_enable ) + args = dict(worker_index=worker - 1, crypto_enable=crypto_enable) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def vpp_ipsec_crypto_sw_scheduler_set_worker_on_all_duts( - nodes, crypto_enable=False): + nodes, crypto_enable=False + ): """Enable or disable crypto on specific vpp worker threads. :param node: VPP node to enable or disable crypto for worker threads. @@ -392,7 +401,7 @@ class IPsecUtil: f"${{{node_name}_cpu_dp}}" ) for item in thread_data: - if str(item.cpu_id) in workers.split(u","): + if str(item.cpu_id) in workers.split(","): worker_ids.append(item.id) IPsecUtil.vpp_ipsec_crypto_sw_scheduler_set_worker( @@ -401,8 +410,16 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_sad_entry( - node, sad_id, spi, crypto_alg, crypto_key, integ_alg=None, - integ_key=u"", tunnel_src=None, tunnel_dst=None): + node, + sad_id, + spi, + crypto_alg, + crypto_key, + integ_alg=None, + integ_key="", + tunnel_src=None, + tunnel_dst=None, + ): """Create Security Association Database entry on the VPP node. :param node: VPP node to add SAD entry on. @@ -427,17 +444,11 @@ class IPsecUtil: :type tunnel_dst: str """ if isinstance(crypto_key, str): - crypto_key = crypto_key.encode(encoding=u"utf-8") + crypto_key = crypto_key.encode(encoding="utf-8") if isinstance(integ_key, str): - integ_key = integ_key.encode(encoding=u"utf-8") - ckey = dict( - length=len(crypto_key), - data=crypto_key - ) - ikey = dict( - length=len(integ_key), - data=integ_key if integ_key else 0 - ) + integ_key = integ_key.encode(encoding="utf-8") + ckey = dict(length=len(crypto_key), data=crypto_key) + ikey = dict(length=len(integ_key), data=integ_key if integ_key else 0) flags = int(IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE) if tunnel_src and tunnel_dst: @@ -445,15 +456,18 @@ class IPsecUtil: src_addr = ip_address(tunnel_src) dst_addr = ip_address(tunnel_dst) if src_addr.version == 6: - flags = \ - flags | int(IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_TUNNEL_V6) + flags = flags | int( + IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_TUNNEL_V6 + ) else: - src_addr = u"" - dst_addr = u"" + src_addr = "" + dst_addr = "" - cmd = u"ipsec_sad_entry_add_v2" - err_msg = f"Failed to add Security Association Database entry " \ + cmd = "ipsec_sad_entry_add_v2" + err_msg = ( + f"Failed to add Security Association Database entry " f"on host {node[u'host']}" + ) sad_entry = dict( sad_id=int(sad_id), spi=int(spi), @@ -482,9 +496,18 @@ class IPsecUtil: @staticmethod def vpp_ipsec_add_sad_entries( - node, n_entries, sad_id, spi, crypto_alg, crypto_key, - integ_alg=None, integ_key=u"", tunnel_src=None, tunnel_dst=None, - tunnel_addr_incr=True): + node, + n_entries, + sad_id, + spi, + crypto_alg, + crypto_key, + integ_alg=None, + integ_key="", + tunnel_src=None, + tunnel_dst=None, + tunnel_addr_incr=True, + ): """Create multiple Security Association Database entries on VPP node. :param node: VPP node to add SAD entry on. @@ -516,30 +539,25 @@ class IPsecUtil: :type tunnel_addr_incr: bool """ if isinstance(crypto_key, str): - crypto_key = crypto_key.encode(encoding=u"utf-8") + crypto_key = crypto_key.encode(encoding="utf-8") if isinstance(integ_key, str): - integ_key = integ_key.encode(encoding=u"utf-8") + integ_key = integ_key.encode(encoding="utf-8") if tunnel_src and tunnel_dst: src_addr = ip_address(tunnel_src) dst_addr = ip_address(tunnel_dst) else: - src_addr = u"" - dst_addr = u"" + src_addr = "" + dst_addr = "" if tunnel_addr_incr: - addr_incr = 1 << (128 - 96) if src_addr.version == 6 \ - else 1 << (32 - 24) + addr_incr = ( + 1 << (128 - 96) if src_addr.version == 6 else 1 << (32 - 24) + ) else: addr_incr = 0 - ckey = dict( - length=len(crypto_key), - data=crypto_key - ) - ikey = dict( - length=len(integ_key), - data=integ_key if integ_key else 0 - ) + ckey = dict(length=len(crypto_key), data=crypto_key) + ikey = dict(length=len(integ_key), data=integ_key if integ_key else 0) flags = int(IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE) if tunnel_src and tunnel_dst: @@ -549,9 +567,11 @@ class IPsecUtil: IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_TUNNEL_V6 ) - cmd = u"ipsec_sad_entry_add_v2" - err_msg = f"Failed to add Security Association Database entry " \ + cmd = "ipsec_sad_entry_add_v2" + err_msg = ( + f"Failed to add Security Association Database entry " f"on host {node[u'host']}" + ) sad_entry = dict( sad_id=int(sad_id), @@ -578,15 +598,17 @@ class IPsecUtil: args = dict(entry=sad_entry) with PapiSocketExecutor(node, is_async=True) as papi_exec: for i in range(n_entries): - args[u"entry"][u"sad_id"] = int(sad_id) + i - args[u"entry"][u"spi"] = int(spi) + i - args[u"entry"][u"tunnel"][u"src"] = ( + args["entry"]["sad_id"] = int(sad_id) + i + args["entry"]["spi"] = int(spi) + i + args["entry"]["tunnel"]["src"] = ( str(src_addr + i * addr_incr) - if tunnel_src and tunnel_dst else src_addr + if tunnel_src and tunnel_dst + else src_addr ) - args[u"entry"][u"tunnel"][u"dst"] = ( + args["entry"]["tunnel"]["dst"] = ( str(dst_addr + i * addr_incr) - if tunnel_src and tunnel_dst else dst_addr + if tunnel_src and tunnel_dst + else dst_addr ) history = bool(not 1 < i < n_entries - 2) papi_exec.add(cmd, history=history, **args) @@ -594,8 +616,15 @@ class IPsecUtil: @staticmethod def vpp_ipsec_set_ip_route( - node, n_tunnels, tunnel_src, traffic_addr, tunnel_dst, interface, - raddr_range, dst_mac=None): + node, + n_tunnels, + tunnel_src, + traffic_addr, + tunnel_dst, + interface, + raddr_range, + dst_mac=None, + ): """Set IP address and route on interface. :param node: VPP node to add config on. @@ -621,62 +650,67 @@ class IPsecUtil: tunnel_dst = ip_address(tunnel_dst) traffic_addr = ip_address(traffic_addr) tunnel_dst_prefix = 128 if tunnel_dst.version == 6 else 32 - addr_incr = 1 << (128 - raddr_range) if tunnel_src.version == 6 \ + addr_incr = ( + 1 << (128 - raddr_range) + if tunnel_src.version == 6 else 1 << (32 - raddr_range) + ) - cmd1 = u"sw_interface_add_del_address" + cmd1 = "sw_interface_add_del_address" args1 = dict( sw_if_index=InterfaceUtil.get_interface_index(node, interface), is_add=True, del_all=False, - prefix=None - ) - cmd2 = u"ip_route_add_del" - args2 = dict( - is_add=1, - is_multipath=0, - route=None + prefix=None, ) - cmd3 = u"ip_neighbor_add_del" + cmd2 = "ip_route_add_del" + args2 = dict(is_add=1, is_multipath=0, route=None) + cmd3 = "ip_neighbor_add_del" args3 = dict( is_add=True, neighbor=dict( sw_if_index=Topology.get_interface_sw_index(node, interface), flags=0, mac_address=str(dst_mac), - ip_address=None - ) + ip_address=None, + ), + ) + err_msg = ( + f"Failed to configure IP addresses, IP routes and " + f"IP neighbor on interface {interface} on host {node[u'host']}" + if dst_mac + else f"Failed to configure IP addresses and IP routes " + f"on interface {interface} on host {node[u'host']}" ) - err_msg = f"Failed to configure IP addresses, IP routes and " \ - f"IP neighbor on interface {interface} on host {node[u'host']}" \ - if dst_mac \ - else f"Failed to configure IP addresses and IP routes " \ - f"on interface {interface} on host {node[u'host']}" with PapiSocketExecutor(node, is_async=True) as papi_exec: for i in range(n_tunnels): tunnel_dst_addr = tunnel_dst + i * addr_incr - args1[u"prefix"] = IPUtil.create_prefix_object( + args1["prefix"] = IPUtil.create_prefix_object( tunnel_src + i * addr_incr, raddr_range ) - args2[u"route"] = IPUtil.compose_vpp_route_structure( - node, traffic_addr + i, + args2["route"] = IPUtil.compose_vpp_route_structure( + node, + traffic_addr + i, prefix_len=tunnel_dst_prefix, - interface=interface, gateway=tunnel_dst_addr + interface=interface, + gateway=tunnel_dst_addr, ) history = bool(not 1 < i < n_tunnels - 2) papi_exec.add(cmd1, history=history, **args1) papi_exec.add(cmd2, history=history, **args2) - args2[u"route"] = IPUtil.compose_vpp_route_structure( - node, tunnel_dst_addr, + args2["route"] = IPUtil.compose_vpp_route_structure( + node, + tunnel_dst_addr, prefix_len=tunnel_dst_prefix, - interface=interface, gateway=tunnel_dst_addr + interface=interface, + gateway=tunnel_dst_addr, ) papi_exec.add(cmd2, history=history, **args2) if dst_mac: - args3[u"neighbor"][u"ip_address"] = ip_address( + args3["neighbor"]["ip_address"] = ip_address( tunnel_dst_addr ) papi_exec.add(cmd3, history=history, **args3) @@ -691,13 +725,12 @@ class IPsecUtil: :type node: dict :type spd_id: int """ - cmd = u"ipsec_spd_add_del" - err_msg = f"Failed to add Security Policy Database " \ + cmd = "ipsec_spd_add_del" + err_msg = ( + f"Failed to add Security Policy Database " f"on host {node[u'host']}" - args = dict( - is_add=True, - spd_id=int(spd_id) ) + args = dict(is_add=True, spd_id=int(spd_id)) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @@ -712,22 +745,31 @@ class IPsecUtil: :type spd_id: int :type interface: str or int """ - cmd = u"ipsec_interface_add_del_spd" - err_msg = f"Failed to add interface {interface} to Security Policy " \ + cmd = "ipsec_interface_add_del_spd" + err_msg = ( + f"Failed to add interface {interface} to Security Policy " f"Database {spd_id} on host {node[u'host']}" + ) args = dict( is_add=True, sw_if_index=InterfaceUtil.get_interface_index(node, interface), - spd_id=int(spd_id) + spd_id=int(spd_id), ) with PapiSocketExecutor(node) as papi_exec: papi_exec.add(cmd, **args).get_reply(err_msg) @staticmethod def vpp_ipsec_create_spds_match_nth_entry( - node, dir1_interface, dir2_interface, entry_amount, - local_addr_range, remote_addr_range, action=PolicyAction.BYPASS, - inbound=False, bidirectional=True): + node, + dir1_interface, + dir2_interface, + entry_amount, + local_addr_range, + remote_addr_range, + action=PolicyAction.BYPASS, + inbound=False, + bidirectional=True, + ): """Create one matching SPD entry for inbound or outbound traffic on a DUT for each traffic direction and also create entry_amount - 1 non-matching SPD entries. Create a Security Policy Database on each @@ -771,7 +813,7 @@ class IPsecUtil: """ if action == PolicyAction.PROTECT: - raise NotImplementedError('Policy action PROTECT is not supported.') + raise NotImplementedError("Policy action PROTECT is not supported.") spd_id_dir1 = 1 spd_id_dir2 = 2 @@ -781,9 +823,13 @@ class IPsecUtil: IPsecUtil.vpp_ipsec_spd_add_if(node, spd_id_dir1, dir1_interface) # matching entry direction 1 IPsecUtil.vpp_ipsec_add_spd_entry( - node, spd_id_dir1, matching_priority, action, - inbound=inbound, laddr_range=local_addr_range, - raddr_range=remote_addr_range + node, + spd_id_dir1, + matching_priority, + action, + inbound=inbound, + laddr_range=local_addr_range, + raddr_range=remote_addr_range, ) if bidirectional: @@ -792,9 +838,13 @@ class IPsecUtil: # matching entry direction 2, the address ranges are switched IPsecUtil.vpp_ipsec_add_spd_entry( - node, spd_id_dir2, matching_priority, action, - inbound=inbound, laddr_range=remote_addr_range, - raddr_range=local_addr_range + node, + spd_id_dir2, + matching_priority, + action, + inbound=inbound, + laddr_range=remote_addr_range, + raddr_range=local_addr_range, ) # non-matching entries @@ -814,10 +864,14 @@ class IPsecUtil: # non-matching entries direction 1 IPsecUtil.vpp_ipsec_add_spd_entries( - node, no_match_entry_amount, spd_id_dir1, - ObjIncrement(matching_priority + 1, 1), action, - inbound=inbound, laddr_range=no_match_local_addr_range, - raddr_range=no_match_remote_addr_range + node, + no_match_entry_amount, + spd_id_dir1, + ObjIncrement(matching_priority + 1, 1), + action, + inbound=inbound, + laddr_range=no_match_local_addr_range, + raddr_range=no_match_remote_addr_range, ) if bidirectional: @@ -834,19 +888,33 @@ class IPsecUtil: next(no_match_local_addr_range) # non-matching entries direction 2 IPsecUtil.vpp_ipsec_add_spd_entries( - node, no_match_entry_amount, spd_id_dir2, - ObjIncrement(matching_priority + 1, 1), action, - inbound=inbound, laddr_range=no_match_local_addr_range, - raddr_range=no_match_remote_addr_range + node, + no_match_entry_amount, + spd_id_dir2, + ObjIncrement(matching_priority + 1, 1), + action, + inbound=inbound, + laddr_range=no_match_local_addr_range, + raddr_range=no_match_remote_addr_range, ) IPsecUtil.vpp_ipsec_show_all(node) @staticmethod def _vpp_ipsec_add_spd_entry_internal( - executor, spd_id, priority, action, inbound=True, sa_id=None, - proto=None, laddr_range=None, raddr_range=None, lport_range=None, - rport_range=None, is_ipv6=False): + executor, + spd_id, + priority, + action, + inbound=True, + sa_id=None, + proto=None, + laddr_range=None, + raddr_range=None, + lport_range=None, + rport_range=None, + is_ipv6=False, + ): """Prepare to create Security Policy Database entry on the VPP node. This just adds one more command to the executor. @@ -887,15 +955,15 @@ class IPsecUtil: :type is_ipv6: bool """ if laddr_range is None: - laddr_range = u"::/0" if is_ipv6 else u"0.0.0.0/0" + laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0" if raddr_range is None: - raddr_range = u"::/0" if is_ipv6 else u"0.0.0.0/0" + raddr_range = "::/0" if is_ipv6 else "0.0.0.0/0" local_net = ip_network(laddr_range, strict=False) remote_net = ip_network(raddr_range, strict=False) - cmd = u"ipsec_spd_entry_add_del_v2" + cmd = "ipsec_spd_entry_add_del_v2" spd_entry = dict( spd_id=int(spd_id), @@ -916,26 +984,37 @@ class IPsecUtil: local_address_stop=IPAddress.create_ip_address_object( local_net.broadcast_address ), - remote_port_start=int(rport_range.split(u"-")[0]) if rport_range - else 0, - remote_port_stop=int(rport_range.split(u"-")[1]) if rport_range - else 65535, - local_port_start=int(lport_range.split(u"-")[0]) if lport_range - else 0, - local_port_stop=int(lport_range.split(u"-")[1]) if rport_range - else 65535 - ) - args = dict( - is_add=True, - entry=spd_entry + remote_port_start=( + int(rport_range.split("-")[0]) if rport_range else 0 + ), + remote_port_stop=( + int(rport_range.split("-")[1]) if rport_range else 65535 + ), + local_port_start=( + int(lport_range.split("-")[0]) if lport_range else 0 + ), + local_port_stop=( + int(lport_range.split("-")[1]) if rport_range else 65535 + ), ) + args = dict(is_add=True, entry=spd_entry) executor.add(cmd, **args) @staticmethod def vpp_ipsec_add_spd_entry( - node, spd_id, priority, action, inbound=True, sa_id=None, - proto=None, laddr_range=None, raddr_range=None, lport_range=None, - rport_range=None, is_ipv6=False): + node, + spd_id, + priority, + action, + inbound=True, + sa_id=None, + proto=None, + laddr_range=None, + raddr_range=None, + lport_range=None, + rport_range=None, + is_ipv6=False, + ): """Create Security Policy Database entry on the VPP node. :param node: VPP node to add SPD entry on. @@ -971,20 +1050,43 @@ class IPsecUtil: :type rport_range: string :type is_ipv6: bool """ - err_msg = f"Failed to add entry to Security Policy Database " \ - f"{spd_id} on host {node[u'host']}" + err_msg = ( + f"Failed to add entry to Security Policy Database " + f"{spd_id} on host {node[u'host']}" + ) with PapiSocketExecutor(node, is_async=True) as papi_exec: IPsecUtil._vpp_ipsec_add_spd_entry_internal( - papi_exec, spd_id, priority, action, inbound, sa_id, proto, - laddr_range, raddr_range, lport_range, rport_range, is_ipv6 + papi_exec, + spd_id, + priority, + action, + inbound, + sa_id, + proto, + laddr_range, + raddr_range, + lport_range, + rport_range, + is_ipv6, ) papi_exec.get_replies(err_msg) @staticmethod def vpp_ipsec_add_spd_entries( - node, n_entries, spd_id, priority, action, inbound, sa_id=None, - proto=None, laddr_range=None, raddr_range=None, lport_range=None, - rport_range=None, is_ipv6=False): + node, + n_entries, + spd_id, + priority, + action, + inbound, + sa_id=None, + proto=None, + laddr_range=None, + raddr_range=None, + lport_range=None, + rport_range=None, + is_ipv6=False, + ): """Create multiple Security Policy Database entries on the VPP node. :param node: VPP node to add SPD entries on. @@ -1023,32 +1125,42 @@ class IPsecUtil: :type is_ipv6: bool """ if laddr_range is None: - laddr_range = u"::/0" if is_ipv6 else u"0.0.0.0/0" + laddr_range = "::/0" if is_ipv6 else "0.0.0.0/0" laddr_range = NetworkIncrement(ip_network(laddr_range), 0) if raddr_range is None: - raddr_range = u"::/0" if is_ipv6 else u"0.0.0.0/0" + raddr_range = "::/0" if is_ipv6 else "0.0.0.0/0" raddr_range = NetworkIncrement(ip_network(raddr_range), 0) lport_range_start = 0 lport_range_stop = 65535 if lport_range: - lport_range_start, lport_range_stop = lport_range.split('-') + lport_range_start, lport_range_stop = lport_range.split("-") rport_range_start = 0 rport_range_stop = 65535 if rport_range: - rport_range_start, rport_range_stop = rport_range.split('-') + rport_range_start, rport_range_stop = rport_range.split("-") - err_msg = f"Failed to add entry to Security Policy Database " \ - f"{spd_id} on host {node[u'host']}" + err_msg = ( + f"Failed to add entry to Security Policy Database " + f"{spd_id} on host {node[u'host']}" + ) with PapiSocketExecutor(node, is_async=True) as papi_exec: for _ in range(n_entries): IPsecUtil._vpp_ipsec_add_spd_entry_internal( - papi_exec, spd_id, next(priority), action, inbound, + papi_exec, + spd_id, + next(priority), + action, + inbound, next(sa_id) if sa_id is not None else sa_id, - proto, next(laddr_range), next(raddr_range), lport_range, - rport_range, is_ipv6 + proto, + next(laddr_range), + next(raddr_range), + lport_range, + rport_range, + is_ipv6, ) papi_exec.get_replies(err_msg) @@ -1069,59 +1181,63 @@ class IPsecUtil: :type if1_key: str :type if2_key: str """ - with PapiSocketExecutor(nodes[u"DUT1"]) as papi_exec: + with PapiSocketExecutor(nodes["DUT1"]) as papi_exec: # Create loopback interface on DUT1, set it to up state - cmd = u"create_loopback_instance" + cmd = "create_loopback_instance" args = dict( mac_address=0, is_specified=False, user_instance=0, ) - err_msg = f"Failed to create loopback interface " \ + err_msg = ( + f"Failed to create loopback interface " f"on host {nodes[u'DUT1'][u'host']}" + ) papi_exec.add(cmd, **args) loop_sw_if_idx = papi_exec.get_sw_if_index(err_msg) - cmd = u"sw_interface_set_flags" + cmd = "sw_interface_set_flags" args = dict( sw_if_index=loop_sw_if_idx, - flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value + flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value, ) - err_msg = f"Failed to set loopback interface state up " \ + err_msg = ( + f"Failed to set loopback interface state up " f"on host {nodes[u'DUT1'][u'host']}" + ) papi_exec.add(cmd, **args).get_reply(err_msg) # Set IP address on VPP node 1 interface - cmd = u"sw_interface_add_del_address" + cmd = "sw_interface_add_del_address" args = dict( sw_if_index=InterfaceUtil.get_interface_index( - nodes[u"DUT1"], if1_key + nodes["DUT1"], if1_key ), is_add=True, del_all=False, prefix=IPUtil.create_prefix_object( - tun_ips[u"ip2"] - 1, 96 if tun_ips[u"ip2"].version == 6 - else 24 - ) + tun_ips["ip2"] - 1, + 96 if tun_ips["ip2"].version == 6 else 24, + ), ) - err_msg = f"Failed to set IP address on interface {if1_key} " \ + err_msg = ( + f"Failed to set IP address on interface {if1_key} " f"on host {nodes[u'DUT1'][u'host']}" + ) papi_exec.add(cmd, **args).get_reply(err_msg) - cmd2 = u"ip_neighbor_add_del" + cmd2 = "ip_neighbor_add_del" args2 = dict( is_add=1, neighbor=dict( sw_if_index=Topology.get_interface_sw_index( - nodes[u"DUT1"], if1_key + nodes["DUT1"], if1_key ), flags=1, mac_address=str( - Topology.get_interface_mac(nodes[u"DUT2"], if2_key) - if u"DUT2" in nodes.keys() - else Topology.get_interface_mac( - nodes[u"TG"], if2_key - ) + Topology.get_interface_mac(nodes["DUT2"], if2_key) + if "DUT2" in nodes.keys() + else Topology.get_interface_mac(nodes["TG"], if2_key) ), - ip_address=tun_ips[u"ip2"].compressed - ) + ip_address=tun_ips["ip2"].compressed, + ), ) err_msg = f"Failed to add IP neighbor on interface {if1_key}" papi_exec.add(cmd2, **args2).get_reply(err_msg) @@ -1130,8 +1246,18 @@ class IPsecUtil: @staticmethod def _ipsec_create_tunnel_interfaces_dut1_papi( - nodes, tun_ips, if1_key, if2_key, n_tunnels, crypto_alg, integ_alg, - raddr_ip2, addr_incr, spi_d, existing_tunnels=0): + nodes, + tun_ips, + if1_key, + if2_key, + n_tunnels, + crypto_alg, + integ_alg, + raddr_ip2, + addr_incr, + spi_d, + existing_tunnels=0, + ): """Create multiple IPsec tunnel interfaces on DUT1 node using PAPI. Generate random keys and return them (so DUT2 or TG can decrypt). @@ -1172,27 +1298,27 @@ class IPsecUtil: ) else: loop_sw_if_idx = InterfaceUtil.vpp_get_interface_sw_index( - nodes[u"DUT1"], u"loop0" + nodes["DUT1"], "loop0" ) - with PapiSocketExecutor(nodes[u"DUT1"], is_async=True) as papi_exec: + with PapiSocketExecutor(nodes["DUT1"], is_async=True) as papi_exec: # Configure IP addresses on loop0 interface - cmd = u"sw_interface_add_del_address" + cmd = "sw_interface_add_del_address" args = dict( sw_if_index=loop_sw_if_idx, is_add=True, del_all=False, - prefix=None + prefix=None, ) for i in range(existing_tunnels, n_tunnels): - args[u"prefix"] = IPUtil.create_prefix_object( - tun_ips[u"ip1"] + i * addr_incr, - 128 if tun_ips[u"ip1"].version == 6 else 32 + args["prefix"] = IPUtil.create_prefix_object( + tun_ips["ip1"] + i * addr_incr, + 128 if tun_ips["ip1"].version == 6 else 32, ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) # Configure IPIP tunnel interfaces - cmd = u"ipip_add_tunnel" + cmd = "ipip_add_tunnel" ipip_tunnel = dict( instance=Constants.BITWISE_NON_ZERO, src=None, @@ -1202,43 +1328,37 @@ class IPsecUtil: TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE ), mode=int(TunnelMode.TUNNEL_API_MODE_P2P), - dscp=int(IpDscp.IP_API_DSCP_CS0) - ) - args = dict( - tunnel=ipip_tunnel + dscp=int(IpDscp.IP_API_DSCP_CS0), ) + args = dict(tunnel=ipip_tunnel) ipip_tunnels = [None] * existing_tunnels for i in range(existing_tunnels, n_tunnels): - args[u"tunnel"][u"src"] = IPAddress.create_ip_address_object( - tun_ips[u"ip1"] + i * addr_incr + args["tunnel"]["src"] = IPAddress.create_ip_address_object( + tun_ips["ip1"] + i * addr_incr ) - args[u"tunnel"][u"dst"] = IPAddress.create_ip_address_object( - tun_ips[u"ip2"] + args["tunnel"]["dst"] = IPAddress.create_ip_address_object( + tun_ips["ip2"] ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) - err_msg = f"Failed to add IPIP tunnel interfaces on host" \ + err_msg = ( + f"Failed to add IPIP tunnel interfaces on host" f" {nodes[u'DUT1'][u'host']}" + ) ipip_tunnels.extend( [ - reply[u"sw_if_index"] + reply["sw_if_index"] for reply in papi_exec.get_replies(err_msg) - if u"sw_if_index" in reply + if "sw_if_index" in reply ] ) # Configure IPSec SAD entries ckeys = [bytes()] * existing_tunnels ikeys = [bytes()] * existing_tunnels - cmd = u"ipsec_sad_entry_add_v2" - c_key = dict( - length=0, - data=None - ) - i_key = dict( - length=0, - data=None - ) + cmd = "ipsec_sad_entry_add_v2" + c_key = dict(length=0, data=None) + i_key = dict(length=0, data=None) sad_entry = dict( sad_id=None, spi=None, @@ -1271,118 +1391,125 @@ class IPsecUtil: gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)) ) # SAD entry for outband / tx path - args[u"entry"][u"sad_id"] = i - args[u"entry"][u"spi"] = spi_d[u"spi_1"] + i + args["entry"]["sad_id"] = i + args["entry"]["spi"] = spi_d["spi_1"] + i - args[u"entry"][u"crypto_key"][u"length"] = len(ckeys[i]) - args[u"entry"][u"crypto_key"][u"data"] = ckeys[i] + args["entry"]["crypto_key"]["length"] = len(ckeys[i]) + args["entry"]["crypto_key"]["data"] = ckeys[i] if integ_alg: - args[u"entry"][u"integrity_key"][u"length"] = len(ikeys[i]) - args[u"entry"][u"integrity_key"][u"data"] = ikeys[i] - args[u"entry"][u"flags"] = int( + args["entry"]["integrity_key"]["length"] = len(ikeys[i]) + args["entry"]["integrity_key"]["data"] = ikeys[i] + args["entry"]["flags"] = int( IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) # SAD entry for inband / rx path - args[u"entry"][u"sad_id"] = 100000 + i - args[u"entry"][u"spi"] = spi_d[u"spi_2"] + i + args["entry"]["sad_id"] = 100000 + i + args["entry"]["spi"] = spi_d["spi_2"] + i - args[u"entry"][u"crypto_key"][u"length"] = len(ckeys[i]) - args[u"entry"][u"crypto_key"][u"data"] = ckeys[i] + args["entry"]["crypto_key"]["length"] = len(ckeys[i]) + args["entry"]["crypto_key"]["data"] = ckeys[i] if integ_alg: - args[u"entry"][u"integrity_key"][u"length"] = len(ikeys[i]) - args[u"entry"][u"integrity_key"][u"data"] = ikeys[i] - args[u"entry"][u"flags"] = int( - IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE | - IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND + args["entry"]["integrity_key"]["length"] = len(ikeys[i]) + args["entry"]["integrity_key"]["data"] = ikeys[i] + args["entry"]["flags"] = int( + IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE + | IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) - err_msg = f"Failed to add IPsec SAD entries on host" \ + err_msg = ( + f"Failed to add IPsec SAD entries on host" f" {nodes[u'DUT1'][u'host']}" + ) papi_exec.get_replies(err_msg) # Add protection for tunnels with IPSEC - cmd = u"ipsec_tunnel_protect_update" + cmd = "ipsec_tunnel_protect_update" n_hop = dict( address=0, via_label=MPLS_LABEL_INVALID, - obj_id=Constants.BITWISE_NON_ZERO + obj_id=Constants.BITWISE_NON_ZERO, ) ipsec_tunnel_protect = dict( - sw_if_index=None, - nh=n_hop, - sa_out=None, - n_sa_in=1, - sa_in=None - ) - args = dict( - tunnel=ipsec_tunnel_protect + sw_if_index=None, nh=n_hop, sa_out=None, n_sa_in=1, sa_in=None ) + args = dict(tunnel=ipsec_tunnel_protect) for i in range(existing_tunnels, n_tunnels): - args[u"tunnel"][u"sw_if_index"] = ipip_tunnels[i] - args[u"tunnel"][u"sa_out"] = i - args[u"tunnel"][u"sa_in"] = [100000 + i] + args["tunnel"]["sw_if_index"] = ipip_tunnels[i] + args["tunnel"]["sa_out"] = i + args["tunnel"]["sa_in"] = [100000 + i] papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) - err_msg = f"Failed to add protection for tunnels with IPSEC " \ + err_msg = ( + f"Failed to add protection for tunnels with IPSEC " f"on host {nodes[u'DUT1'][u'host']}" + ) papi_exec.get_replies(err_msg) # Configure unnumbered interfaces - cmd = u"sw_interface_set_unnumbered" + cmd = "sw_interface_set_unnumbered" args = dict( is_add=True, sw_if_index=InterfaceUtil.get_interface_index( - nodes[u"DUT1"], if1_key + nodes["DUT1"], if1_key ), - unnumbered_sw_if_index=0 + unnumbered_sw_if_index=0, ) for i in range(existing_tunnels, n_tunnels): - args[u"unnumbered_sw_if_index"] = ipip_tunnels[i] + args["unnumbered_sw_if_index"] = ipip_tunnels[i] papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) # Set interfaces up - cmd = u"sw_interface_set_flags" + cmd = "sw_interface_set_flags" args = dict( sw_if_index=0, - flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value + flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value, ) for i in range(existing_tunnels, n_tunnels): - args[u"sw_if_index"] = ipip_tunnels[i] + args["sw_if_index"] = ipip_tunnels[i] papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) # Configure IP routes - cmd = u"ip_route_add_del" - args = dict( - is_add=1, - is_multipath=0, - route=None - ) + cmd = "ip_route_add_del" + args = dict(is_add=1, is_multipath=0, route=None) for i in range(existing_tunnels, n_tunnels): - args[u"route"] = IPUtil.compose_vpp_route_structure( - nodes[u"DUT1"], (raddr_ip2 + i).compressed, + args["route"] = IPUtil.compose_vpp_route_structure( + nodes["DUT1"], + (raddr_ip2 + i).compressed, prefix_len=128 if raddr_ip2.version == 6 else 32, - interface=ipip_tunnels[i] + interface=ipip_tunnels[i], ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) - err_msg = f"Failed to add IP routes on host " \ - f"{nodes[u'DUT1'][u'host']}" + err_msg = ( + f"Failed to add IP routes on host " f"{nodes[u'DUT1'][u'host']}" + ) papi_exec.get_replies(err_msg) return ckeys, ikeys @staticmethod def _ipsec_create_tunnel_interfaces_dut2_papi( - nodes, tun_ips, if2_key, n_tunnels, crypto_alg, ckeys, integ_alg, - ikeys, raddr_ip1, addr_incr, spi_d, existing_tunnels=0): + nodes, + tun_ips, + if2_key, + n_tunnels, + crypto_alg, + ckeys, + integ_alg, + ikeys, + raddr_ip1, + addr_incr, + spi_d, + existing_tunnels=0, + ): """Create multiple IPsec tunnel interfaces on DUT2 node using PAPI. This method accesses keys generated by DUT1 method @@ -1415,26 +1542,28 @@ class IPsecUtil: :type spi_d: dict :type existing_tunnels: int """ - with PapiSocketExecutor(nodes[u"DUT2"], is_async=True) as papi_exec: + with PapiSocketExecutor(nodes["DUT2"], is_async=True) as papi_exec: if not existing_tunnels: # Set IP address on VPP node 2 interface - cmd = u"sw_interface_add_del_address" + cmd = "sw_interface_add_del_address" args = dict( sw_if_index=InterfaceUtil.get_interface_index( - nodes[u"DUT2"], if2_key + nodes["DUT2"], if2_key ), is_add=True, del_all=False, prefix=IPUtil.create_prefix_object( - tun_ips[u"ip2"], 96 if tun_ips[u"ip2"].version == 6 - else 24 - ) + tun_ips["ip2"], + 96 if tun_ips["ip2"].version == 6 else 24, + ), ) - err_msg = f"Failed to set IP address on interface {if2_key} " \ + err_msg = ( + f"Failed to set IP address on interface {if2_key} " f"on host {nodes[u'DUT2'][u'host']}" + ) papi_exec.add(cmd, **args).get_replies(err_msg) # Configure IPIP tunnel interfaces - cmd = u"ipip_add_tunnel" + cmd = "ipip_add_tunnel" ipip_tunnel = dict( instance=Constants.BITWISE_NON_ZERO, src=None, @@ -1444,41 +1573,35 @@ class IPsecUtil: TunnelEncpaDecapFlags.TUNNEL_API_ENCAP_DECAP_FLAG_NONE ), mode=int(TunnelMode.TUNNEL_API_MODE_P2P), - dscp=int(IpDscp.IP_API_DSCP_CS0) - ) - args = dict( - tunnel=ipip_tunnel + dscp=int(IpDscp.IP_API_DSCP_CS0), ) + args = dict(tunnel=ipip_tunnel) ipip_tunnels = [None] * existing_tunnels for i in range(existing_tunnels, n_tunnels): - args[u"tunnel"][u"src"] = IPAddress.create_ip_address_object( - tun_ips[u"ip2"] + args["tunnel"]["src"] = IPAddress.create_ip_address_object( + tun_ips["ip2"] ) - args[u"tunnel"][u"dst"] = IPAddress.create_ip_address_object( - tun_ips[u"ip1"] + i * addr_incr + args["tunnel"]["dst"] = IPAddress.create_ip_address_object( + tun_ips["ip1"] + i * addr_incr ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) - err_msg = f"Failed to add IPIP tunnel interfaces on host" \ + err_msg = ( + f"Failed to add IPIP tunnel interfaces on host" f" {nodes[u'DUT2'][u'host']}" + ) ipip_tunnels.extend( [ - reply[u"sw_if_index"] + reply["sw_if_index"] for reply in papi_exec.get_replies(err_msg) - if u"sw_if_index" in reply + if "sw_if_index" in reply ] ) # Configure IPSec SAD entries - cmd = u"ipsec_sad_entry_add_v2" - c_key = dict( - length=0, - data=None - ) - i_key = dict( - length=0, - data=None - ) + cmd = "ipsec_sad_entry_add_v2" + c_key = dict(length=0, data=None) + i_key = dict(length=0, data=None) sad_entry = dict( sad_id=None, spi=None, @@ -1511,132 +1634,136 @@ class IPsecUtil: gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)) ) # SAD entry for outband / tx path - args[u"entry"][u"sad_id"] = 100000 + i - args[u"entry"][u"spi"] = spi_d[u"spi_2"] + i + args["entry"]["sad_id"] = 100000 + i + args["entry"]["spi"] = spi_d["spi_2"] + i - args[u"entry"][u"crypto_key"][u"length"] = len(ckeys[i]) - args[u"entry"][u"crypto_key"][u"data"] = ckeys[i] + args["entry"]["crypto_key"]["length"] = len(ckeys[i]) + args["entry"]["crypto_key"]["data"] = ckeys[i] if integ_alg: - args[u"entry"][u"integrity_key"][u"length"] = len(ikeys[i]) - args[u"entry"][u"integrity_key"][u"data"] = ikeys[i] - args[u"entry"][u"flags"] = int( + args["entry"]["integrity_key"]["length"] = len(ikeys[i]) + args["entry"]["integrity_key"]["data"] = ikeys[i] + args["entry"]["flags"] = int( IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) # SAD entry for inband / rx path - args[u"entry"][u"sad_id"] = i - args[u"entry"][u"spi"] = spi_d[u"spi_1"] + i + args["entry"]["sad_id"] = i + args["entry"]["spi"] = spi_d["spi_1"] + i - args[u"entry"][u"crypto_key"][u"length"] = len(ckeys[i]) - args[u"entry"][u"crypto_key"][u"data"] = ckeys[i] + args["entry"]["crypto_key"]["length"] = len(ckeys[i]) + args["entry"]["crypto_key"]["data"] = ckeys[i] if integ_alg: - args[u"entry"][u"integrity_key"][u"length"] = len(ikeys[i]) - args[u"entry"][u"integrity_key"][u"data"] = ikeys[i] - args[u"entry"][u"flags"] = int( - IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE | - IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND + args["entry"]["integrity_key"]["length"] = len(ikeys[i]) + args["entry"]["integrity_key"]["data"] = ikeys[i] + args["entry"]["flags"] = int( + IPsecSadFlags.IPSEC_API_SAD_FLAG_NONE + | IPsecSadFlags.IPSEC_API_SAD_FLAG_IS_INBOUND ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) - err_msg = f"Failed to add IPsec SAD entries on host" \ + err_msg = ( + f"Failed to add IPsec SAD entries on host" f" {nodes[u'DUT2'][u'host']}" + ) papi_exec.get_replies(err_msg) # Add protection for tunnels with IPSEC - cmd = u"ipsec_tunnel_protect_update" + cmd = "ipsec_tunnel_protect_update" n_hop = dict( address=0, via_label=MPLS_LABEL_INVALID, - obj_id=Constants.BITWISE_NON_ZERO + obj_id=Constants.BITWISE_NON_ZERO, ) ipsec_tunnel_protect = dict( - sw_if_index=None, - nh=n_hop, - sa_out=None, - n_sa_in=1, - sa_in=None - ) - args = dict( - tunnel=ipsec_tunnel_protect + sw_if_index=None, nh=n_hop, sa_out=None, n_sa_in=1, sa_in=None ) + args = dict(tunnel=ipsec_tunnel_protect) for i in range(existing_tunnels, n_tunnels): - args[u"tunnel"][u"sw_if_index"] = ipip_tunnels[i] - args[u"tunnel"][u"sa_out"] = 100000 + i - args[u"tunnel"][u"sa_in"] = [i] + args["tunnel"]["sw_if_index"] = ipip_tunnels[i] + args["tunnel"]["sa_out"] = 100000 + i + args["tunnel"]["sa_in"] = [i] papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) - err_msg = f"Failed to add protection for tunnels with IPSEC " \ + err_msg = ( + f"Failed to add protection for tunnels with IPSEC " f"on host {nodes[u'DUT2'][u'host']}" + ) papi_exec.get_replies(err_msg) if not existing_tunnels: # Configure IP route - cmd = u"ip_route_add_del" + cmd = "ip_route_add_del" route = IPUtil.compose_vpp_route_structure( - nodes[u"DUT2"], tun_ips[u"ip1"].compressed, - prefix_len=32 if tun_ips[u"ip1"].version == 6 else 8, + nodes["DUT2"], + tun_ips["ip1"].compressed, + prefix_len=32 if tun_ips["ip1"].version == 6 else 8, interface=if2_key, - gateway=(tun_ips[u"ip2"] - 1).compressed - ) - args = dict( - is_add=1, - is_multipath=0, - route=route + gateway=(tun_ips["ip2"] - 1).compressed, ) + args = dict(is_add=1, is_multipath=0, route=route) papi_exec.add(cmd, **args) # Configure unnumbered interfaces - cmd = u"sw_interface_set_unnumbered" + cmd = "sw_interface_set_unnumbered" args = dict( is_add=True, sw_if_index=InterfaceUtil.get_interface_index( - nodes[u"DUT2"], if2_key + nodes["DUT2"], if2_key ), - unnumbered_sw_if_index=0 + unnumbered_sw_if_index=0, ) for i in range(existing_tunnels, n_tunnels): - args[u"unnumbered_sw_if_index"] = ipip_tunnels[i] + args["unnumbered_sw_if_index"] = ipip_tunnels[i] papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) # Set interfaces up - cmd = u"sw_interface_set_flags" + cmd = "sw_interface_set_flags" args = dict( sw_if_index=0, - flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value + flags=InterfaceStatusFlags.IF_STATUS_API_FLAG_ADMIN_UP.value, ) for i in range(existing_tunnels, n_tunnels): - args[u"sw_if_index"] = ipip_tunnels[i] + args["sw_if_index"] = ipip_tunnels[i] papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) # Configure IP routes - cmd = u"ip_route_add_del" - args = dict( - is_add=1, - is_multipath=0, - route=None - ) + cmd = "ip_route_add_del" + args = dict(is_add=1, is_multipath=0, route=None) for i in range(existing_tunnels, n_tunnels): - args[u"route"] = IPUtil.compose_vpp_route_structure( - nodes[u"DUT1"], (raddr_ip1 + i).compressed, + args["route"] = IPUtil.compose_vpp_route_structure( + nodes["DUT1"], + (raddr_ip1 + i).compressed, prefix_len=128 if raddr_ip1.version == 6 else 32, - interface=ipip_tunnels[i] + interface=ipip_tunnels[i], ) papi_exec.add( cmd, history=bool(not 1 < i < n_tunnels - 2), **args ) - err_msg = f"Failed to add IP routes " \ - f"on host {nodes[u'DUT2'][u'host']}" + err_msg = ( + f"Failed to add IP routes " f"on host {nodes[u'DUT2'][u'host']}" + ) papi_exec.get_replies(err_msg) @staticmethod def vpp_ipsec_create_tunnel_interfaces( - nodes, tun_if1_ip_addr, tun_if2_ip_addr, if1_key, if2_key, - n_tunnels, crypto_alg, integ_alg, raddr_ip1, raddr_ip2, raddr_range, - existing_tunnels=0, return_keys=False): + nodes, + tun_if1_ip_addr, + tun_if2_ip_addr, + if1_key, + if2_key, + n_tunnels, + crypto_alg, + integ_alg, + raddr_ip1, + raddr_ip2, + raddr_range, + existing_tunnels=0, + return_keys=False, + ): """Create multiple IPsec tunnel interfaces between two VPP nodes. Some deployments (e.g. devicetest) need to know the generated keys. @@ -1682,32 +1809,49 @@ class IPsecUtil: """ n_tunnels = int(n_tunnels) existing_tunnels = int(existing_tunnels) - spi_d = dict( - spi_1=100000, - spi_2=200000 - ) + spi_d = dict(spi_1=100000, spi_2=200000) tun_ips = dict( - ip1=ip_address(tun_if1_ip_addr), - ip2=ip_address(tun_if2_ip_addr) + ip1=ip_address(tun_if1_ip_addr), ip2=ip_address(tun_if2_ip_addr) ) raddr_ip1 = ip_address(raddr_ip1) raddr_ip2 = ip_address(raddr_ip2) - addr_incr = 1 << (128 - raddr_range) if tun_ips[u"ip1"].version == 6 \ + addr_incr = ( + 1 << (128 - raddr_range) + if tun_ips["ip1"].version == 6 else 1 << (32 - raddr_range) + ) ckeys, ikeys = IPsecUtil._ipsec_create_tunnel_interfaces_dut1_papi( - nodes, tun_ips, if1_key, if2_key, n_tunnels, crypto_alg, - integ_alg, raddr_ip2, addr_incr, spi_d, existing_tunnels + nodes, + tun_ips, + if1_key, + if2_key, + n_tunnels, + crypto_alg, + integ_alg, + raddr_ip2, + addr_incr, + spi_d, + existing_tunnels, ) - if u"DUT2" in nodes.keys(): + if "DUT2" in nodes.keys(): IPsecUtil._ipsec_create_tunnel_interfaces_dut2_papi( - nodes, tun_ips, if2_key, n_tunnels, crypto_alg, ckeys, - integ_alg, ikeys, raddr_ip1, addr_incr, spi_d, - existing_tunnels + nodes, + tun_ips, + if2_key, + n_tunnels, + crypto_alg, + ckeys, + integ_alg, + ikeys, + raddr_ip1, + addr_incr, + spi_d, + existing_tunnels, ) if return_keys: - return ckeys, ikeys, spi_d[u"spi_1"], spi_d[u"spi_2"] + return ckeys, ikeys, spi_d["spi_1"], spi_d["spi_2"] return None @staticmethod @@ -1724,12 +1868,11 @@ class IPsecUtil: script_filename = ( f"/tmp/ipsec_create_tunnel_cnf_{dut}_{cnf + 1}.config" ) - scripts.append(open(script_filename, 'w')) + scripts.append(open(script_filename, "w")) return scripts @staticmethod - def _close_and_copy_ipsec_script_files( - dut, nodes, instances, scripts): + def _close_and_copy_ipsec_script_files(dut, nodes, instances, scripts): """Close created scripts and copy them to containers :param dut: DUT node on which to create the script files @@ -1748,11 +1891,19 @@ class IPsecUtil: ) scp_node(nodes[dut], script_filename, script_filename) - @staticmethod def vpp_ipsec_create_tunnel_interfaces_in_containers( - nodes, if1_ip_addr, if2_ip_addr, n_tunnels, crypto_alg, integ_alg, - raddr_ip1, raddr_ip2, raddr_range, n_instances): + nodes, + if1_ip_addr, + if2_ip_addr, + n_tunnels, + crypto_alg, + integ_alg, + raddr_ip1, + raddr_ip2, + raddr_range, + n_instances, + ): """Create multiple IPsec tunnel interfaces between two VPP nodes. :param nodes: VPP nodes to create tunnel interfaces. @@ -1783,17 +1934,12 @@ class IPsecUtil: spi_2 = 200000 addr_incr = 1 << (32 - raddr_range) - dut1_scripts = IPsecUtil._create_ipsec_script_files( - u"DUT1", n_instances - ) - dut2_scripts = IPsecUtil._create_ipsec_script_files( - u"DUT2", n_instances - ) + dut1_scripts = IPsecUtil._create_ipsec_script_files("DUT1", n_instances) + dut2_scripts = IPsecUtil._create_ipsec_script_files("DUT2", n_instances) for cnf in range(0, n_instances): dut1_scripts[cnf].write( - u"create loopback interface\n" - u"set interface state loop0 up\n\n" + "create loopback interface\n" "set interface state loop0 up\n\n" ) dut2_scripts[cnf].write( f"ip route add {if1_ip_addr}/8 via " @@ -1803,11 +1949,11 @@ class IPsecUtil: for tnl in range(0, n_tunnels): cnf = tnl % n_instances ckey = getattr( - gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg)), u"hex" + gen_key(IPsecUtil.get_crypto_alg_key_len(crypto_alg)), "hex" ) - integ = u"" + integ = "" ikey = getattr( - gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)), u"hex" + gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)), "hex" ) if integ_alg: integ = ( @@ -1817,7 +1963,7 @@ class IPsecUtil: ) # Configure tunnel end point(s) on left side dut1_scripts[cnf].write( - u"set interface ip address loop0 " + "set interface ip address loop0 " f"{ip_address(if1_ip_addr) + tnl * addr_incr}/32\n" f"create ipsec tunnel " f"local-ip {ip_address(if1_ip_addr) + tnl * addr_incr} " @@ -1858,15 +2004,27 @@ class IPsecUtil: ) IPsecUtil._close_and_copy_ipsec_script_files( - u"DUT1", nodes, n_instances, dut1_scripts) + "DUT1", nodes, n_instances, dut1_scripts + ) IPsecUtil._close_and_copy_ipsec_script_files( - u"DUT2", nodes, n_instances, dut2_scripts) + "DUT2", nodes, n_instances, dut2_scripts + ) @staticmethod def vpp_ipsec_add_multiple_tunnels( - nodes, interface1, interface2, n_tunnels, crypto_alg, integ_alg, - tunnel_ip1, tunnel_ip2, raddr_ip1, raddr_ip2, raddr_range, - tunnel_addr_incr=True): + nodes, + interface1, + interface2, + n_tunnels, + crypto_alg, + integ_alg, + tunnel_ip1, + tunnel_ip2, + raddr_ip1, + raddr_ip2, + raddr_range, + tunnel_addr_incr=True, + ): """Create multiple IPsec tunnels between two VPP nodes. :param nodes: VPP nodes to create tunnels. @@ -1909,117 +2067,204 @@ class IPsecUtil: crypto_key = gen_key( IPsecUtil.get_crypto_alg_key_len(crypto_alg) ).decode() - integ_key = gen_key( - IPsecUtil.get_integ_alg_key_len(integ_alg) - ).decode() if integ_alg else u"" + integ_key = ( + gen_key(IPsecUtil.get_integ_alg_key_len(integ_alg)).decode() + if integ_alg + else "" + ) - rmac = Topology.get_interface_mac(nodes[u"DUT2"], interface2) \ - if u"DUT2" in nodes.keys() \ - else Topology.get_interface_mac(nodes[u"TG"], interface2) + rmac = ( + Topology.get_interface_mac(nodes["DUT2"], interface2) + if "DUT2" in nodes.keys() + else Topology.get_interface_mac(nodes["TG"], interface2) + ) IPsecUtil.vpp_ipsec_set_ip_route( - nodes[u"DUT1"], n_tunnels, tunnel_ip1, raddr_ip2, tunnel_ip2, - interface1, raddr_range, rmac) + nodes["DUT1"], + n_tunnels, + tunnel_ip1, + raddr_ip2, + tunnel_ip2, + interface1, + raddr_range, + rmac, + ) - IPsecUtil.vpp_ipsec_add_spd(nodes[u"DUT1"], spd_id) - IPsecUtil.vpp_ipsec_spd_add_if(nodes[u"DUT1"], spd_id, interface1) + IPsecUtil.vpp_ipsec_add_spd(nodes["DUT1"], spd_id) + IPsecUtil.vpp_ipsec_spd_add_if(nodes["DUT1"], spd_id, interface1) - addr_incr = 1 << (128 - 96) if ip_address(tunnel_ip1).version == 6 \ + addr_incr = ( + 1 << (128 - 96) + if ip_address(tunnel_ip1).version == 6 else 1 << (32 - 24) - for i in range(n_tunnels//(addr_incr**2)+1): - dut1_local_outbound_range = \ - ip_network(f"{ip_address(tunnel_ip1) + i*(addr_incr**3)}/8", - False).with_prefixlen - dut1_remote_outbound_range = \ - ip_network(f"{ip_address(tunnel_ip2) + i*(addr_incr**3)}/8", - False).with_prefixlen + ) + for i in range(n_tunnels // (addr_incr**2) + 1): + dut1_local_outbound_range = ip_network( + f"{ip_address(tunnel_ip1) + i*(addr_incr**3)}/8", False + ).with_prefixlen + dut1_remote_outbound_range = ip_network( + f"{ip_address(tunnel_ip2) + i*(addr_incr**3)}/8", False + ).with_prefixlen IPsecUtil.vpp_ipsec_add_spd_entry( - nodes[u"DUT1"], spd_id, p_hi, PolicyAction.BYPASS, inbound=False, - proto=50, laddr_range=dut1_local_outbound_range, - raddr_range=dut1_remote_outbound_range + nodes["DUT1"], + spd_id, + p_hi, + PolicyAction.BYPASS, + inbound=False, + proto=50, + laddr_range=dut1_local_outbound_range, + raddr_range=dut1_remote_outbound_range, ) IPsecUtil.vpp_ipsec_add_spd_entry( - nodes[u"DUT1"], spd_id, p_hi, PolicyAction.BYPASS, inbound=True, - proto=50, laddr_range=dut1_remote_outbound_range, - raddr_range=dut1_local_outbound_range + nodes["DUT1"], + spd_id, + p_hi, + PolicyAction.BYPASS, + inbound=True, + proto=50, + laddr_range=dut1_remote_outbound_range, + raddr_range=dut1_local_outbound_range, ) IPsecUtil.vpp_ipsec_add_sad_entries( - nodes[u"DUT1"], n_tunnels, sa_id_1, spi_1, crypto_alg, crypto_key, - integ_alg, integ_key, tunnel_ip1, tunnel_ip2, tunnel_addr_incr + nodes["DUT1"], + n_tunnels, + sa_id_1, + spi_1, + crypto_alg, + crypto_key, + integ_alg, + integ_key, + tunnel_ip1, + tunnel_ip2, + tunnel_addr_incr, ) IPsecUtil.vpp_ipsec_add_spd_entries( - nodes[u"DUT1"], n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, inbound=False, + nodes["DUT1"], + n_tunnels, + spd_id, + priority=ObjIncrement(p_lo, 0), + action=PolicyAction.PROTECT, + inbound=False, sa_id=ObjIncrement(sa_id_1, 1), - raddr_range=NetworkIncrement(ip_network(raddr_ip2)) + raddr_range=NetworkIncrement(ip_network(raddr_ip2)), ) IPsecUtil.vpp_ipsec_add_sad_entries( - nodes[u"DUT1"], n_tunnels, sa_id_2, spi_2, crypto_alg, crypto_key, - integ_alg, integ_key, tunnel_ip2, tunnel_ip1, tunnel_addr_incr + nodes["DUT1"], + n_tunnels, + sa_id_2, + spi_2, + crypto_alg, + crypto_key, + integ_alg, + integ_key, + tunnel_ip2, + tunnel_ip1, + tunnel_addr_incr, ) IPsecUtil.vpp_ipsec_add_spd_entries( - nodes[u"DUT1"], n_tunnels, spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, inbound=True, + nodes["DUT1"], + n_tunnels, + spd_id, + priority=ObjIncrement(p_lo, 0), + action=PolicyAction.PROTECT, + inbound=True, sa_id=ObjIncrement(sa_id_2, 1), - raddr_range=NetworkIncrement(ip_network(raddr_ip1)) + raddr_range=NetworkIncrement(ip_network(raddr_ip1)), ) - if u"DUT2" in nodes.keys(): - rmac = Topology.get_interface_mac(nodes[u"DUT1"], interface1) + if "DUT2" in nodes.keys(): + rmac = Topology.get_interface_mac(nodes["DUT1"], interface1) IPsecUtil.vpp_ipsec_set_ip_route( - nodes[u"DUT2"], n_tunnels, tunnel_ip2, raddr_ip1, tunnel_ip1, - interface2, raddr_range, rmac) - - IPsecUtil.vpp_ipsec_add_spd(nodes[u"DUT2"], spd_id) - IPsecUtil.vpp_ipsec_spd_add_if(nodes[u"DUT2"], spd_id, interface2) - for i in range(n_tunnels//(addr_incr**2)+1): - dut2_local_outbound_range = \ - ip_network(f"{ip_address(tunnel_ip1) + i*(addr_incr**3)}/8", - False).with_prefixlen - dut2_remote_outbound_range = \ - ip_network(f"{ip_address(tunnel_ip2) + i*(addr_incr**3)}/8", - False).with_prefixlen + nodes["DUT2"], + n_tunnels, + tunnel_ip2, + raddr_ip1, + tunnel_ip1, + interface2, + raddr_range, + rmac, + ) + + IPsecUtil.vpp_ipsec_add_spd(nodes["DUT2"], spd_id) + IPsecUtil.vpp_ipsec_spd_add_if(nodes["DUT2"], spd_id, interface2) + for i in range(n_tunnels // (addr_incr**2) + 1): + dut2_local_outbound_range = ip_network( + f"{ip_address(tunnel_ip1) + i*(addr_incr**3)}/8", False + ).with_prefixlen + dut2_remote_outbound_range = ip_network( + f"{ip_address(tunnel_ip2) + i*(addr_incr**3)}/8", False + ).with_prefixlen IPsecUtil.vpp_ipsec_add_spd_entry( - nodes[u"DUT2"], spd_id, p_hi, PolicyAction.BYPASS, - inbound=False, proto=50, + nodes["DUT2"], + spd_id, + p_hi, + PolicyAction.BYPASS, + inbound=False, + proto=50, laddr_range=dut2_remote_outbound_range, - raddr_range=dut2_local_outbound_range + raddr_range=dut2_local_outbound_range, ) IPsecUtil.vpp_ipsec_add_spd_entry( - nodes[u"DUT2"], spd_id, p_hi, PolicyAction.BYPASS, - inbound=True, proto=50, + nodes["DUT2"], + spd_id, + p_hi, + PolicyAction.BYPASS, + inbound=True, + proto=50, laddr_range=dut2_local_outbound_range, - raddr_range=dut2_remote_outbound_range + raddr_range=dut2_remote_outbound_range, ) IPsecUtil.vpp_ipsec_add_sad_entries( - nodes[u"DUT2"], n_tunnels, sa_id_1, spi_1, crypto_alg, - crypto_key, integ_alg, integ_key, tunnel_ip1, tunnel_ip2, - tunnel_addr_incr + nodes["DUT2"], + n_tunnels, + sa_id_1, + spi_1, + crypto_alg, + crypto_key, + integ_alg, + integ_key, + tunnel_ip1, + tunnel_ip2, + tunnel_addr_incr, ) IPsecUtil.vpp_ipsec_add_spd_entries( - nodes[u"DUT2"], n_tunnels, spd_id, + nodes["DUT2"], + n_tunnels, + spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, inbound=True, + action=PolicyAction.PROTECT, + inbound=True, sa_id=ObjIncrement(sa_id_1, 1), - raddr_range=NetworkIncrement(ip_network(raddr_ip2)) + raddr_range=NetworkIncrement(ip_network(raddr_ip2)), ) IPsecUtil.vpp_ipsec_add_sad_entries( - nodes[u"DUT2"], n_tunnels, sa_id_2, spi_2, crypto_alg, - crypto_key, integ_alg, integ_key, tunnel_ip2, tunnel_ip1, - tunnel_addr_incr + nodes["DUT2"], + n_tunnels, + sa_id_2, + spi_2, + crypto_alg, + crypto_key, + integ_alg, + integ_key, + tunnel_ip2, + tunnel_ip1, + tunnel_addr_incr, ) IPsecUtil.vpp_ipsec_add_spd_entries( - nodes[u"DUT2"], n_tunnels, spd_id, + nodes["DUT2"], + n_tunnels, + spd_id, priority=ObjIncrement(p_lo, 0), - action=PolicyAction.PROTECT, inbound=False, + action=PolicyAction.PROTECT, + inbound=False, sa_id=ObjIncrement(sa_id_2, 1), - raddr_range=NetworkIncrement(ip_network(raddr_ip1)) + raddr_range=NetworkIncrement(ip_network(raddr_ip1)), ) @staticmethod @@ -2029,7 +2274,7 @@ class IPsecUtil: :param node: Node to run command on. :type node: dict """ - PapiSocketExecutor.run_cli_cmd(node, u"show ipsec all") + PapiSocketExecutor.run_cli_cmd(node, "show ipsec all") @staticmethod def show_ipsec_security_association(node): @@ -2057,8 +2302,10 @@ class IPsecUtil: :returns: flow_index. """ # TODO: to be fixed to use full PAPI when it is ready in VPP - cmd = f"test flow add src-ip any proto {proto} rss function " \ + cmd = ( + f"test flow add src-ip any proto {proto} rss function " f"{function} rss types {type}" + ) stdout = PapiSocketExecutor.run_cli_cmd(node, cmd) flow_index = stdout.split()[1] @@ -2066,7 +2313,8 @@ class IPsecUtil: @staticmethod def vpp_create_ipsec_flows_on_dut( - node, n_flows, rx_queues, spi_start, interface): + node, n_flows, rx_queues, spi_start, interface + ): """Create mutiple ipsec flows and enable flows onto interface. :param node: DUT node. @@ -2084,8 +2332,9 @@ class IPsecUtil: """ for i in range(0, n_flows): - rx_queue = i%rx_queues + rx_queue = i % rx_queues spi = spi_start + i flow_index = FlowUtil.vpp_create_ip4_ipsec_flow( - node, "ESP", spi, "redirect-to-queue", value=rx_queue) + node, "ESP", spi, "redirect-to-queue", value=rx_queue + ) FlowUtil.vpp_flow_enable(node, interface, flow_index) |