aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_ikev2.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ikev2.py')
-rw-r--r--test/test_ikev2.py2081
1 files changed, 1307 insertions, 774 deletions
diff --git a/test/test_ikev2.py b/test/test_ikev2.py
index 175fa0ab9be..fd065b47c98 100644
--- a/test/test_ikev2.py
+++ b/test/test_ikev2.py
@@ -1,4 +1,5 @@
import os
+import socket
import time
from socket import inet_pton
from cryptography import x509
@@ -13,13 +14,21 @@ from cryptography.hazmat.primitives.ciphers import (
)
from ipaddress import IPv4Address, IPv6Address, ip_address
import unittest
+from config import config
from scapy.layers.ipsec import ESP
from scapy.layers.inet import IP, UDP, Ether
from scapy.layers.inet6 import IPv6
from scapy.packet import raw, Raw
from scapy.utils import long_converter
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import (
+ tag_fixme_vpp_workers,
+ tag_fixme_ubuntu2204,
+ tag_fixme_debian11,
+ is_distro_ubuntu2204,
+ is_distro_debian11,
+ VppTestRunner,
+)
from vpp_ikev2 import Profile, IDType, AuthMethod
from vpp_papi import VppEnum
@@ -37,7 +46,9 @@ GCM_IV_SIZE = 8
# defined in rfc3526
# tuple structure is (p, g, key_len)
DH = {
- '2048MODPgr': (long_converter("""
+ "2048MODPgr": (
+ long_converter(
+ """
FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
@@ -48,9 +59,14 @@ DH = {
670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B
E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9
DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510
- 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256),
-
- '3072MODPgr': (long_converter("""
+ 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""
+ ),
+ 2,
+ 256,
+ ),
+ "3072MODPgr": (
+ long_converter(
+ """
FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1
29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD
EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245
@@ -66,7 +82,11 @@ DH = {
ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B
F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C
BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31
- 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384)
+ 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""
+ ),
+ 2,
+ 384,
+ ),
}
@@ -78,7 +98,7 @@ class CryptoAlgo(object):
if self.cipher is not None:
self.bs = self.cipher.block_size // 8
- if self.name == 'AES-GCM-16ICV':
+ if self.name == "AES-GCM-16ICV":
self.iv_len = GCM_IV_SIZE
else:
self.iv_len = self.bs
@@ -86,14 +106,16 @@ class CryptoAlgo(object):
def encrypt(self, data, key, aad=None):
iv = os.urandom(self.iv_len)
if aad is None:
- encryptor = Cipher(self.cipher(key), self.mode(iv),
- default_backend()).encryptor()
+ encryptor = Cipher(
+ self.cipher(key), self.mode(iv), default_backend()
+ ).encryptor()
return iv + encryptor.update(data) + encryptor.finalize()
else:
salt = key[-SALT_SIZE:]
nonce = salt + iv
- encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
- default_backend()).encryptor()
+ encryptor = Cipher(
+ self.cipher(key[:-SALT_SIZE]), self.mode(nonce), default_backend()
+ ).encryptor()
encryptor.authenticate_additional_data(aad)
data = encryptor.update(data) + encryptor.finalize()
data += encryptor.tag[:GCM_ICV_SIZE]
@@ -101,26 +123,26 @@ class CryptoAlgo(object):
def decrypt(self, data, key, aad=None, icv=None):
if aad is None:
- iv = data[:self.iv_len]
- ct = data[self.iv_len:]
- decryptor = Cipher(algorithms.AES(key),
- self.mode(iv),
- default_backend()).decryptor()
+ iv = data[: self.iv_len]
+ ct = data[self.iv_len :]
+ decryptor = Cipher(
+ algorithms.AES(key), self.mode(iv), default_backend()
+ ).decryptor()
return decryptor.update(ct) + decryptor.finalize()
else:
salt = key[-SALT_SIZE:]
nonce = salt + data[:GCM_IV_SIZE]
ct = data[GCM_IV_SIZE:]
key = key[:-SALT_SIZE]
- decryptor = Cipher(algorithms.AES(key),
- self.mode(nonce, icv, len(icv)),
- default_backend()).decryptor()
+ decryptor = Cipher(
+ algorithms.AES(key), self.mode(nonce, icv, len(icv)), default_backend()
+ ).decryptor()
decryptor.authenticate_additional_data(aad)
return decryptor.update(ct) + decryptor.finalize()
def pad(self, data):
pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
- data = data + b'\x00' * (pad_len - 1)
+ data = data + b"\x00" * (pad_len - 1)
return data + bytes([pad_len - 1])
@@ -134,36 +156,34 @@ class AuthAlgo(object):
CRYPTO_ALGOS = {
- 'NULL': CryptoAlgo('NULL', cipher=None, mode=None),
- 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC),
- 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES,
- mode=modes.GCM),
+ "NULL": CryptoAlgo("NULL", cipher=None, mode=None),
+ "AES-CBC": CryptoAlgo("AES-CBC", cipher=algorithms.AES, mode=modes.CBC),
+ "AES-GCM-16ICV": CryptoAlgo("AES-GCM-16ICV", cipher=algorithms.AES, mode=modes.GCM),
}
AUTH_ALGOS = {
- 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
- 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12),
- 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16),
- 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24),
- 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32),
+ "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
+ "HMAC-SHA1-96": AuthAlgo("HMAC-SHA1-96", hmac.HMAC, hashes.SHA1, 20, 12),
+ "SHA2-256-128": AuthAlgo("SHA2-256-128", hmac.HMAC, hashes.SHA256, 32, 16),
+ "SHA2-384-192": AuthAlgo("SHA2-384-192", hmac.HMAC, hashes.SHA256, 48, 24),
+ "SHA2-512-256": AuthAlgo("SHA2-512-256", hmac.HMAC, hashes.SHA256, 64, 32),
}
PRF_ALGOS = {
- 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0),
- 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC,
- hashes.SHA256, 32),
+ "NULL": AuthAlgo("NULL", mac=None, mod=None, key_len=0, trunc_len=0),
+ "PRF_HMAC_SHA2_256": AuthAlgo("PRF_HMAC_SHA2_256", hmac.HMAC, hashes.SHA256, 32),
}
CRYPTO_IDS = {
- 12: 'AES-CBC',
- 20: 'AES-GCM-16ICV',
+ 12: "AES-CBC",
+ 20: "AES-GCM-16ICV",
}
INTEG_IDS = {
- 2: 'HMAC-SHA1-96',
- 12: 'SHA2-256-128',
- 13: 'SHA2-384-192',
- 14: 'SHA2-512-256',
+ 2: "HMAC-SHA1-96",
+ 12: "SHA2-256-128",
+ 13: "SHA2-384-192",
+ 14: "SHA2-512-256",
}
@@ -181,11 +201,24 @@ class IKEv2ChildSA(object):
class IKEv2SA(object):
- def __init__(self, test, is_initiator=True, i_id=None, r_id=None,
- spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn',
- nonce=None, auth_data=None, local_ts=None, remote_ts=None,
- auth_method='shared-key', priv_key=None, i_natt=False,
- r_natt=False, udp_encap=False):
+ def __init__(
+ self,
+ test,
+ is_initiator=True,
+ i_id=None,
+ r_id=None,
+ spi=b"\x01\x02\x03\x04\x05\x06\x07\x08",
+ id_type="fqdn",
+ nonce=None,
+ auth_data=None,
+ local_ts=None,
+ remote_ts=None,
+ auth_method="shared-key",
+ priv_key=None,
+ i_natt=False,
+ r_natt=False,
+ udp_encap=False,
+ ):
self.udp_encap = udp_encap
self.i_natt = i_natt
self.r_natt = r_natt
@@ -210,15 +243,14 @@ class IKEv2SA(object):
self.id_type = id_type
self.auth_method = auth_method
if self.is_initiator:
- self.rspi = 8 * b'\x00'
+ self.rspi = 8 * b"\x00"
self.ispi = spi
self.i_nonce = nonce
else:
self.rspi = spi
- self.ispi = 8 * b'\x00'
+ self.ispi = 8 * b"\x00"
self.r_nonce = nonce
- self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
- self.is_initiator)]
+ self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, self.is_initiator)]
def new_msg_id(self):
self.msg_id += 1
@@ -244,13 +276,14 @@ class IKEv2SA(object):
priv = self.dh_private_key
peer = self.peer_dh_pub_key
p, g, l = self.ike_group
- return pow(int.from_bytes(peer, 'big'),
- int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
+ return pow(
+ int.from_bytes(peer, "big"), int.from_bytes(priv, "big"), p
+ ).to_bytes(l, "big")
def generate_dh_data(self):
# generate DH keys
if self.ike_dh not in DH:
- raise NotImplementedError('%s not in DH group' % self.ike_dh)
+ raise NotImplementedError("%s not in DH group" % self.ike_dh)
if self.dh_params is None:
dhg = DH[self.ike_dh]
@@ -260,61 +293,61 @@ class IKEv2SA(object):
priv = self.dh_params.generate_private_key()
pub = priv.public_key()
x = priv.private_numbers().x
- self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
+ self.dh_private_key = x.to_bytes(priv.key_size // 8, "big")
y = pub.public_numbers().y
if self.is_initiator:
- self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
+ self.i_dh_data = y.to_bytes(pub.key_size // 8, "big")
else:
- self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
+ self.r_dh_data = y.to_bytes(pub.key_size // 8, "big")
def complete_dh_data(self):
self.dh_shared_secret = self.compute_secret()
- def calc_child_keys(self):
+ def calc_child_keys(self, kex=False):
prf = self.ike_prf_alg.mod()
s = self.i_nonce + self.r_nonce
+ if kex:
+ s = self.dh_shared_secret + s
c = self.child_sas[0]
encr_key_len = self.esp_crypto_key_len
integ_key_len = self.esp_integ_alg.key_len
salt_len = 0 if integ_key_len else 4
- l = (integ_key_len * 2 +
- encr_key_len * 2 +
- salt_len * 2)
+ l = integ_key_len * 2 + encr_key_len * 2 + salt_len * 2
keymat = self.calc_prfplus(prf, self.sk_d, s, l)
pos = 0
- c.sk_ei = keymat[pos:pos+encr_key_len]
+ c.sk_ei = keymat[pos : pos + encr_key_len]
pos += encr_key_len
if integ_key_len:
- c.sk_ai = keymat[pos:pos+integ_key_len]
+ c.sk_ai = keymat[pos : pos + integ_key_len]
pos += integ_key_len
else:
- c.salt_ei = keymat[pos:pos+salt_len]
+ c.salt_ei = keymat[pos : pos + salt_len]
pos += salt_len
- c.sk_er = keymat[pos:pos+encr_key_len]
+ c.sk_er = keymat[pos : pos + encr_key_len]
pos += encr_key_len
if integ_key_len:
- c.sk_ar = keymat[pos:pos+integ_key_len]
+ c.sk_ar = keymat[pos : pos + integ_key_len]
pos += integ_key_len
else:
- c.salt_er = keymat[pos:pos+salt_len]
+ c.salt_er = keymat[pos : pos + salt_len]
pos += salt_len
def calc_prfplus(self, prf, key, seed, length):
- r = b''
+ r = b""
t = None
x = 1
while len(r) < length and x < 255:
if t is not None:
s = t
else:
- s = b''
+ s = b""
s = s + seed + bytes([x])
t = self.calc_prf(prf, key, s)
r = r + t
@@ -329,14 +362,21 @@ class IKEv2SA(object):
h.update(data)
return h.finalize()
- def calc_keys(self):
+ def calc_keys(self, sk_d=None):
prf = self.ike_prf_alg.mod()
- # SKEYSEED = prf(Ni | Nr, g^ir)
- s = self.i_nonce + self.r_nonce
- self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
+ if sk_d is None:
+ # SKEYSEED = prf(Ni | Nr, g^ir)
+ self.skeyseed = self.calc_prf(
+ prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
+ )
+ else:
+ # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
+ self.skeyseed = self.calc_prf(
+ prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
+ )
# calculate S = Ni | Nr | SPIi SPIr
- s = s + self.ispi + self.rspi
+ s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
prf_key_trunc = self.ike_prf_alg.trunc_len
encr_key_len = self.ike_crypto_key_len
@@ -347,30 +387,32 @@ class IKEv2SA(object):
else:
salt_size = 0
- l = (prf_key_trunc +
- integ_key_len * 2 +
- encr_key_len * 2 +
- tr_prf_key_len * 2 +
- salt_size * 2)
+ l = (
+ prf_key_trunc
+ + integ_key_len * 2
+ + encr_key_len * 2
+ + tr_prf_key_len * 2
+ + salt_size * 2
+ )
keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
pos = 0
- self.sk_d = keymat[:pos+prf_key_trunc]
+ self.sk_d = keymat[: pos + prf_key_trunc]
pos += prf_key_trunc
- self.sk_ai = keymat[pos:pos+integ_key_len]
+ self.sk_ai = keymat[pos : pos + integ_key_len]
pos += integ_key_len
- self.sk_ar = keymat[pos:pos+integ_key_len]
+ self.sk_ar = keymat[pos : pos + integ_key_len]
pos += integ_key_len
- self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
+ self.sk_ei = keymat[pos : pos + encr_key_len + salt_size]
pos += encr_key_len + salt_size
- self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
+ self.sk_er = keymat[pos : pos + encr_key_len + salt_size]
pos += encr_key_len + salt_size
- self.sk_pi = keymat[pos:pos+tr_prf_key_len]
+ self.sk_pi = keymat[pos : pos + tr_prf_key_len]
pos += tr_prf_key_len
- self.sk_pr = keymat[pos:pos+tr_prf_key_len]
+ self.sk_pr = keymat[pos : pos + tr_prf_key_len]
def generate_authmsg(self, prf, packet):
if self.is_initiator:
@@ -392,14 +434,15 @@ class IKEv2SA(object):
else:
packet = self.init_resp_packet
authmsg = self.generate_authmsg(prf, raw(packet))
- if self.auth_method == 'shared-key':
+ if self.auth_method == "shared-key":
psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
self.auth_data = self.calc_prf(prf, psk, authmsg)
- elif self.auth_method == 'rsa-sig':
- self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
- hashes.SHA1())
+ elif self.auth_method == "rsa-sig":
+ self.auth_data = self.priv_key.sign(
+ authmsg, padding.PKCS1v15(), hashes.SHA1()
+ )
else:
- raise TypeError('unknown auth method type!')
+ raise TypeError("unknown auth method type!")
def encrypt(self, data, aad=None):
data = self.ike_crypto_alg.pad(data)
@@ -430,7 +473,7 @@ class IKEv2SA(object):
return self.sk_ei
def concat(self, alg, key_len):
- return alg + '-' + str(key_len * 8)
+ return alg + "-" + str(key_len * 8)
@property
def vpp_ike_cypto_alg(self):
@@ -444,8 +487,9 @@ class IKEv2SA(object):
integ_trunc = self.ike_integ_alg.trunc_len
exp_hmac = ikemsg[-integ_trunc:]
data = ikemsg[:-integ_trunc]
- computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(),
- self.peer_authkey, data)
+ computed_hmac = self.compute_hmac(
+ self.ike_integ_alg.mod(), self.peer_authkey, data
+ )
self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac)
def compute_hmac(self, integ, key, data):
@@ -458,7 +502,7 @@ class IKEv2SA(object):
def hmac_and_decrypt(self, ike):
ep = ike[ikev2.IKEv2_payload_Encrypted]
- if self.ike_crypto == 'AES-GCM-16ICV':
+ if self.ike_crypto == "AES-GCM-16ICV":
aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
ct = ep.load[:-GCM_ICV_SIZE]
tag = ep.load[-GCM_ICV_SIZE:]
@@ -472,26 +516,26 @@ class IKEv2SA(object):
plain = self.decrypt(ct)
# remove padding
pad_len = plain[-1]
- return plain[:-pad_len - 1]
+ return plain[: -pad_len - 1]
def build_ts_addr(self, ts, version):
- return {'starting_address_v' + version: ts['start_addr'],
- 'ending_address_v' + version: ts['end_addr']}
+ return {
+ "starting_address_v" + version: ts["start_addr"],
+ "ending_address_v" + version: ts["end_addr"],
+ }
def generate_ts(self, is_ip4):
c = self.child_sas[0]
- ts_data = {'IP_protocol_ID': 0,
- 'start_port': 0,
- 'end_port': 0xffff}
+ ts_data = {"IP_protocol_ID": 0, "start_port": 0, "end_port": 0xFFFF}
if is_ip4:
- ts_data.update(self.build_ts_addr(c.local_ts, '4'))
+ ts_data.update(self.build_ts_addr(c.local_ts, "4"))
ts1 = ikev2.IPv4TrafficSelector(**ts_data)
- ts_data.update(self.build_ts_addr(c.remote_ts, '4'))
+ ts_data.update(self.build_ts_addr(c.remote_ts, "4"))
ts2 = ikev2.IPv4TrafficSelector(**ts_data)
else:
- ts_data.update(self.build_ts_addr(c.local_ts, '6'))
+ ts_data.update(self.build_ts_addr(c.local_ts, "6"))
ts1 = ikev2.IPv6TrafficSelector(**ts_data)
- ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
+ ts_data.update(self.build_ts_addr(c.remote_ts, "6"))
ts2 = ikev2.IPv6TrafficSelector(**ts_data)
if self.is_initiator:
@@ -500,18 +544,18 @@ class IKEv2SA(object):
def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
if crypto not in CRYPTO_ALGOS:
- raise TypeError('unsupported encryption algo %r' % crypto)
+ raise TypeError("unsupported encryption algo %r" % crypto)
self.ike_crypto = crypto
self.ike_crypto_alg = CRYPTO_ALGOS[crypto]
self.ike_crypto_key_len = crypto_key_len
if integ not in AUTH_ALGOS:
- raise TypeError('unsupported auth algo %r' % integ)
- self.ike_integ = None if integ == 'NULL' else integ
+ raise TypeError("unsupported auth algo %r" % integ)
+ self.ike_integ = None if integ == "NULL" else integ
self.ike_integ_alg = AUTH_ALGOS[integ]
if prf not in PRF_ALGOS:
- raise TypeError('unsupported prf algo %r' % prf)
+ raise TypeError("unsupported prf algo %r" % prf)
self.ike_prf = prf
self.ike_prf_alg = PRF_ALGOS[prf]
self.ike_dh = dh
@@ -520,20 +564,20 @@ class IKEv2SA(object):
def set_esp_props(self, crypto, crypto_key_len, integ):
self.esp_crypto_key_len = crypto_key_len
if crypto not in CRYPTO_ALGOS:
- raise TypeError('unsupported encryption algo %r' % crypto)
+ raise TypeError("unsupported encryption algo %r" % crypto)
self.esp_crypto = crypto
self.esp_crypto_alg = CRYPTO_ALGOS[crypto]
if integ not in AUTH_ALGOS:
- raise TypeError('unsupported auth algo %r' % integ)
- self.esp_integ = None if integ == 'NULL' else integ
+ raise TypeError("unsupported auth algo %r" % integ)
+ self.esp_integ = None if integ == "NULL" else integ
self.esp_integ_alg = AUTH_ALGOS[integ]
def crypto_attr(self, key_len):
- if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
- return (0x800e << 16 | key_len << 3, 12)
+ if self.ike_crypto in ["AES-CBC", "AES-GCM-16ICV"]:
+ return (0x800E << 16 | key_len << 3, 12)
else:
- raise Exception('unsupported attribute type')
+ raise Exception("unsupported attribute type")
def ike_crypto_attr(self):
return self.crypto_attr(self.ike_crypto_key_len)
@@ -544,19 +588,60 @@ class IKEv2SA(object):
def compute_nat_sha1(self, ip, port, rspi=None):
if rspi is None:
rspi = self.rspi
- data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
+ data = self.ispi + rspi + ip + (port).to_bytes(2, "big")
digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
digest.update(data)
return digest.finalize()
-
+ def clone(self, test, **kwargs):
+ if "spi" not in kwargs:
+ kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
+ if "nonce" not in kwargs:
+ kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
+ if self.child_sas:
+ if "local_ts" not in kwargs:
+ kwargs["local_ts"] = self.child_sas[0].local_ts
+ if "remote_ts" not in kwargs:
+ kwargs["remote_ts"] = self.child_sas[0].remote_ts
+ sa = type(self)(
+ test,
+ is_initiator=self.is_initiator,
+ i_id=self.i_id,
+ r_id=self.r_id,
+ id_type=self.id_type,
+ auth_data=self.auth_data,
+ auth_method=self.auth_method,
+ priv_key=self.priv_key,
+ i_natt=self.i_natt,
+ r_natt=self.r_natt,
+ udp_encap=self.udp_encap,
+ **kwargs,
+ )
+ if sa.is_initiator:
+ sa.set_ike_props(
+ crypto=self.ike_crypto,
+ crypto_key_len=self.ike_crypto_key_len,
+ integ=self.ike_integ,
+ prf=self.ike_prf,
+ dh=self.ike_dh,
+ )
+ sa.set_esp_props(
+ crypto=self.esp_crypto,
+ crypto_key_len=self.esp_crypto_key_len,
+ integ=self.esp_integ,
+ )
+ return sa
+
+
+@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class IkePeer(VppTestCase):
- """ common class for initiator and responder """
+ """common class for initiator and responder"""
@classmethod
def setUpClass(cls):
import scapy.contrib.ikev2 as _ikev2
- globals()['ikev2'] = _ikev2
+
+ globals()["ikev2"] = _ikev2
super(IkePeer, cls).setUpClass()
cls.create_pg_interfaces(range(2))
for i in cls.pg_interfaces:
@@ -578,6 +663,8 @@ class IkePeer(VppTestCase):
self.initiate_del_sa_from_initiator()
r = self.vapi.ikev2_sa_dump()
self.assertEqual(len(r), 0)
+ r = self.vapi.ikev2_sa_v2_dump()
+ self.assertEqual(len(r), 0)
sas = self.vapi.ipsec_sa_dump()
self.assertEqual(len(sas), 0)
self.p.remove_vpp_config()
@@ -590,36 +677,60 @@ class IkePeer(VppTestCase):
self.assertIsNotNone(self.p.query_vpp_config())
if self.sa.is_initiator:
self.sa.generate_dh_data()
- self.vapi.cli('ikev2 set logging level 4')
- self.vapi.cli('event-lo clear')
+ self.vapi.cli("ikev2 set logging level 4")
+ self.vapi.cli("event-lo clear")
- def assert_counter(self, count, name, version='ip4'):
- node_name = '/err/ikev2-%s/' % version + name
+ def assert_counter(self, count, name, version="ip4"):
+ node_name = "/err/ikev2-%s/" % version + name
self.assertEqual(count, self.statistics.get_err_counter(node_name))
- def create_rekey_request(self):
- sa, first_payload = self.generate_auth_payload(is_rekey=True)
+ def create_rekey_request(self, kex=False):
+ sa, first_payload = self.generate_auth_payload(is_rekey=True, kex=kex)
header = ikev2.IKEv2(
- init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
- flags='Initiator', exch_type='CREATE_CHILD_SA')
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(),
+ flags="Initiator",
+ exch_type="CREATE_CHILD_SA",
+ )
ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
- return self.create_packet(self.pg0, ike_msg, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
-
- def create_empty_request(self):
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- id=self.sa.new_msg_id(), flags='Initiator',
- exch_type='INFORMATIONAL',
- next_payload='Encrypted')
+ return self.create_packet(
+ self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
- msg = self.encrypt_ike_msg(header, b'', None)
- return self.create_packet(self.pg0, msg, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
+ def create_sa_rekey_request(self, **kwargs):
+ sa = self.generate_sa_init_payload(**kwargs)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(),
+ flags="Initiator",
+ exch_type="CREATE_CHILD_SA",
+ )
+ ike_msg = self.encrypt_ike_msg(header, sa, "SA")
+ return self.create_packet(
+ self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
- def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
- use_ip6=False):
+ def create_empty_request(self):
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(),
+ flags="Initiator",
+ exch_type="INFORMATIONAL",
+ next_payload="Encrypted",
+ )
+
+ msg = self.encrypt_ike_msg(header, b"", None)
+ return self.create_packet(
+ self.pg0, msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
+
+ def create_packet(
+ self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False
+ ):
if use_ip6:
src_ip = src_if.remote_ip6
dst_ip = src_if.local_ip6
@@ -628,12 +739,14 @@ class IkePeer(VppTestCase):
src_ip = src_if.remote_ip4
dst_ip = src_if.local_ip4
ip_layer = IP
- res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- ip_layer(src=src_ip, dst=dst_ip) /
- UDP(sport=sport, dport=dport))
+ res = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / ip_layer(src=src_ip, dst=dst_ip)
+ / UDP(sport=sport, dport=dport)
+ )
if natt:
# insert non ESP marker
- res = res / Raw(b'\x00' * 4)
+ res = res / Raw(b"\x00" * 4)
return res / msg
def verify_udp(self, udp):
@@ -650,7 +763,7 @@ class IkePeer(VppTestCase):
esp = packet[ESP]
ih = self.verify_and_remove_non_esp_marker(esp)
self.assertEqual(ih.version, 0x20)
- self.assertNotIn('Version', ih.flags)
+ self.assertNotIn("Version", ih.flags)
return ih
def verify_and_remove_non_esp_marker(self, packet):
@@ -658,26 +771,32 @@ class IkePeer(VppTestCase):
# if we are in nat traversal mode check for non esp marker
# and remove it
data = raw(packet)
- self.assertEqual(data[:4], b'\x00' * 4)
+ self.assertEqual(data[:4], b"\x00" * 4)
return ikev2.IKEv2(data[4:])
else:
return packet
def encrypt_ike_msg(self, header, plain, first_payload):
- if self.sa.ike_crypto == 'AES-GCM-16ICV':
+ if self.sa.ike_crypto == "AES-GCM-16ICV":
data = self.sa.ike_crypto_alg.pad(raw(plain))
- plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
- len(ikev2.IKEv2_payload_Encrypted())
+ plen = (
+ len(data)
+ + GCM_IV_SIZE
+ + GCM_ICV_SIZE
+ + len(ikev2.IKEv2_payload_Encrypted())
+ )
tlen = plen + len(ikev2.IKEv2())
# prepare aad data
- sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
- length=plen)
+ sk_p = ikev2.IKEv2_payload_Encrypted(
+ next_payload=first_payload, length=plen
+ )
header.length = tlen
res = header / sk_p
encr = self.sa.encrypt(raw(plain), raw(res))
- sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
- length=plen, load=encr)
+ sk_p = ikev2.IKEv2_payload_Encrypted(
+ next_payload=first_payload, length=plen, load=encr
+ )
res = header / sk_p
else:
encr = self.sa.encrypt(raw(plain))
@@ -685,16 +804,18 @@ class IkePeer(VppTestCase):
plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
tlen = plen + len(ikev2.IKEv2())
- sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload,
- length=plen, load=encr)
+ sk_p = ikev2.IKEv2_payload_Encrypted(
+ next_payload=first_payload, length=plen, load=encr
+ )
header.length = tlen
res = header / sk_p
integ_data = raw(res)
- hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
- self.sa.my_authkey, integ_data)
+ hmac_data = self.sa.compute_hmac(
+ self.sa.ike_integ_alg.mod(), self.sa.my_authkey, integ_data
+ )
res = res / Raw(hmac_data[:trunc_len])
- assert(len(res) == tlen)
+ assert len(res) == tlen
return res
def verify_udp_encap(self, ipsec_sa):
@@ -704,14 +825,15 @@ class IkePeer(VppTestCase):
else:
self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
- def verify_ipsec_sas(self, is_rekey=False):
+ def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
sas = self.vapi.ipsec_sa_dump()
- if is_rekey:
- # after rekey there is a short period of time in which old
- # inbound SA is still present
- sa_count = 3
- else:
- sa_count = 2
+ if sa_count is None:
+ if is_rekey:
+ # after rekey there is a short period of time in which old
+ # inbound SA is still present
+ sa_count = 3
+ else:
+ sa_count = 2
self.assertEqual(len(sas), sa_count)
if self.sa.is_initiator:
if is_rekey:
@@ -746,37 +868,116 @@ class IkePeer(VppTestCase):
# verify crypto keys
self.assertEqual(sa0.crypto_key.length, len(c.sk_er))
self.assertEqual(sa1.crypto_key.length, len(c.sk_ei))
- self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er)
- self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei)
+ self.assertEqual(sa0.crypto_key.data[: len(c.sk_er)], c.sk_er)
+ self.assertEqual(sa1.crypto_key.data[: len(c.sk_ei)], c.sk_ei)
# verify integ keys
if vpp_integ_alg:
self.assertEqual(sa0.integrity_key.length, len(c.sk_ar))
self.assertEqual(sa1.integrity_key.length, len(c.sk_ai))
- self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar)
- self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
+ self.assertEqual(sa0.integrity_key.data[: len(c.sk_ar)], c.sk_ar)
+ self.assertEqual(sa1.integrity_key.data[: len(c.sk_ai)], c.sk_ai)
else:
- self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er)
- self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei)
+ self.assertEqual(sa0.salt.to_bytes(4, "little"), c.salt_er)
+ self.assertEqual(sa1.salt.to_bytes(4, "little"), c.salt_ei)
def verify_keymat(self, api_keys, keys, name):
km = getattr(keys, name)
api_km = getattr(api_keys, name)
- api_km_len = getattr(api_keys, name + '_len')
+ api_km_len = getattr(api_keys, name + "_len")
self.assertEqual(len(km), api_km_len)
self.assertEqual(km, api_km[:api_km_len])
def verify_id(self, api_id, exp_id):
self.assertEqual(api_id.type, IDType.value(exp_id.type))
self.assertEqual(api_id.data_len, exp_id.data_len)
- self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type)
+ self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
- def verify_ike_sas(self):
+ def verify_ike_sas(self, is_rekey=False):
r = self.vapi.ikev2_sa_dump()
+ if is_rekey:
+ sa_count = 2
+ sa = r[1].sa
+ else:
+ sa_count = 1
+ sa = r[0].sa
+ self.assertEqual(len(r), sa_count)
+ self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
+ self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
+ if self.ip6:
+ if self.sa.is_initiator:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
+ else:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
+ else:
+ if self.sa.is_initiator:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+ else:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
+ self.verify_keymat(sa.keys, self.sa, "sk_d")
+ self.verify_keymat(sa.keys, self.sa, "sk_ai")
+ self.verify_keymat(sa.keys, self.sa, "sk_ar")
+ self.verify_keymat(sa.keys, self.sa, "sk_ei")
+ self.verify_keymat(sa.keys, self.sa, "sk_er")
+ self.verify_keymat(sa.keys, self.sa, "sk_pi")
+ self.verify_keymat(sa.keys, self.sa, "sk_pr")
+
+ self.assertEqual(sa.i_id.type, self.sa.id_type)
+ self.assertEqual(sa.r_id.type, self.sa.id_type)
+ self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
+ self.assertEqual(sa.r_id.data_len, len(self.idr))
+ self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
+ self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
+
+ n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.i_nonce)
+ n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.r_nonce)
+
+ r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
+ if is_rekey:
+ self.assertEqual(len(r), 0)
+ return
+
+ self.assertEqual(len(r), 1)
+ csa = r[0].child_sa
+ self.assertEqual(csa.sa_index, sa.sa_index)
+ c = self.sa.child_sas[0]
+ if hasattr(c, "sk_ai"):
+ self.verify_keymat(csa.keys, c, "sk_ai")
+ self.verify_keymat(csa.keys, c, "sk_ar")
+ self.verify_keymat(csa.keys, c, "sk_ei")
+ self.verify_keymat(csa.keys, c, "sk_er")
+ self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
+ self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
+
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
+ tsi = tsi[0]
+ tsr = tsr[0]
+ r = self.vapi.ikev2_traffic_selector_dump(
+ is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+ )
+ self.assertEqual(len(r), 1)
+ ts = r[0].ts
+ self.verify_ts(r[0].ts, tsi[0], True)
+
+ r = self.vapi.ikev2_traffic_selector_dump(
+ is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+ )
+ self.assertEqual(len(r), 1)
+ self.verify_ts(r[0].ts, tsr[0], False)
+
+ def verify_ike_sas_v2(self):
+ r = self.vapi.ikev2_sa_v2_dump()
self.assertEqual(len(r), 1)
sa = r[0].sa
- self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
- self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
+ self.assertEqual(self.p.profile_name, sa.profile_name)
+ self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
+ self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
if self.ip6:
if self.sa.is_initiator:
self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
@@ -791,55 +992,53 @@ class IkePeer(VppTestCase):
else:
self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
- self.verify_keymat(sa.keys, self.sa, 'sk_d')
- self.verify_keymat(sa.keys, self.sa, 'sk_ai')
- self.verify_keymat(sa.keys, self.sa, 'sk_ar')
- self.verify_keymat(sa.keys, self.sa, 'sk_ei')
- self.verify_keymat(sa.keys, self.sa, 'sk_er')
- self.verify_keymat(sa.keys, self.sa, 'sk_pi')
- self.verify_keymat(sa.keys, self.sa, 'sk_pr')
+ self.verify_keymat(sa.keys, self.sa, "sk_d")
+ self.verify_keymat(sa.keys, self.sa, "sk_ai")
+ self.verify_keymat(sa.keys, self.sa, "sk_ar")
+ self.verify_keymat(sa.keys, self.sa, "sk_ei")
+ self.verify_keymat(sa.keys, self.sa, "sk_er")
+ self.verify_keymat(sa.keys, self.sa, "sk_pi")
+ self.verify_keymat(sa.keys, self.sa, "sk_pr")
self.assertEqual(sa.i_id.type, self.sa.id_type)
self.assertEqual(sa.r_id.type, self.sa.id_type)
self.assertEqual(sa.i_id.data_len, len(self.sa.i_id))
- self.assertEqual(sa.r_id.data_len, len(self.sa.r_id))
- self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id)
- self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id)
+ self.assertEqual(sa.r_id.data_len, len(self.idr))
+ self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
+ self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
self.assertEqual(len(r), 1)
csa = r[0].child_sa
self.assertEqual(csa.sa_index, sa.sa_index)
c = self.sa.child_sas[0]
- if hasattr(c, 'sk_ai'):
- self.verify_keymat(csa.keys, c, 'sk_ai')
- self.verify_keymat(csa.keys, c, 'sk_ar')
- self.verify_keymat(csa.keys, c, 'sk_ei')
- self.verify_keymat(csa.keys, c, 'sk_er')
- self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
- self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
+ if hasattr(c, "sk_ai"):
+ self.verify_keymat(csa.keys, c, "sk_ai")
+ self.verify_keymat(csa.keys, c, "sk_ar")
+ self.verify_keymat(csa.keys, c, "sk_ei")
+ self.verify_keymat(csa.keys, c, "sk_er")
+ self.assertEqual(csa.i_spi.to_bytes(4, "big"), c.ispi)
+ self.assertEqual(csa.r_spi.to_bytes(4, "big"), c.rspi)
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
tsi = tsi[0]
tsr = tsr[0]
r = self.vapi.ikev2_traffic_selector_dump(
- is_initiator=True, sa_index=sa.sa_index,
- child_sa_index=csa.child_sa_index)
+ is_initiator=True, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+ )
self.assertEqual(len(r), 1)
ts = r[0].ts
self.verify_ts(r[0].ts, tsi[0], True)
r = self.vapi.ikev2_traffic_selector_dump(
- is_initiator=False, sa_index=sa.sa_index,
- child_sa_index=csa.child_sa_index)
+ is_initiator=False, sa_index=sa.sa_index, child_sa_index=csa.child_sa_index
+ )
self.assertEqual(len(r), 1)
self.verify_ts(r[0].ts, tsr[0], False)
- n = self.vapi.ikev2_nonce_get(is_initiator=True,
- sa_index=sa.sa_index)
+ n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
self.verify_nonce(n, self.sa.i_nonce)
- n = self.vapi.ikev2_nonce_get(is_initiator=False,
- sa_index=sa.sa_index)
+ n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
self.verify_nonce(n, self.sa.r_nonce)
def verify_nonce(self, api_nonce, nonce):
@@ -853,61 +1052,65 @@ class IkePeer(VppTestCase):
self.assertFalse(api_ts.is_local)
if self.p.ts_is_ip4:
- self.assertEqual(api_ts.start_addr,
- IPv4Address(ts.starting_address_v4))
- self.assertEqual(api_ts.end_addr,
- IPv4Address(ts.ending_address_v4))
+ self.assertEqual(api_ts.start_addr, IPv4Address(ts.starting_address_v4))
+ self.assertEqual(api_ts.end_addr, IPv4Address(ts.ending_address_v4))
else:
- self.assertEqual(api_ts.start_addr,
- IPv6Address(ts.starting_address_v6))
- self.assertEqual(api_ts.end_addr,
- IPv6Address(ts.ending_address_v6))
+ self.assertEqual(api_ts.start_addr, IPv6Address(ts.starting_address_v6))
+ self.assertEqual(api_ts.end_addr, IPv6Address(ts.ending_address_v6))
self.assertEqual(api_ts.start_port, ts.start_port)
self.assertEqual(api_ts.end_port, ts.end_port)
self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
class TemplateInitiator(IkePeer):
- """ initiator test template """
+ """initiator test template"""
def initiate_del_sa_from_initiator(self):
- ispi = int.from_bytes(self.sa.ispi, 'little')
+ ispi = int.from_bytes(self.sa.ispi, "little")
self.pg0.enable_capture()
self.pg_start()
self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
capture = self.pg0.get_capture(1)
ih = self.get_ike_header(capture[0])
- self.assertNotIn('Response', ih.flags)
- self.assertIn('Initiator', ih.flags)
+ self.assertNotIn("Response", ih.flags)
+ self.assertIn("Initiator", ih.flags)
self.assertEqual(ih.init_SPI, self.sa.ispi)
self.assertEqual(ih.resp_SPI, self.sa.rspi)
plain = self.sa.hmac_and_decrypt(ih)
d = ikev2.IKEv2_payload_Delete(plain)
self.assertEqual(d.proto, 1) # proto=IKEv2
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- flags='Response', exch_type='INFORMATIONAL',
- id=ih.id, next_payload='Encrypted')
- resp = self.encrypt_ike_msg(header, b'', None)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ flags="Response",
+ exch_type="INFORMATIONAL",
+ id=ih.id,
+ next_payload="Encrypted",
+ )
+ resp = self.encrypt_ike_msg(header, b"", None)
self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
ih = self.get_ike_header(packet)
self.assertEqual(ih.id, self.sa.msg_id)
self.assertEqual(ih.exch_type, 37) # exchange informational
- self.assertIn('Response', ih.flags)
- self.assertIn('Initiator', ih.flags)
+ self.assertIn("Response", ih.flags)
+ self.assertIn("Initiator", ih.flags)
plain = self.sa.hmac_and_decrypt(ih)
- self.assertEqual(plain, b'')
+ self.assertEqual(plain, b"")
def initiate_del_sa_from_responder(self):
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- exch_type='INFORMATIONAL',
- id=self.sa.new_msg_id())
- del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
- ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
- packet = self.create_packet(self.pg0, ike_msg,
- self.sa.sport, self.sa.dport,
- self.sa.natt, self.ip6)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ exch_type="INFORMATIONAL",
+ id=self.sa.new_msg_id(),
+ )
+ del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
+ ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
+ packet = self.create_packet(
+ self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
@@ -934,26 +1137,28 @@ class TemplateInitiator(IkePeer):
s = self.find_notify_payload(packet, 16388)
self.assertIsNotNone(s)
src_sha = self.sa.compute_nat_sha1(
- inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
+ inet_pton(socket.AF_INET, iph.src), udp.sport, b"\x00" * 8
+ )
self.assertEqual(s.load, src_sha)
# NAT_DETECTION_DESTINATION_IP
s = self.find_notify_payload(packet, 16389)
self.assertIsNotNone(s)
dst_sha = self.sa.compute_nat_sha1(
- inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
+ inet_pton(socket.AF_INET, iph.dst), udp.dport, b"\x00" * 8
+ )
self.assertEqual(s.load, dst_sha)
def verify_sa_init_request(self, packet):
udp = packet[UDP]
self.sa.dport = udp.sport
ih = packet[ikev2.IKEv2]
- self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
+ self.assertNotEqual(ih.init_SPI, 8 * b"\x00")
self.assertEqual(ih.exch_type, 34) # SA_INIT
self.sa.ispi = ih.init_SPI
- self.assertEqual(ih.resp_SPI, 8 * b'\x00')
- self.assertIn('Initiator', ih.flags)
- self.assertNotIn('Response', ih.flags)
+ self.assertEqual(ih.resp_SPI, 8 * b"\x00")
+ self.assertIn("Initiator", ih.flags)
+ self.assertNotIn("Response", ih.flags)
self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
@@ -961,20 +1166,23 @@ class TemplateInitiator(IkePeer):
self.assertEqual(prop.proto, 1) # proto = ikev2
self.assertEqual(prop.proposal, 1)
self.assertEqual(prop.trans[0].transform_type, 1) # encryption
- self.assertEqual(prop.trans[0].transform_id,
- self.p.ike_transforms['crypto_alg'])
+ self.assertEqual(
+ prop.trans[0].transform_id, self.p.ike_transforms["crypto_alg"]
+ )
self.assertEqual(prop.trans[1].transform_type, 2) # prf
self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
self.assertEqual(prop.trans[2].transform_type, 4) # dh
- self.assertEqual(prop.trans[2].transform_id,
- self.p.ike_transforms['dh_group'])
+ self.assertEqual(prop.trans[2].transform_id, self.p.ike_transforms["dh_group"])
self.verify_nat_detection(packet)
self.sa.set_ike_props(
- crypto='AES-GCM-16ICV', crypto_key_len=32,
- integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
- self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
- integ='SHA2-256-128')
+ crypto="AES-GCM-16ICV",
+ crypto_key_len=32,
+ integ="NULL",
+ prf="PRF_HMAC_SHA2_256",
+ dh="3072MODPgr",
+ )
+ self.sa.set_esp_props(crypto="AES-CBC", crypto_key_len=32, integ="SHA2-256-128")
self.sa.generate_dh_data()
self.sa.complete_dh_data()
self.sa.calc_keys()
@@ -994,8 +1202,8 @@ class TemplateInitiator(IkePeer):
self.assertEqual(ih.resp_SPI, self.sa.rspi)
self.assertEqual(ih.init_SPI, self.sa.ispi)
self.assertEqual(ih.exch_type, 35) # IKE_AUTH
- self.assertIn('Initiator', ih.flags)
- self.assertNotIn('Response', ih.flags)
+ self.assertIn("Initiator", ih.flags)
+ self.assertNotIn("Response", ih.flags)
udp = packet[UDP]
self.verify_udp(udp)
@@ -1003,54 +1211,76 @@ class TemplateInitiator(IkePeer):
self.sa.msg_id += 1
plain = self.sa.hmac_and_decrypt(ih)
idi = ikev2.IKEv2_payload_IDi(plain)
- idr = ikev2.IKEv2_payload_IDr(idi.payload)
self.assertEqual(idi.load, self.sa.i_id)
- self.assertEqual(idr.load, self.sa.r_id)
+ if self.no_idr_auth:
+ self.assertEqual(idi.next_payload, 39) # AUTH
+ else:
+ idr = ikev2.IKEv2_payload_IDr(idi.payload)
+ self.assertEqual(idr.load, self.sa.r_id)
prop = idi[ikev2.IKEv2_payload_Proposal]
c = self.sa.child_sas[0]
c.ispi = prop.SPI
- self.update_esp_transforms(
- prop[ikev2.IKEv2_payload_Transform], self.sa)
+ self.update_esp_transforms(prop[ikev2.IKEv2_payload_Transform], self.sa)
def send_init_response(self):
tr_attr = self.sa.ike_crypto_attr()
- trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
- transform_id=self.sa.ike_crypto, length=tr_attr[1],
- key_length=tr_attr[0]) /
- ikev2.IKEv2_payload_Transform(transform_type='Integrity',
- transform_id=self.sa.ike_integ) /
- ikev2.IKEv2_payload_Transform(transform_type='PRF',
- transform_id=self.sa.ike_prf_alg.name) /
- ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
- transform_id=self.sa.ike_dh))
- props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
- trans_nb=4, trans=trans))
+ trans = (
+ ikev2.IKEv2_payload_Transform(
+ transform_type="Encryption",
+ transform_id=self.sa.ike_crypto,
+ length=tr_attr[1],
+ key_length=tr_attr[0],
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="Integrity", transform_id=self.sa.ike_integ
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="GroupDesc", transform_id=self.sa.ike_dh
+ )
+ )
+ props = ikev2.IKEv2_payload_Proposal(
+ proposal=1, proto="IKEv2", trans_nb=4, trans=trans
+ )
src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
if self.sa.natt:
- dst_address = b'\x0a\x0a\x0a\x0a'
+ dst_address = b"\x0a\x0a\x0a\x0a"
else:
dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
self.sa.init_resp_packet = (
- ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- exch_type='IKE_SA_INIT', flags='Response') /
- ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
- ikev2.IKEv2_payload_KE(next_payload='Nonce',
- group=self.sa.ike_dh,
- load=self.sa.my_dh_pub_key) /
- ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce,
- next_payload='Notify') /
- ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_SOURCE_IP', load=src_nat,
- next_payload='Notify') / ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_DESTINATION_IP', load=dst_nat))
-
- ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
- self.sa.sport, self.sa.dport,
- False, self.ip6)
+ ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ exch_type="IKE_SA_INIT",
+ flags="Response",
+ )
+ / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
+ / ikev2.IKEv2_payload_KE(
+ next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
+ )
+ / ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, next_payload="Notify")
+ / ikev2.IKEv2_payload_Notify(
+ type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
+ )
+ / ikev2.IKEv2_payload_Notify(
+ type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
+ )
+ )
+
+ ike_msg = self.create_packet(
+ self.pg0,
+ self.sa.init_resp_packet,
+ self.sa.sport,
+ self.sa.dport,
+ False,
+ self.ip6,
+ )
self.pg_send(self.pg0, ike_msg)
capture = self.pg0.get_capture(1)
self.verify_sa_auth_req(capture[0])
@@ -1066,47 +1296,64 @@ class TemplateInitiator(IkePeer):
def send_auth_response(self):
tr_attr = self.sa.esp_crypto_attr()
- trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
- transform_id=self.sa.esp_crypto, length=tr_attr[1],
- key_length=tr_attr[0]) /
- ikev2.IKEv2_payload_Transform(transform_type='Integrity',
- transform_id=self.sa.esp_integ) /
- ikev2.IKEv2_payload_Transform(
- transform_type='Extended Sequence Number',
- transform_id='No ESN') /
- ikev2.IKEv2_payload_Transform(
- transform_type='Extended Sequence Number',
- transform_id='ESN'))
+ trans = (
+ ikev2.IKEv2_payload_Transform(
+ transform_type="Encryption",
+ transform_id=self.sa.esp_crypto,
+ length=tr_attr[1],
+ key_length=tr_attr[0],
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="Integrity", transform_id=self.sa.esp_integ
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="Extended Sequence Number", transform_id="No ESN"
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="Extended Sequence Number", transform_id="ESN"
+ )
+ )
c = self.sa.child_sas[0]
- props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
- SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
+ props = ikev2.IKEv2_payload_Proposal(
+ proposal=1, proto="ESP", SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans
+ )
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
- plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
- IDtype=self.sa.id_type, load=self.sa.i_id) /
- ikev2.IKEv2_payload_IDr(next_payload='AUTH',
- IDtype=self.sa.id_type, load=self.sa.r_id) /
- ikev2.IKEv2_payload_AUTH(next_payload='SA',
- auth_type=AuthMethod.value(self.sa.auth_method),
- load=self.sa.auth_data) /
- ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
- ikev2.IKEv2_payload_TSi(next_payload='TSr',
- number_of_TSs=len(tsi),
- traffic_selector=tsi) /
- ikev2.IKEv2_payload_TSr(next_payload='Notify',
- number_of_TSs=len(tsr),
- traffic_selector=tsr) /
- ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
+ plain = (
+ ikev2.IKEv2_payload_IDi(
+ next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
+ )
+ / ikev2.IKEv2_payload_IDr(
+ next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
+ )
+ / ikev2.IKEv2_payload_AUTH(
+ next_payload="SA",
+ auth_type=AuthMethod.value(self.sa.auth_method),
+ load=self.sa.auth_data,
+ )
+ / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
+ / ikev2.IKEv2_payload_TSi(
+ next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
+ )
+ / ikev2.IKEv2_payload_TSr(
+ next_payload="Notify", number_of_TSs=len(tsr), traffic_selector=tsr
+ )
+ / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
+ )
header = ikev2.IKEv2(
- init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
- flags='Response', exch_type='IKE_AUTH')
-
- ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
- packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(),
+ flags="Response",
+ exch_type="IKE_AUTH",
+ )
+
+ ike_msg = self.encrypt_ike_msg(header, plain, "IDi")
+ packet = self.create_packet(
+ self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
self.pg_send(self.pg0, packet)
def test_initiator(self):
@@ -1115,174 +1362,265 @@ class TemplateInitiator(IkePeer):
self.sa.calc_child_keys()
self.send_auth_response()
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
class TemplateResponder(IkePeer):
- """ responder test template """
+ """responder test template"""
def initiate_del_sa_from_responder(self):
self.pg0.enable_capture()
self.pg_start()
- self.vapi.ikev2_initiate_del_ike_sa(
- ispi=int.from_bytes(self.sa.ispi, 'little'))
+ self.vapi.ikev2_initiate_del_ike_sa(ispi=int.from_bytes(self.sa.ispi, "little"))
capture = self.pg0.get_capture(1)
ih = self.get_ike_header(capture[0])
- self.assertNotIn('Response', ih.flags)
- self.assertNotIn('Initiator', ih.flags)
+ self.assertNotIn("Response", ih.flags)
+ self.assertNotIn("Initiator", ih.flags)
self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
plain = self.sa.hmac_and_decrypt(ih)
d = ikev2.IKEv2_payload_Delete(plain)
self.assertEqual(d.proto, 1) # proto=IKEv2
self.assertEqual(ih.init_SPI, self.sa.ispi)
self.assertEqual(ih.resp_SPI, self.sa.rspi)
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- flags='Initiator+Response',
- exch_type='INFORMATIONAL',
- id=ih.id, next_payload='Encrypted')
- resp = self.encrypt_ike_msg(header, b'', None)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ flags="Initiator+Response",
+ exch_type="INFORMATIONAL",
+ id=ih.id,
+ next_payload="Encrypted",
+ )
+ resp = self.encrypt_ike_msg(header, b"", None)
self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
ih = self.get_ike_header(packet)
self.assertEqual(ih.id, self.sa.msg_id)
self.assertEqual(ih.exch_type, 37) # exchange informational
- self.assertIn('Response', ih.flags)
- self.assertNotIn('Initiator', ih.flags)
+ self.assertIn("Response", ih.flags)
+ self.assertNotIn("Initiator", ih.flags)
self.assertEqual(ih.next_payload, 46) # Encrypted
self.assertEqual(ih.init_SPI, self.sa.ispi)
self.assertEqual(ih.resp_SPI, self.sa.rspi)
plain = self.sa.hmac_and_decrypt(ih)
- self.assertEqual(plain, b'')
+ self.assertEqual(plain, b"")
def initiate_del_sa_from_initiator(self):
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- flags='Initiator', exch_type='INFORMATIONAL',
- id=self.sa.new_msg_id())
- del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
- ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
- packet = self.create_packet(self.pg0, ike_msg,
- self.sa.sport, self.sa.dport,
- self.sa.natt, self.ip6)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ flags="Initiator",
+ exch_type="INFORMATIONAL",
+ id=self.sa.new_msg_id(),
+ )
+ del_sa = ikev2.IKEv2_payload_Delete(proto="IKEv2")
+ ike_msg = self.encrypt_ike_msg(header, del_sa, "Delete")
+ packet = self.create_packet(
+ self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
capture = self.pg0.get_capture(1)
self.verify_del_sa(capture[0])
- def send_sa_init_req(self):
+ def generate_sa_init_payload(
+ self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
+ ):
tr_attr = self.sa.ike_crypto_attr()
- trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
- transform_id=self.sa.ike_crypto, length=tr_attr[1],
- key_length=tr_attr[0]) /
- ikev2.IKEv2_payload_Transform(transform_type='Integrity',
- transform_id=self.sa.ike_integ) /
- ikev2.IKEv2_payload_Transform(transform_type='PRF',
- transform_id=self.sa.ike_prf_alg.name) /
- ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
- transform_id=self.sa.ike_dh))
-
- props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
- trans_nb=4, trans=trans))
-
- next_payload = None if self.ip6 else 'Notify'
-
- self.sa.init_req_packet = (
- ikev2.IKEv2(init_SPI=self.sa.ispi,
- flags='Initiator', exch_type='IKE_SA_INIT') /
- ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
- ikev2.IKEv2_payload_KE(next_payload='Nonce',
- group=self.sa.ike_dh,
- load=self.sa.my_dh_pub_key) /
- ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
- load=self.sa.i_nonce))
+ trans = (
+ ikev2.IKEv2_payload_Transform(
+ transform_type="Encryption",
+ transform_id=self.sa.ike_crypto,
+ length=tr_attr[1],
+ key_length=tr_attr[0],
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="Integrity", transform_id=self.sa.ike_integ
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="PRF", transform_id=self.sa.ike_prf_alg.name
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="GroupDesc", transform_id=self.sa.ike_dh
+ )
+ )
+
+ if spi is None:
+ pargs = {}
+ else:
+ pargs = {"SPI": spi, "SPIsize": len(spi)}
+ props = ikev2.IKEv2_payload_Proposal(
+ proposal=1,
+ proto="IKEv2",
+ trans_nb=4,
+ trans=trans,
+ **pargs,
+ )
+
+ return (
+ ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
+ / ikev2.IKEv2_payload_KE(
+ next_payload="Nonce",
+ group=self.sa.ike_dh,
+ load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
+ )
+ / ikev2.IKEv2_payload_Nonce(
+ next_payload=next_payload,
+ load=self.sa.i_nonce if nonce is None else nonce,
+ )
+ )
+
+ def send_sa_init_req(self):
+ self.sa.init_req_packet = ikev2.IKEv2(
+ init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
+ ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
if not self.ip6:
if self.sa.i_natt:
- src_address = b'\x0a\x0a\x0a\x01'
+ src_address = b"\x0a\x0a\x0a\x01"
else:
src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
if self.sa.r_natt:
- dst_address = b'\x0a\x0a\x0a\x0a'
+ dst_address = b"\x0a\x0a\x0a\x0a"
else:
dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4)
src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport)
nat_src_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_SOURCE_IP', load=src_nat,
- next_payload='Notify')
+ type="NAT_DETECTION_SOURCE_IP", load=src_nat, next_payload="Notify"
+ )
nat_dst_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
- self.sa.init_req_packet = (self.sa.init_req_packet /
- nat_src_detection /
- nat_dst_detection)
-
- ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
- self.sa.sport, self.sa.dport,
- self.sa.natt, self.ip6)
+ type="NAT_DETECTION_DESTINATION_IP", load=dst_nat
+ )
+ self.sa.init_req_packet = (
+ self.sa.init_req_packet / nat_src_detection / nat_dst_detection
+ )
+
+ ike_msg = self.create_packet(
+ self.pg0,
+ self.sa.init_req_packet,
+ self.sa.sport,
+ self.sa.dport,
+ self.sa.natt,
+ self.ip6,
+ )
self.pg0.add_stream(ike_msg)
self.pg0.enable_capture()
self.pg_start()
capture = self.pg0.get_capture(1)
self.verify_sa_init(capture[0])
- def generate_auth_payload(self, last_payload=None, is_rekey=False):
+ def generate_auth_payload(self, last_payload=None, is_rekey=False, kex=False):
tr_attr = self.sa.esp_crypto_attr()
- last_payload = last_payload or 'Notify'
- trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
- transform_id=self.sa.esp_crypto, length=tr_attr[1],
- key_length=tr_attr[0]) /
- ikev2.IKEv2_payload_Transform(transform_type='Integrity',
- transform_id=self.sa.esp_integ) /
- ikev2.IKEv2_payload_Transform(
- transform_type='Extended Sequence Number',
- transform_id='No ESN') /
- ikev2.IKEv2_payload_Transform(
- transform_type='Extended Sequence Number',
- transform_id='ESN'))
+ last_payload = last_payload or "Notify"
+ trans_nb = 4
+ trans = (
+ ikev2.IKEv2_payload_Transform(
+ transform_type="Encryption",
+ transform_id=self.sa.esp_crypto,
+ length=tr_attr[1],
+ key_length=tr_attr[0],
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="Integrity", transform_id=self.sa.esp_integ
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="Extended Sequence Number", transform_id="No ESN"
+ )
+ / ikev2.IKEv2_payload_Transform(
+ transform_type="Extended Sequence Number", transform_id="ESN"
+ )
+ )
+
+ if kex:
+ trans_nb += 1
+ trans /= ikev2.IKEv2_payload_Transform(
+ transform_type="GroupDesc", transform_id=self.sa.ike_dh
+ )
c = self.sa.child_sas[0]
- props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
- SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
+ props = ikev2.IKEv2_payload_Proposal(
+ proposal=1,
+ proto="ESP",
+ SPIsize=4,
+ SPI=c.ispi,
+ trans_nb=trans_nb,
+ trans=trans,
+ )
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
- plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
- auth_type=AuthMethod.value(self.sa.auth_method),
- load=self.sa.auth_data) /
- ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
- ikev2.IKEv2_payload_TSi(next_payload='TSr',
- number_of_TSs=len(tsi), traffic_selector=tsi) /
- ikev2.IKEv2_payload_TSr(next_payload=last_payload,
- number_of_TSs=len(tsr), traffic_selector=tsr))
+ plain = (
+ ikev2.IKEv2_payload_AUTH(
+ next_payload="SA",
+ auth_type=AuthMethod.value(self.sa.auth_method),
+ load=self.sa.auth_data,
+ )
+ / ikev2.IKEv2_payload_SA(next_payload="TSi", prop=props)
+ / ikev2.IKEv2_payload_TSi(
+ next_payload="TSr", number_of_TSs=len(tsi), traffic_selector=tsi
+ )
+ / ikev2.IKEv2_payload_TSr(
+ next_payload=last_payload, number_of_TSs=len(tsr), traffic_selector=tsr
+ )
+ )
if is_rekey:
- first_payload = 'Nonce'
- plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
- next_payload='SA') / plain /
- ikev2.IKEv2_payload_Notify(type='REKEY_SA',
- proto='ESP', SPI=c.ispi))
+ first_payload = "Nonce"
+ if kex:
+ head = ikev2.IKEv2_payload_Nonce(
+ load=self.sa.i_nonce, next_payload="KE"
+ ) / ikev2.IKEv2_payload_KE(
+ group=self.sa.ike_dh, load=self.sa.my_dh_pub_key, next_payload="SA"
+ )
+ else:
+ head = ikev2.IKEv2_payload_Nonce(
+ load=self.sa.i_nonce, next_payload="SA"
+ )
+ plain = (
+ head
+ / plain
+ / ikev2.IKEv2_payload_Notify(
+ type="REKEY_SA",
+ proto="ESP",
+ SPI=c.ispi,
+ length=8 + len(c.ispi),
+ next_payload="Notify",
+ )
+ / ikev2.IKEv2_payload_Notify(type="ESP_TFC_PADDING_NOT_SUPPORTED")
+ )
else:
- first_payload = 'IDi'
- ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
- IDtype=self.sa.id_type, load=self.sa.i_id) /
- ikev2.IKEv2_payload_IDr(next_payload='AUTH',
- IDtype=self.sa.id_type, load=self.sa.r_id))
+ first_payload = "IDi"
+ if self.no_idr_auth:
+ ids = ikev2.IKEv2_payload_IDi(
+ next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.i_id
+ )
+ else:
+ ids = ikev2.IKEv2_payload_IDi(
+ next_payload="IDr", IDtype=self.sa.id_type, load=self.sa.i_id
+ ) / ikev2.IKEv2_payload_IDr(
+ next_payload="AUTH", IDtype=self.sa.id_type, load=self.sa.r_id
+ )
plain = ids / plain
return plain, first_payload
def send_sa_auth(self):
- plain, first_payload = self.generate_auth_payload(
- last_payload='Notify')
- plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
+ plain, first_payload = self.generate_auth_payload(last_payload="Notify")
+ plain = plain / ikev2.IKEv2_payload_Notify(type="INITIAL_CONTACT")
header = ikev2.IKEv2(
- init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
- flags='Initiator', exch_type='IKE_AUTH')
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(),
+ flags="Initiator",
+ exch_type="IKE_AUTH",
+ )
ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
- packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
+ packet = self.create_packet(
+ self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
@@ -1294,7 +1632,7 @@ class TemplateResponder(IkePeer):
self.assertEqual(ih.id, self.sa.msg_id)
self.assertEqual(ih.exch_type, 34)
- self.assertIn('Response', ih.flags)
+ self.assertIn("Response", ih.flags)
self.assertEqual(ih.init_SPI, self.sa.ispi)
self.assertNotEqual(ih.resp_SPI, 0)
self.sa.rspi = ih.resp_SPI
@@ -1322,23 +1660,29 @@ class TemplateResponder(IkePeer):
self.sa.child_sas[0].rspi = prop.SPI
self.sa.calc_child_keys()
- IKE_NODE_SUFFIX = 'ip4'
+ IKE_NODE_SUFFIX = "ip4"
def verify_counters(self):
- self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX)
- self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX)
- self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX)
+ self.assert_counter(2, "processed", self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, "init_sa_req", self.IKE_NODE_SUFFIX)
+ self.assert_counter(1, "ike_auth_req", self.IKE_NODE_SUFFIX)
r = self.vapi.ikev2_sa_dump()
s = r[0].sa.stats
self.assertEqual(1, s.n_sa_auth_req)
self.assertEqual(1, s.n_sa_init_req)
+ r = self.vapi.ikev2_sa_v2_dump()
+ s = r[0].sa.stats
+ self.assertEqual(1, s.n_sa_auth_req)
+ self.assertEqual(1, s.n_sa_init_req)
+
def test_responder(self):
self.send_sa_init_req()
self.send_sa_auth()
self.verify_ipsec_sas()
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
self.verify_counters()
@@ -1347,125 +1691,157 @@ class Ikev2Params(object):
ec = VppEnum.vl_api_ipsec_crypto_alg_t
ei = VppEnum.vl_api_ipsec_integ_alg_t
self.vpp_enums = {
- 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
- 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
- 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
- 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
- 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
- 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
-
- 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96,
- 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128,
- 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
- 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
-
- dpd_disabled = True if 'dpd_disabled' not in params else\
- params['dpd_disabled']
+ "AES-CBC-128": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128,
+ "AES-CBC-192": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192,
+ "AES-CBC-256": ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256,
+ "AES-GCM-16ICV-128": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128,
+ "AES-GCM-16ICV-192": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192,
+ "AES-GCM-16ICV-256": ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256,
+ "HMAC-SHA1-96": ei.IPSEC_API_INTEG_ALG_SHA1_96,
+ "SHA2-256-128": ei.IPSEC_API_INTEG_ALG_SHA_256_128,
+ "SHA2-384-192": ei.IPSEC_API_INTEG_ALG_SHA_384_192,
+ "SHA2-512-256": ei.IPSEC_API_INTEG_ALG_SHA_512_256,
+ }
+
+ dpd_disabled = True if "dpd_disabled" not in params else params["dpd_disabled"]
if dpd_disabled:
- self.vapi.cli('ikev2 dpd disable')
- self.del_sa_from_responder = False if 'del_sa_from_responder'\
- not in params else params['del_sa_from_responder']
- i_natt = False if 'i_natt' not in params else params['i_natt']
- r_natt = False if 'r_natt' not in params else params['r_natt']
- self.p = Profile(self, 'pr1')
- self.ip6 = False if 'ip6' not in params else params['ip6']
-
- if 'auth' in params and params['auth'] == 'rsa-sig':
- auth_method = 'rsa-sig'
- work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
- self.vapi.ikev2_set_local_key(
- key_file=work_dir + params['server-key'])
-
- client_file = work_dir + params['client-cert']
- server_pem = open(work_dir + params['server-cert']).read()
- client_priv = open(work_dir + params['client-key']).read()
- client_priv = load_pem_private_key(str.encode(client_priv), None,
- default_backend())
+ self.vapi.cli("ikev2 dpd disable")
+ self.del_sa_from_responder = (
+ False
+ if "del_sa_from_responder" not in params
+ else params["del_sa_from_responder"]
+ )
+ i_natt = False if "i_natt" not in params else params["i_natt"]
+ r_natt = False if "r_natt" not in params else params["r_natt"]
+ self.p = Profile(self, "pr1")
+ self.ip6 = False if "ip6" not in params else params["ip6"]
+
+ if "auth" in params and params["auth"] == "rsa-sig":
+ auth_method = "rsa-sig"
+ work_dir = f"{config.vpp_ws_dir}/src/plugins/ikev2/test/certs/"
+ self.vapi.ikev2_set_local_key(key_file=work_dir + params["server-key"])
+
+ client_file = work_dir + params["client-cert"]
+ server_pem = open(work_dir + params["server-cert"]).read()
+ client_priv = open(work_dir + params["client-key"]).read()
+ client_priv = load_pem_private_key(
+ str.encode(client_priv), None, default_backend()
+ )
self.peer_cert = x509.load_pem_x509_certificate(
- str.encode(server_pem),
- default_backend())
- self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
+ str.encode(server_pem), default_backend()
+ )
+ self.p.add_auth(method="rsa-sig", data=str.encode(client_file))
auth_data = None
else:
- auth_data = b'$3cr3tpa$$w0rd'
- self.p.add_auth(method='shared-key', data=auth_data)
- auth_method = 'shared-key'
+ auth_data = b"$3cr3tpa$$w0rd"
+ self.p.add_auth(method="shared-key", data=auth_data)
+ auth_method = "shared-key"
client_priv = None
- is_init = True if 'is_initiator' not in params else\
- params['is_initiator']
+ is_init = True if "is_initiator" not in params else params["is_initiator"]
+ self.no_idr_auth = params.get("no_idr_in_auth", False)
- idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
- idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
+ idr = {"id_type": "fqdn", "data": b"vpp.home"}
+ idi = {"id_type": "fqdn", "data": b"roadwarrior.example.com"}
+ r_id = self.idr = idr["data"]
+ i_id = self.idi = idi["data"]
if is_init:
+ # scapy is initiator, VPP is responder
self.p.add_local_id(**idr)
self.p.add_remote_id(**idi)
+ if self.no_idr_auth:
+ r_id = None
else:
+ # VPP is initiator, scapy is responder
self.p.add_local_id(**idi)
- self.p.add_remote_id(**idr)
-
- loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
- 'loc_ts' not in params else params['loc_ts']
- rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
- 'rem_ts' not in params else params['rem_ts']
+ if not self.no_idr_auth:
+ self.p.add_remote_id(**idr)
+
+ loc_ts = (
+ {"start_addr": "10.10.10.0", "end_addr": "10.10.10.255"}
+ if "loc_ts" not in params
+ else params["loc_ts"]
+ )
+ rem_ts = (
+ {"start_addr": "10.0.0.0", "end_addr": "10.0.0.255"}
+ if "rem_ts" not in params
+ else params["rem_ts"]
+ )
self.p.add_local_ts(**loc_ts)
self.p.add_remote_ts(**rem_ts)
- if 'responder' in params:
- self.p.add_responder(params['responder'])
- if 'ike_transforms' in params:
- self.p.add_ike_transforms(params['ike_transforms'])
- if 'esp_transforms' in params:
- self.p.add_esp_transforms(params['esp_transforms'])
-
- udp_encap = False if 'udp_encap' not in params else\
- params['udp_encap']
+ if "responder" in params:
+ self.p.add_responder(params["responder"])
+ if "ike_transforms" in params:
+ self.p.add_ike_transforms(params["ike_transforms"])
+ if "esp_transforms" in params:
+ self.p.add_esp_transforms(params["esp_transforms"])
+
+ udp_encap = False if "udp_encap" not in params else params["udp_encap"]
if udp_encap:
self.p.set_udp_encap(True)
- if 'responder_hostname' in params:
- hn = params['responder_hostname']
+ if "responder_hostname" in params:
+ hn = params["responder_hostname"]
self.p.add_responder_hostname(hn)
# configure static dns record
self.vapi.dns_name_server_add_del(
- is_ip6=0, is_add=1,
- server_address=IPv4Address(u'8.8.8.8').packed)
+ is_ip6=0, is_add=1, server_address=IPv4Address("8.8.8.8").packed
+ )
self.vapi.dns_enable_disable(enable=1)
- cmd = "dns cache add {} {}".format(hn['hostname'],
- self.pg0.remote_ip4)
+ cmd = "dns cache add {} {}".format(hn["hostname"], self.pg0.remote_ip4)
self.vapi.cli(cmd)
- self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
- is_initiator=is_init,
- id_type=self.p.local_id['id_type'],
- i_natt=i_natt, r_natt=r_natt,
- priv_key=client_priv, auth_method=auth_method,
- auth_data=auth_data, udp_encap=udp_encap,
- local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
+ self.sa = IKEv2SA(
+ self,
+ i_id=i_id,
+ r_id=r_id,
+ is_initiator=is_init,
+ id_type=self.p.local_id["id_type"],
+ i_natt=i_natt,
+ r_natt=r_natt,
+ priv_key=client_priv,
+ auth_method=auth_method,
+ nonce=params.get("nonce"),
+ auth_data=auth_data,
+ udp_encap=udp_encap,
+ local_ts=self.p.remote_ts,
+ remote_ts=self.p.local_ts,
+ )
+
if is_init:
- ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
- params['ike-crypto']
- ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
- params['ike-integ']
- ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
- params['ike-dh']
-
- esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
- params['esp-crypto']
- esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
- params['esp-integ']
+ ike_crypto = (
+ ("AES-CBC", 32) if "ike-crypto" not in params else params["ike-crypto"]
+ )
+ ike_integ = (
+ "HMAC-SHA1-96" if "ike-integ" not in params else params["ike-integ"]
+ )
+ ike_dh = "2048MODPgr" if "ike-dh" not in params else params["ike-dh"]
+
+ esp_crypto = (
+ ("AES-CBC", 32) if "esp-crypto" not in params else params["esp-crypto"]
+ )
+ esp_integ = (
+ "HMAC-SHA1-96" if "esp-integ" not in params else params["esp-integ"]
+ )
self.sa.set_ike_props(
- crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
- integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
+ crypto=ike_crypto[0],
+ crypto_key_len=ike_crypto[1],
+ integ=ike_integ,
+ prf="PRF_HMAC_SHA2_256",
+ dh=ike_dh,
+ )
self.sa.set_esp_props(
- crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
- integ=esp_integ)
+ crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], integ=esp_integ
+ )
+@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class TestApi(VppTestCase):
- """ Test IKEV2 API """
+ """Test IKEV2 API"""
+
@classmethod
def setUpClass(cls):
super(TestApi, cls).setUpClass()
@@ -1482,241 +1858,249 @@ class TestApi(VppTestCase):
self.assertEqual(len(r), 0)
def configure_profile(self, cfg):
- p = Profile(self, cfg['name'])
- p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
- p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
- p.add_local_ts(**cfg['loc_ts'])
- p.add_remote_ts(**cfg['rem_ts'])
- p.add_responder(cfg['responder'])
- p.add_ike_transforms(cfg['ike_ts'])
- p.add_esp_transforms(cfg['esp_ts'])
- p.add_auth(**cfg['auth'])
- p.set_udp_encap(cfg['udp_encap'])
- p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
- if 'lifetime_data' in cfg:
- p.set_lifetime_data(cfg['lifetime_data'])
- if 'tun_itf' in cfg:
- p.set_tunnel_interface(cfg['tun_itf'])
- if 'natt_disabled' in cfg and cfg['natt_disabled']:
+ p = Profile(self, cfg["name"])
+ p.add_local_id(id_type=cfg["loc_id"][0], data=cfg["loc_id"][1])
+ p.add_remote_id(id_type=cfg["rem_id"][0], data=cfg["rem_id"][1])
+ p.add_local_ts(**cfg["loc_ts"])
+ p.add_remote_ts(**cfg["rem_ts"])
+ p.add_responder(cfg["responder"])
+ p.add_ike_transforms(cfg["ike_ts"])
+ p.add_esp_transforms(cfg["esp_ts"])
+ p.add_auth(**cfg["auth"])
+ p.set_udp_encap(cfg["udp_encap"])
+ p.set_ipsec_over_udp_port(cfg["ipsec_over_udp_port"])
+ if "lifetime_data" in cfg:
+ p.set_lifetime_data(cfg["lifetime_data"])
+ if "tun_itf" in cfg:
+ p.set_tunnel_interface(cfg["tun_itf"])
+ if "natt_disabled" in cfg and cfg["natt_disabled"]:
p.disable_natt()
p.add_vpp_config()
return p
def test_profile_api(self):
- """ test profile dump API """
+ """test profile dump API"""
loc_ts4 = {
- 'proto': 8,
- 'start_port': 1,
- 'end_port': 19,
- 'start_addr': '3.3.3.2',
- 'end_addr': '3.3.3.3',
- }
+ "proto": 8,
+ "start_port": 1,
+ "end_port": 19,
+ "start_addr": "3.3.3.2",
+ "end_addr": "3.3.3.3",
+ }
rem_ts4 = {
- 'proto': 9,
- 'start_port': 10,
- 'end_port': 119,
- 'start_addr': '4.5.76.80',
- 'end_addr': '2.3.4.6',
- }
+ "proto": 9,
+ "start_port": 10,
+ "end_port": 119,
+ "start_addr": "4.5.76.80",
+ "end_addr": "2.3.4.6",
+ }
loc_ts6 = {
- 'proto': 8,
- 'start_port': 1,
- 'end_port': 19,
- 'start_addr': 'ab::1',
- 'end_addr': 'ab::4',
- }
+ "proto": 8,
+ "start_port": 1,
+ "end_port": 19,
+ "start_addr": "ab::1",
+ "end_addr": "ab::4",
+ }
rem_ts6 = {
- 'proto': 9,
- 'start_port': 10,
- 'end_port': 119,
- 'start_addr': 'cd::12',
- 'end_addr': 'cd::13',
- }
+ "proto": 9,
+ "start_port": 10,
+ "end_port": 119,
+ "start_addr": "cd::12",
+ "end_addr": "cd::13",
+ }
conf = {
- 'p1': {
- 'name': 'p1',
- 'natt_disabled': True,
- 'loc_id': ('fqdn', b'vpp.home'),
- 'rem_id': ('fqdn', b'roadwarrior.example.com'),
- 'loc_ts': loc_ts4,
- 'rem_ts': rem_ts4,
- 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'},
- 'ike_ts': {
- 'crypto_alg': 20,
- 'crypto_key_size': 32,
- 'integ_alg': 0,
- 'dh_group': 1},
- 'esp_ts': {
- 'crypto_alg': 13,
- 'crypto_key_size': 24,
- 'integ_alg': 2},
- 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
- 'udp_encap': True,
- 'ipsec_over_udp_port': 4501,
- 'lifetime_data': {
- 'lifetime': 123,
- 'lifetime_maxdata': 20192,
- 'lifetime_jitter': 9,
- 'handover': 132},
+ "p1": {
+ "name": "p1",
+ "natt_disabled": True,
+ "loc_id": ("fqdn", b"vpp.home"),
+ "rem_id": ("fqdn", b"roadwarrior.example.com"),
+ "loc_ts": loc_ts4,
+ "rem_ts": rem_ts4,
+ "responder": {"sw_if_index": 0, "addr": "5.6.7.8"},
+ "ike_ts": {
+ "crypto_alg": 20,
+ "crypto_key_size": 32,
+ "integ_alg": 0,
+ "dh_group": 1,
+ },
+ "esp_ts": {"crypto_alg": 13, "crypto_key_size": 24, "integ_alg": 2},
+ "auth": {"method": "shared-key", "data": b"sharedkeydata"},
+ "udp_encap": True,
+ "ipsec_over_udp_port": 4501,
+ "lifetime_data": {
+ "lifetime": 123,
+ "lifetime_maxdata": 20192,
+ "lifetime_jitter": 9,
+ "handover": 132,
+ },
+ },
+ "p2": {
+ "name": "p2",
+ "loc_id": ("ip4-addr", b"192.168.2.1"),
+ "rem_id": ("ip6-addr", b"abcd::1"),
+ "loc_ts": loc_ts6,
+ "rem_ts": rem_ts6,
+ "responder": {"sw_if_index": 4, "addr": "def::10"},
+ "ike_ts": {
+ "crypto_alg": 12,
+ "crypto_key_size": 16,
+ "integ_alg": 3,
+ "dh_group": 3,
+ },
+ "esp_ts": {"crypto_alg": 9, "crypto_key_size": 24, "integ_alg": 4},
+ "auth": {"method": "shared-key", "data": b"sharedkeydata"},
+ "udp_encap": False,
+ "ipsec_over_udp_port": 4600,
+ "tun_itf": 0,
},
- 'p2': {
- 'name': 'p2',
- 'loc_id': ('ip4-addr', b'192.168.2.1'),
- 'rem_id': ('ip6-addr', b'abcd::1'),
- 'loc_ts': loc_ts6,
- 'rem_ts': rem_ts6,
- 'responder': {'sw_if_index': 4, 'addr': 'def::10'},
- 'ike_ts': {
- 'crypto_alg': 12,
- 'crypto_key_size': 16,
- 'integ_alg': 3,
- 'dh_group': 3},
- 'esp_ts': {
- 'crypto_alg': 9,
- 'crypto_key_size': 24,
- 'integ_alg': 4},
- 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
- 'udp_encap': False,
- 'ipsec_over_udp_port': 4600,
- 'tun_itf': 0}
}
- self.p1 = self.configure_profile(conf['p1'])
- self.p2 = self.configure_profile(conf['p2'])
+ self.p1 = self.configure_profile(conf["p1"])
+ self.p2 = self.configure_profile(conf["p2"])
r = self.vapi.ikev2_profile_dump()
self.assertEqual(len(r), 2)
- self.verify_profile(r[0].profile, conf['p1'])
- self.verify_profile(r[1].profile, conf['p2'])
+ self.verify_profile(r[0].profile, conf["p1"])
+ self.verify_profile(r[1].profile, conf["p2"])
def verify_id(self, api_id, cfg_id):
self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
- self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
+ self.assertEqual(bytes(api_id.data, "ascii"), cfg_id[1])
def verify_ts(self, api_ts, cfg_ts):
- self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
- self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
- self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
- self.assertEqual(api_ts.start_addr,
- ip_address(text_type(cfg_ts['start_addr'])))
- self.assertEqual(api_ts.end_addr,
- ip_address(text_type(cfg_ts['end_addr'])))
+ self.assertEqual(api_ts.protocol_id, cfg_ts["proto"])
+ self.assertEqual(api_ts.start_port, cfg_ts["start_port"])
+ self.assertEqual(api_ts.end_port, cfg_ts["end_port"])
+ self.assertEqual(api_ts.start_addr, ip_address(text_type(cfg_ts["start_addr"])))
+ self.assertEqual(api_ts.end_addr, ip_address(text_type(cfg_ts["end_addr"])))
def verify_responder(self, api_r, cfg_r):
- self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
- self.assertEqual(api_r.addr, ip_address(cfg_r['addr']))
+ self.assertEqual(api_r.sw_if_index, cfg_r["sw_if_index"])
+ self.assertEqual(api_r.addr, ip_address(cfg_r["addr"]))
def verify_transforms(self, api_ts, cfg_ts):
- self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
- self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
- self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
+ self.assertEqual(api_ts.crypto_alg, cfg_ts["crypto_alg"])
+ self.assertEqual(api_ts.crypto_key_size, cfg_ts["crypto_key_size"])
+ self.assertEqual(api_ts.integ_alg, cfg_ts["integ_alg"])
def verify_ike_transforms(self, api_ts, cfg_ts):
self.verify_transforms(api_ts, cfg_ts)
- self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
+ self.assertEqual(api_ts.dh_group, cfg_ts["dh_group"])
def verify_esp_transforms(self, api_ts, cfg_ts):
self.verify_transforms(api_ts, cfg_ts)
def verify_auth(self, api_auth, cfg_auth):
- self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
- self.assertEqual(api_auth.data, cfg_auth['data'])
- self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
+ self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth["method"]))
+ self.assertEqual(api_auth.data, cfg_auth["data"])
+ self.assertEqual(api_auth.data_len, len(cfg_auth["data"]))
def verify_lifetime_data(self, p, ld):
- self.assertEqual(p.lifetime, ld['lifetime'])
- self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
- self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
- self.assertEqual(p.handover, ld['handover'])
+ self.assertEqual(p.lifetime, ld["lifetime"])
+ self.assertEqual(p.lifetime_maxdata, ld["lifetime_maxdata"])
+ self.assertEqual(p.lifetime_jitter, ld["lifetime_jitter"])
+ self.assertEqual(p.handover, ld["handover"])
def verify_profile(self, ap, cp):
- self.assertEqual(ap.name, cp['name'])
- self.assertEqual(ap.udp_encap, cp['udp_encap'])
- self.verify_id(ap.loc_id, cp['loc_id'])
- self.verify_id(ap.rem_id, cp['rem_id'])
- self.verify_ts(ap.loc_ts, cp['loc_ts'])
- self.verify_ts(ap.rem_ts, cp['rem_ts'])
- self.verify_responder(ap.responder, cp['responder'])
- self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
- self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
- self.verify_auth(ap.auth, cp['auth'])
- natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled']
+ self.assertEqual(ap.name, cp["name"])
+ self.assertEqual(ap.udp_encap, cp["udp_encap"])
+ self.verify_id(ap.loc_id, cp["loc_id"])
+ self.verify_id(ap.rem_id, cp["rem_id"])
+ self.verify_ts(ap.loc_ts, cp["loc_ts"])
+ self.verify_ts(ap.rem_ts, cp["rem_ts"])
+ self.verify_responder(ap.responder, cp["responder"])
+ self.verify_ike_transforms(ap.ike_ts, cp["ike_ts"])
+ self.verify_esp_transforms(ap.esp_ts, cp["esp_ts"])
+ self.verify_auth(ap.auth, cp["auth"])
+ natt_dis = False if "natt_disabled" not in cp else cp["natt_disabled"]
self.assertTrue(natt_dis == ap.natt_disabled)
- if 'lifetime_data' in cp:
- self.verify_lifetime_data(ap, cp['lifetime_data'])
- self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
- if 'tun_itf' in cp:
- self.assertEqual(ap.tun_itf, cp['tun_itf'])
+ if "lifetime_data" in cp:
+ self.verify_lifetime_data(ap, cp["lifetime_data"])
+ self.assertEqual(ap.ipsec_over_udp_port, cp["ipsec_over_udp_port"])
+ if "tun_itf" in cp:
+ self.assertEqual(ap.tun_itf, cp["tun_itf"])
else:
- self.assertEqual(ap.tun_itf, 0xffffffff)
+ self.assertEqual(ap.tun_itf, 0xFFFFFFFF)
@tag_fixme_vpp_workers
class TestResponderBehindNAT(TemplateResponder, Ikev2Params):
- """ test responder - responder behind NAT """
+ """test responder - responder behind NAT"""
- IKE_NODE_SUFFIX = 'ip4-natt'
+ IKE_NODE_SUFFIX = "ip4-natt"
def config_tc(self):
- self.config_params({'r_natt': True})
+ self.config_params({"r_natt": True})
@tag_fixme_vpp_workers
class TestInitiatorNATT(TemplateInitiator, Ikev2Params):
- """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """
+ """test ikev2 initiator - NAT traversal (intitiator behind NAT)"""
def config_tc(self):
- self.config_params({
- 'i_natt': True,
- 'is_initiator': False, # seen from test case perspective
- # thus vpp is initiator
- 'responder': {'sw_if_index': self.pg0.sw_if_index,
- 'addr': self.pg0.remote_ip4},
- 'ike-crypto': ('AES-GCM-16ICV', 32),
- 'ike-integ': 'NULL',
- 'ike-dh': '3072MODPgr',
- 'ike_transforms': {
- 'crypto_alg': 20, # "aes-gcm-16"
- 'crypto_key_size': 256,
- 'dh_group': 15, # "modp-3072"
- },
- 'esp_transforms': {
- 'crypto_alg': 12, # "aes-cbc"
- 'crypto_key_size': 256,
- # "hmac-sha2-256-128"
- 'integ_alg': 12}})
+ self.config_params(
+ {
+ "i_natt": True,
+ "is_initiator": False, # seen from test case perspective
+ # thus vpp is initiator
+ "responder": {
+ "sw_if_index": self.pg0.sw_if_index,
+ "addr": self.pg0.remote_ip4,
+ },
+ "ike-crypto": ("AES-GCM-16ICV", 32),
+ "ike-integ": "NULL",
+ "ike-dh": "3072MODPgr",
+ "ike_transforms": {
+ "crypto_alg": 20, # "aes-gcm-16"
+ "crypto_key_size": 256,
+ "dh_group": 15, # "modp-3072"
+ },
+ "esp_transforms": {
+ "crypto_alg": 12, # "aes-cbc"
+ "crypto_key_size": 256,
+ # "hmac-sha2-256-128"
+ "integ_alg": 12,
+ },
+ }
+ )
@tag_fixme_vpp_workers
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
- """ test ikev2 initiator - pre shared key auth """
+ """test ikev2 initiator - pre shared key auth"""
def config_tc(self):
- self.config_params({
- 'is_initiator': False, # seen from test case perspective
- # thus vpp is initiator
- 'ike-crypto': ('AES-GCM-16ICV', 32),
- 'ike-integ': 'NULL',
- 'ike-dh': '3072MODPgr',
- 'ike_transforms': {
- 'crypto_alg': 20, # "aes-gcm-16"
- 'crypto_key_size': 256,
- 'dh_group': 15, # "modp-3072"
- },
- 'esp_transforms': {
- 'crypto_alg': 12, # "aes-cbc"
- 'crypto_key_size': 256,
- # "hmac-sha2-256-128"
- 'integ_alg': 12},
- 'responder_hostname': {'hostname': 'vpp.responder.org',
- 'sw_if_index': self.pg0.sw_if_index}})
+ self.config_params(
+ {
+ "is_initiator": False, # seen from test case perspective
+ # thus vpp is initiator
+ "ike-crypto": ("AES-GCM-16ICV", 32),
+ "ike-integ": "NULL",
+ "ike-dh": "3072MODPgr",
+ "ike_transforms": {
+ "crypto_alg": 20, # "aes-gcm-16"
+ "crypto_key_size": 256,
+ "dh_group": 15, # "modp-3072"
+ },
+ "esp_transforms": {
+ "crypto_alg": 12, # "aes-cbc"
+ "crypto_key_size": 256,
+ # "hmac-sha2-256-128"
+ "integ_alg": 12,
+ },
+ "responder_hostname": {
+ "hostname": "vpp.responder.org",
+ "sw_if_index": self.pg0.sw_if_index,
+ },
+ }
+ )
@tag_fixme_vpp_workers
class TestInitiatorRequestWindowSize(TestInitiatorPsk):
- """ test initiator - request window size (1) """
+ """test initiator - request window size (1)"""
def rekey_respond(self, req, update_child_sa_data):
ih = self.get_ike_header(req)
@@ -1730,19 +2114,25 @@ class TestInitiatorRequestWindowSize(TestInitiatorPsk):
self.sa.child_sas[0].rspi = prop.SPI
self.sa.calc_child_keys()
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- flags='Response', exch_type=36,
- id=ih.id, next_payload='Encrypted')
- resp = self.encrypt_ike_msg(header, sa, 'SA')
- packet = self.create_packet(self.pg0, resp, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ flags="Response",
+ exch_type=36,
+ id=ih.id,
+ next_payload="Encrypted",
+ )
+ resp = self.encrypt_ike_msg(header, sa, "SA")
+ packet = self.create_packet(
+ self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
self.send_and_assert_no_replies(self.pg0, packet)
def test_initiator(self):
super(TestInitiatorRequestWindowSize, self).test_initiator()
self.pg0.enable_capture()
self.pg_start()
- ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+ ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
capture = self.pg0.get_capture(2)
@@ -1753,23 +2143,24 @@ class TestInitiatorRequestWindowSize(TestInitiatorPsk):
# verify that only the second request was accepted
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
self.verify_ipsec_sas(is_rekey=True)
@tag_fixme_vpp_workers
class TestInitiatorRekey(TestInitiatorPsk):
- """ test ikev2 initiator - rekey """
+ """test ikev2 initiator - rekey"""
def rekey_from_initiator(self):
- ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+ ispi = int.from_bytes(self.sa.child_sas[0].ispi, "little")
self.pg0.enable_capture()
self.pg_start()
self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
capture = self.pg0.get_capture(1)
ih = self.get_ike_header(capture[0])
self.assertEqual(ih.exch_type, 36) # CHILD_SA
- self.assertNotIn('Response', ih.flags)
- self.assertIn('Initiator', ih.flags)
+ self.assertNotIn("Response", ih.flags)
+ self.assertIn("Initiator", ih.flags)
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
prop = sa[ikev2.IKEv2_payload_Proposal]
@@ -1779,61 +2170,75 @@ class TestInitiatorRekey(TestInitiatorPsk):
self.sa.child_sas[0].ispi = prop.SPI
self.sa.child_sas[0].rspi = prop.SPI
self.sa.calc_child_keys()
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- flags='Response', exch_type=36,
- id=ih.id, next_payload='Encrypted')
- resp = self.encrypt_ike_msg(header, sa, 'SA')
- packet = self.create_packet(self.pg0, resp, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ flags="Response",
+ exch_type=36,
+ id=ih.id,
+ next_payload="Encrypted",
+ )
+ resp = self.encrypt_ike_msg(header, sa, "SA")
+ packet = self.create_packet(
+ self.pg0, resp, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
self.send_and_assert_no_replies(self.pg0, packet)
def test_initiator(self):
super(TestInitiatorRekey, self).test_initiator()
self.rekey_from_initiator()
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
self.verify_ipsec_sas(is_rekey=True)
@tag_fixme_vpp_workers
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
- """ test ikev2 initiator - delete IKE SA from responder """
+ """test ikev2 initiator - delete IKE SA from responder"""
def config_tc(self):
- self.config_params({
- 'del_sa_from_responder': True,
- 'is_initiator': False, # seen from test case perspective
- # thus vpp is initiator
- 'responder': {'sw_if_index': self.pg0.sw_if_index,
- 'addr': self.pg0.remote_ip4},
- 'ike-crypto': ('AES-GCM-16ICV', 32),
- 'ike-integ': 'NULL',
- 'ike-dh': '3072MODPgr',
- 'ike_transforms': {
- 'crypto_alg': 20, # "aes-gcm-16"
- 'crypto_key_size': 256,
- 'dh_group': 15, # "modp-3072"
- },
- 'esp_transforms': {
- 'crypto_alg': 12, # "aes-cbc"
- 'crypto_key_size': 256,
- # "hmac-sha2-256-128"
- 'integ_alg': 12}})
+ self.config_params(
+ {
+ "del_sa_from_responder": True,
+ "is_initiator": False, # seen from test case perspective
+ # thus vpp is initiator
+ "responder": {
+ "sw_if_index": self.pg0.sw_if_index,
+ "addr": self.pg0.remote_ip4,
+ },
+ "ike-crypto": ("AES-GCM-16ICV", 32),
+ "ike-integ": "NULL",
+ "ike-dh": "3072MODPgr",
+ "ike_transforms": {
+ "crypto_alg": 20, # "aes-gcm-16"
+ "crypto_key_size": 256,
+ "dh_group": 15, # "modp-3072"
+ },
+ "esp_transforms": {
+ "crypto_alg": 12, # "aes-cbc"
+ "crypto_key_size": 256,
+ # "hmac-sha2-256-128"
+ "integ_alg": 12,
+ },
+ "no_idr_in_auth": True,
+ }
+ )
@tag_fixme_vpp_workers
class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params):
- """ test ikev2 responder - initiator behind NAT """
+ """test ikev2 responder - initiator behind NAT"""
- IKE_NODE_SUFFIX = 'ip4-natt'
+ IKE_NODE_SUFFIX = "ip4-natt"
def config_tc(self):
- self.config_params(
- {'i_natt': True})
+ self.config_params({"i_natt": True})
@tag_fixme_vpp_workers
class TestResponderPsk(TemplateResponder, Ikev2Params):
- """ test ikev2 responder - pre shared key auth """
+ """test ikev2 responder - pre shared key auth"""
+
def config_tc(self):
self.config_params()
@@ -1843,8 +2248,9 @@ class TestResponderDpd(TestResponderPsk):
"""
Dead peer detection test
"""
+
def config_tc(self):
- self.config_params({'dpd_disabled': False})
+ self.config_params({"dpd_disabled": False})
def tearDown(self):
pass
@@ -1859,25 +2265,34 @@ class TestResponderDpd(TestResponderPsk):
ih = self.get_ike_header(capture[0])
self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
plain = self.sa.hmac_and_decrypt(ih)
- self.assertEqual(plain, b'')
+ self.assertEqual(plain, b"")
# wait for SA expiration
time.sleep(3)
ike_sas = self.vapi.ikev2_sa_dump()
self.assertEqual(len(ike_sas), 0)
+ ike_sas = self.vapi.ikev2_sa_v2_dump()
+ self.assertEqual(len(ike_sas), 0)
ipsec_sas = self.vapi.ipsec_sa_dump()
self.assertEqual(len(ipsec_sas), 0)
@tag_fixme_vpp_workers
class TestResponderRekey(TestResponderPsk):
- """ test ikev2 responder - rekey """
+ """test ikev2 responder - rekey"""
- def rekey_from_initiator(self):
- packet = self.create_rekey_request()
+ WITH_KEX = False
+
+ def send_rekey_from_initiator(self):
+ if self.WITH_KEX:
+ self.sa.generate_dh_data()
+ packet = self.create_rekey_request(kex=self.WITH_KEX)
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
capture = self.pg0.get_capture(1)
+ return capture
+
+ def process_rekey_response(self, capture):
ih = self.get_ike_header(capture[0])
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
@@ -1885,26 +2300,123 @@ class TestResponderRekey(TestResponderPsk):
self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
# update new responder SPI
self.sa.child_sas[0].rspi = prop.SPI
+ if self.WITH_KEX:
+ self.sa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+ self.sa.complete_dh_data()
+ self.sa.calc_child_keys(kex=self.WITH_KEX)
def test_responder(self):
super(TestResponderRekey, self).test_responder()
- self.rekey_from_initiator()
- self.sa.calc_child_keys()
+ self.process_rekey_response(self.send_rekey_from_initiator())
self.verify_ike_sas()
+ self.verify_ike_sas_v2()
self.verify_ipsec_sas(is_rekey=True)
- self.assert_counter(1, 'rekey_req', 'ip4')
+ self.assert_counter(1, "rekey_req", "ip4")
r = self.vapi.ikev2_sa_dump()
self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+ r = self.vapi.ikev2_sa_v2_dump()
+ self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeat(TestResponderRekey):
+ """test ikev2 responder - rekey repeat"""
+
+ def test_responder(self):
+ super(TestResponderRekeyRepeat, self).test_responder()
+ # rekey request is not accepted until old IPsec SA is expired
+ capture = self.send_rekey_from_initiator()
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ notify = ikev2.IKEv2_payload_Notify(plain)
+ self.assertEqual(notify.type, 43)
+ self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
+ # rekey request is accepted after old IPsec SA was expired
+ for _ in range(50):
+ if len(self.vapi.ipsec_sa_dump()) != 3:
+ break
+ time.sleep(0.2)
+ else:
+ self.fail("old IPsec SA not expired")
+ self.process_rekey_response(self.send_rekey_from_initiator())
+ self.verify_ike_sas()
+ self.verify_ike_sas_v2()
+ self.verify_ipsec_sas(sa_count=3)
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyKEX(TestResponderRekey):
+ """test ikev2 responder - rekey with key exchange"""
+
+ WITH_KEX = True
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
+ """test ikev2 responder - rekey repeat with key exchange"""
+
+ WITH_KEX = True
+
+
+@tag_fixme_vpp_workers
+class TestResponderRekeySA(TestResponderPsk):
+ """test ikev2 responder - rekey IKE SA"""
+
+ def send_rekey_from_initiator(self, newsa):
+ packet = self.create_sa_rekey_request(
+ spi=newsa.ispi,
+ dh_pub_key=newsa.my_dh_pub_key,
+ nonce=newsa.i_nonce,
+ )
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ return capture
+
+ def process_rekey_response(self, newsa, capture):
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ newsa.rspi = prop.SPI
+ newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+ newsa.complete_dh_data()
+ newsa.calc_keys(sk_d=self.sa.sk_d)
+ newsa.child_sas = self.sa.child_sas
+ self.sa.child_sas = []
+
+ def test_responder(self):
+ super(TestResponderRekeySA, self).test_responder()
+ newsa = self.sa.clone(self, spi=os.urandom(8))
+ newsa.generate_dh_data()
+ capture = self.send_rekey_from_initiator(newsa)
+ self.process_rekey_response(newsa, capture)
+ self.verify_ike_sas(is_rekey=True)
+ self.assert_counter(1, "rekey_req", "ip4")
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(r[1].sa.stats.n_rekey_req, 1)
+ self.initiate_del_sa_from_initiator()
+ self.sa = newsa
+ self.verify_ike_sas()
+
+
+@tag_fixme_ubuntu2204
+@tag_fixme_debian11
class TestResponderVrf(TestResponderPsk, Ikev2Params):
- """ test ikev2 responder - non-default table id """
+ """test ikev2 responder - non-default table id"""
@classmethod
def setUpClass(cls):
import scapy.contrib.ikev2 as _ikev2
- globals()['ikev2'] = _ikev2
+
+ globals()["ikev2"] = _ikev2
super(IkePeer, cls).setUpClass()
+ if (is_distro_ubuntu2204 == True or is_distro_debian11 == True) and not hasattr(
+ cls, "vpp"
+ ):
+ return
cls.create_pg_interfaces(range(1))
cls.vapi.cli("ip table add 1")
cls.vapi.cli("set interface ip table pg0 1")
@@ -1916,7 +2428,7 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params):
i.resolve_ndp()
def config_tc(self):
- self.config_params({'dpd_disabled': False})
+ self.config_params({"dpd_disabled": False})
def test_responder(self):
self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1)
@@ -1927,51 +2439,66 @@ class TestResponderVrf(TestResponderPsk, Ikev2Params):
ih = self.get_ike_header(capture[0])
self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
plain = self.sa.hmac_and_decrypt(ih)
- self.assertEqual(plain, b'')
+ self.assertEqual(plain, b"")
@tag_fixme_vpp_workers
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
- """ test ikev2 responder - cert based auth """
+ """test ikev2 responder - cert based auth"""
+
def config_tc(self):
- self.config_params({
- 'udp_encap': True,
- 'auth': 'rsa-sig',
- 'server-key': 'server-key.pem',
- 'client-key': 'client-key.pem',
- 'client-cert': 'client-cert.pem',
- 'server-cert': 'server-cert.pem'})
+ self.config_params(
+ {
+ "udp_encap": True,
+ "auth": "rsa-sig",
+ "server-key": "server-key.pem",
+ "client-key": "client-key.pem",
+ "client-cert": "client-cert.pem",
+ "server-cert": "server-cert.pem",
+ }
+ )
@tag_fixme_vpp_workers
-class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\
- (TemplateResponder, Ikev2Params):
+class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192(
+ TemplateResponder, Ikev2Params
+):
"""
IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192
"""
+
def config_tc(self):
- self.config_params({
- 'ike-crypto': ('AES-CBC', 16),
- 'ike-integ': 'SHA2-256-128',
- 'esp-crypto': ('AES-CBC', 24),
- 'esp-integ': 'SHA2-384-192',
- 'ike-dh': '2048MODPgr'})
+ self.config_params(
+ {
+ "ike-crypto": ("AES-CBC", 16),
+ "ike-integ": "SHA2-256-128",
+ "esp-crypto": ("AES-CBC", 24),
+ "esp-integ": "SHA2-384-192",
+ "ike-dh": "2048MODPgr",
+ "nonce": os.urandom(256),
+ "no_idr_in_auth": True,
+ }
+ )
@tag_fixme_vpp_workers
-class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
- (TemplateResponder, Ikev2Params):
-
+class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16(
+ TemplateResponder, Ikev2Params
+):
"""
IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16
"""
+
def config_tc(self):
- self.config_params({
- 'ike-crypto': ('AES-CBC', 32),
- 'ike-integ': 'SHA2-256-128',
- 'esp-crypto': ('AES-GCM-16ICV', 32),
- 'esp-integ': 'NULL',
- 'ike-dh': '3072MODPgr'})
+ self.config_params(
+ {
+ "ike-crypto": ("AES-CBC", 32),
+ "ike-integ": "SHA2-256-128",
+ "esp-crypto": ("AES-GCM-16ICV", 32),
+ "esp-integ": "NULL",
+ "ike-dh": "3072MODPgr",
+ }
+ )
@tag_fixme_vpp_workers
@@ -1980,20 +2507,21 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
IKE:AES_GCM_16_256
"""
- IKE_NODE_SUFFIX = 'ip6'
+ IKE_NODE_SUFFIX = "ip6"
def config_tc(self):
- self.config_params({
- 'del_sa_from_responder': True,
- 'ip6': True,
- 'natt': True,
- 'ike-crypto': ('AES-GCM-16ICV', 32),
- 'ike-integ': 'NULL',
- 'ike-dh': '2048MODPgr',
- 'loc_ts': {'start_addr': 'ab:cd::0',
- 'end_addr': 'ab:cd::10'},
- 'rem_ts': {'start_addr': '11::0',
- 'end_addr': '11::100'}})
+ self.config_params(
+ {
+ "del_sa_from_responder": True,
+ "ip6": True,
+ "natt": True,
+ "ike-crypto": ("AES-GCM-16ICV", 32),
+ "ike-integ": "NULL",
+ "ike-dh": "2048MODPgr",
+ "loc_ts": {"start_addr": "ab:cd::0", "end_addr": "ab:cd::10"},
+ "rem_ts": {"start_addr": "11::0", "end_addr": "11::100"},
+ }
+ )
@tag_fixme_vpp_workers
@@ -2011,10 +2539,12 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
ih = self.get_ike_header(capture[0])
self.assertEqual(ih.id, self.sa.msg_id)
plain = self.sa.hmac_and_decrypt(ih)
- self.assertEqual(plain, b'')
- self.assert_counter(1, 'keepalive', 'ip4')
+ self.assertEqual(plain, b"")
+ self.assert_counter(1, "keepalive", "ip4")
r = self.vapi.ikev2_sa_dump()
self.assertEqual(1, r[0].sa.stats.n_keepalives)
+ r = self.vapi.ikev2_sa_v2_dump()
+ self.assertEqual(1, r[0].sa.stats.n_keepalives)
def test_initiator(self):
super(TestInitiatorKeepaliveMsg, self).test_initiator()
@@ -2022,7 +2552,7 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
class TestMalformedMessages(TemplateResponder, Ikev2Params):
- """ malformed packet test """
+ """malformed packet test"""
def tearDown(self):
pass
@@ -2031,23 +2561,26 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params):
self.config_params()
def create_ike_init_msg(self, length=None, payload=None):
- msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8,
- flags='Initiator', exch_type='IKE_SA_INIT')
+ msg = ikev2.IKEv2(
+ length=length,
+ init_SPI="\x11" * 8,
+ flags="Initiator",
+ exch_type="IKE_SA_INIT",
+ )
if payload is not None:
msg /= payload
- return self.create_packet(self.pg0, msg, self.sa.sport,
- self.sa.dport)
+ return self.create_packet(self.pg0, msg, self.sa.sport, self.sa.dport)
def verify_bad_packet_length(self):
- ike_msg = self.create_ike_init_msg(length=0xdead)
+ ike_msg = self.create_ike_init_msg(length=0xDEAD)
self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
- self.assert_counter(self.pkt_count, 'bad_length')
+ self.assert_counter(self.pkt_count, "bad_length")
def verify_bad_sa_payload_length(self):
- p = ikev2.IKEv2_payload_SA(length=0xdead)
+ p = ikev2.IKEv2_payload_SA(length=0xDEAD)
ike_msg = self.create_ike_init_msg(payload=p)
self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count)
- self.assert_counter(self.pkt_count, 'malformed_packet')
+ self.assert_counter(self.pkt_count, "malformed_packet")
def test_responder(self):
self.pkt_count = 254
@@ -2055,5 +2588,5 @@ class TestMalformedMessages(TemplateResponder, Ikev2Params):
self.verify_bad_sa_payload_length()
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)