diff options
author | Dave Wallace <dwallacelf@gmail.com> | 2021-05-12 21:43:59 -0400 |
---|---|---|
committer | Damjan Marion <dmarion@me.com> | 2021-05-13 09:33:06 +0000 |
commit | eddd8e3588561039985b27edf059db6033bfdfab (patch) | |
tree | 44896887d6070853ea77a18cae218f5d4ef4d93a /src/plugins/ikev2/test | |
parent | fd77f8c00c8e9d528d91a9cefae1878e383582ed (diff) |
tests: move test source to vpp/test
- Generate copyright year and version
instead of using hard-coded data
Type: refactor
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Change-Id: I6058f5025323b3aa483f5df4a2c4371e27b5914e
Diffstat (limited to 'src/plugins/ikev2/test')
-rw-r--r-- | src/plugins/ikev2/test/test_ikev2.py | 2059 | ||||
-rw-r--r-- | src/plugins/ikev2/test/vpp_ikev2.py | 179 |
2 files changed, 0 insertions, 2238 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py deleted file mode 100644 index 558e8a02f87..00000000000 --- a/src/plugins/ikev2/test/test_ikev2.py +++ /dev/null @@ -1,2059 +0,0 @@ -import os -import time -from socket import inet_pton -from cryptography import x509 -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import hashes, hmac -from cryptography.hazmat.primitives.asymmetric import dh, padding -from cryptography.hazmat.primitives.serialization import load_pem_private_key -from cryptography.hazmat.primitives.ciphers import ( - Cipher, - algorithms, - modes, -) -from ipaddress import IPv4Address, IPv6Address, ip_address -import unittest -from scapy.layers.ipsec import ESP -from scapy.layers.inet import IP, UDP, Ether -from scapy.layers.inet6 import IPv6 -from scapy.packet import raw, Raw -from scapy.utils import long_converter -from framework import tag_fixme_vpp_workers -from framework import VppTestCase, VppTestRunner -from vpp_ikev2 import Profile, IDType, AuthMethod -from vpp_papi import VppEnum - -try: - text_type = unicode -except NameError: - text_type = str - -KEY_PAD = b"Key Pad for IKEv2" -SALT_SIZE = 4 -GCM_ICV_SIZE = 16 -GCM_IV_SIZE = 8 - - -# defined in rfc3526 -# tuple structure is (p, g, key_len) -DH = { - '2048MODPgr': (long_converter(""" - FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1 - 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD - EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245 - E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED - EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D - C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F - 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D - 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B - E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9 - DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510 - 15728E5A 8AACAA68 FFFFFFFF FFFFFFFF"""), 2, 256), - - '3072MODPgr': (long_converter(""" - FFFFFFFF FFFFFFFF C90FDAA2 2168C234 C4C6628B 80DC1CD1 - 29024E08 8A67CC74 020BBEA6 3B139B22 514A0879 8E3404DD - EF9519B3 CD3A431B 302B0A6D F25F1437 4FE1356D 6D51C245 - E485B576 625E7EC6 F44C42E9 A637ED6B 0BFF5CB6 F406B7ED - EE386BFB 5A899FA5 AE9F2411 7C4B1FE6 49286651 ECE45B3D - C2007CB8 A163BF05 98DA4836 1C55D39A 69163FA8 FD24CF5F - 83655D23 DCA3AD96 1C62F356 208552BB 9ED52907 7096966D - 670C354E 4ABC9804 F1746C08 CA18217C 32905E46 2E36CE3B - E39E772C 180E8603 9B2783A2 EC07A28F B5C55DF0 6F4C52C9 - DE2BCBF6 95581718 3995497C EA956AE5 15D22618 98FA0510 - 15728E5A 8AAAC42D AD33170D 04507A33 A85521AB DF1CBA64 - ECFB8504 58DBEF0A 8AEA7157 5D060C7D B3970F85 A6E1E4C7 - ABF5AE8C DB0933D7 1E8C94E0 4A25619D CEE3D226 1AD2EE6B - F12FFA06 D98A0864 D8760273 3EC86A64 521F2B18 177B200C - BBE11757 7A615D6C 770988C0 BAD946E2 08E24FA0 74E5AB31 - 43DB5BFC E0FD108E 4B82D120 A93AD2CA FFFFFFFF FFFFFFFF"""), 2, 384) -} - - -class CryptoAlgo(object): - def __init__(self, name, cipher, mode): - self.name = name - self.cipher = cipher - self.mode = mode - if self.cipher is not None: - self.bs = self.cipher.block_size // 8 - - if self.name == 'AES-GCM-16ICV': - self.iv_len = GCM_IV_SIZE - else: - self.iv_len = self.bs - - def encrypt(self, data, key, aad=None): - iv = os.urandom(self.iv_len) - if aad is None: - encryptor = Cipher(self.cipher(key), self.mode(iv), - default_backend()).encryptor() - return iv + encryptor.update(data) + encryptor.finalize() - else: - salt = key[-SALT_SIZE:] - nonce = salt + iv - encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce), - default_backend()).encryptor() - encryptor.authenticate_additional_data(aad) - data = encryptor.update(data) + encryptor.finalize() - data += encryptor.tag[:GCM_ICV_SIZE] - return iv + data - - def decrypt(self, data, key, aad=None, icv=None): - if aad is None: - iv = data[:self.iv_len] - ct = data[self.iv_len:] - decryptor = Cipher(algorithms.AES(key), - self.mode(iv), - default_backend()).decryptor() - return decryptor.update(ct) + decryptor.finalize() - else: - salt = key[-SALT_SIZE:] - nonce = salt + data[:GCM_IV_SIZE] - ct = data[GCM_IV_SIZE:] - key = key[:-SALT_SIZE] - decryptor = Cipher(algorithms.AES(key), - self.mode(nonce, icv, len(icv)), - default_backend()).decryptor() - decryptor.authenticate_additional_data(aad) - return decryptor.update(ct) + decryptor.finalize() - - def pad(self, data): - pad_len = (len(data) // self.bs + 1) * self.bs - len(data) - data = data + b'\x00' * (pad_len - 1) - return data + bytes([pad_len - 1]) - - -class AuthAlgo(object): - def __init__(self, name, mac, mod, key_len, trunc_len=None): - self.name = name - self.mac = mac - self.mod = mod - self.key_len = key_len - self.trunc_len = trunc_len or key_len - - -CRYPTO_ALGOS = { - 'NULL': CryptoAlgo('NULL', cipher=None, mode=None), - 'AES-CBC': CryptoAlgo('AES-CBC', cipher=algorithms.AES, mode=modes.CBC), - 'AES-GCM-16ICV': CryptoAlgo('AES-GCM-16ICV', cipher=algorithms.AES, - mode=modes.GCM), -} - -AUTH_ALGOS = { - 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0), - 'HMAC-SHA1-96': AuthAlgo('HMAC-SHA1-96', hmac.HMAC, hashes.SHA1, 20, 12), - 'SHA2-256-128': AuthAlgo('SHA2-256-128', hmac.HMAC, hashes.SHA256, 32, 16), - 'SHA2-384-192': AuthAlgo('SHA2-384-192', hmac.HMAC, hashes.SHA256, 48, 24), - 'SHA2-512-256': AuthAlgo('SHA2-512-256', hmac.HMAC, hashes.SHA256, 64, 32), -} - -PRF_ALGOS = { - 'NULL': AuthAlgo('NULL', mac=None, mod=None, key_len=0, trunc_len=0), - 'PRF_HMAC_SHA2_256': AuthAlgo('PRF_HMAC_SHA2_256', hmac.HMAC, - hashes.SHA256, 32), -} - -CRYPTO_IDS = { - 12: 'AES-CBC', - 20: 'AES-GCM-16ICV', -} - -INTEG_IDS = { - 2: 'HMAC-SHA1-96', - 12: 'SHA2-256-128', - 13: 'SHA2-384-192', - 14: 'SHA2-512-256', -} - - -class IKEv2ChildSA(object): - def __init__(self, local_ts, remote_ts, is_initiator): - spi = os.urandom(4) - if is_initiator: - self.ispi = spi - self.rspi = None - else: - self.rspi = spi - self.ispi = None - self.local_ts = local_ts - self.remote_ts = remote_ts - - -class IKEv2SA(object): - def __init__(self, test, is_initiator=True, i_id=None, r_id=None, - spi=b'\x01\x02\x03\x04\x05\x06\x07\x08', id_type='fqdn', - nonce=None, auth_data=None, local_ts=None, remote_ts=None, - auth_method='shared-key', priv_key=None, i_natt=False, - r_natt=False, udp_encap=False): - self.udp_encap = udp_encap - self.i_natt = i_natt - self.r_natt = r_natt - if i_natt or r_natt: - self.sport = 4500 - self.dport = 4500 - else: - self.sport = 500 - self.dport = 500 - self.msg_id = 0 - self.dh_params = None - self.test = test - self.priv_key = priv_key - self.is_initiator = is_initiator - nonce = nonce or os.urandom(32) - self.auth_data = auth_data - self.i_id = i_id - self.r_id = r_id - if isinstance(id_type, str): - self.id_type = IDType.value(id_type) - else: - self.id_type = id_type - self.auth_method = auth_method - if self.is_initiator: - self.rspi = 8 * b'\x00' - self.ispi = spi - self.i_nonce = nonce - else: - self.rspi = spi - self.ispi = 8 * b'\x00' - self.r_nonce = nonce - self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, - self.is_initiator)] - - def new_msg_id(self): - self.msg_id += 1 - return self.msg_id - - @property - def my_dh_pub_key(self): - if self.is_initiator: - return self.i_dh_data - return self.r_dh_data - - @property - def peer_dh_pub_key(self): - if self.is_initiator: - return self.r_dh_data - return self.i_dh_data - - @property - def natt(self): - return self.i_natt or self.r_natt - - def compute_secret(self): - priv = self.dh_private_key - peer = self.peer_dh_pub_key - p, g, l = self.ike_group - return pow(int.from_bytes(peer, 'big'), - int.from_bytes(priv, 'big'), p).to_bytes(l, 'big') - - def generate_dh_data(self): - # generate DH keys - if self.ike_dh not in DH: - raise NotImplementedError('%s not in DH group' % self.ike_dh) - - if self.dh_params is None: - dhg = DH[self.ike_dh] - pn = dh.DHParameterNumbers(dhg[0], dhg[1]) - self.dh_params = pn.parameters(default_backend()) - - priv = self.dh_params.generate_private_key() - pub = priv.public_key() - x = priv.private_numbers().x - self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big') - y = pub.public_numbers().y - - if self.is_initiator: - self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big') - else: - self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big') - - def complete_dh_data(self): - self.dh_shared_secret = self.compute_secret() - - def calc_child_keys(self): - prf = self.ike_prf_alg.mod() - s = self.i_nonce + self.r_nonce - c = self.child_sas[0] - - encr_key_len = self.esp_crypto_key_len - integ_key_len = self.esp_integ_alg.key_len - salt_len = 0 if integ_key_len else 4 - - l = (integ_key_len * 2 + - encr_key_len * 2 + - salt_len * 2) - keymat = self.calc_prfplus(prf, self.sk_d, s, l) - - pos = 0 - c.sk_ei = keymat[pos:pos+encr_key_len] - pos += encr_key_len - - if integ_key_len: - c.sk_ai = keymat[pos:pos+integ_key_len] - pos += integ_key_len - else: - c.salt_ei = keymat[pos:pos+salt_len] - pos += salt_len - - c.sk_er = keymat[pos:pos+encr_key_len] - pos += encr_key_len - - if integ_key_len: - c.sk_ar = keymat[pos:pos+integ_key_len] - pos += integ_key_len - else: - c.salt_er = keymat[pos:pos+salt_len] - pos += salt_len - - def calc_prfplus(self, prf, key, seed, length): - r = b'' - t = None - x = 1 - while len(r) < length and x < 255: - if t is not None: - s = t - else: - s = b'' - s = s + seed + bytes([x]) - t = self.calc_prf(prf, key, s) - r = r + t - x = x + 1 - - if x == 255: - return None - return r - - def calc_prf(self, prf, key, data): - h = self.ike_prf_alg.mac(key, prf, backend=default_backend()) - h.update(data) - return h.finalize() - - def calc_keys(self): - prf = self.ike_prf_alg.mod() - # SKEYSEED = prf(Ni | Nr, g^ir) - s = self.i_nonce + self.r_nonce - self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret) - - # calculate S = Ni | Nr | SPIi SPIr - s = s + self.ispi + self.rspi - - prf_key_trunc = self.ike_prf_alg.trunc_len - encr_key_len = self.ike_crypto_key_len - tr_prf_key_len = self.ike_prf_alg.key_len - integ_key_len = self.ike_integ_alg.key_len - if integ_key_len == 0: - salt_size = 4 - else: - salt_size = 0 - - l = (prf_key_trunc + - integ_key_len * 2 + - encr_key_len * 2 + - tr_prf_key_len * 2 + - salt_size * 2) - keymat = self.calc_prfplus(prf, self.skeyseed, s, l) - - pos = 0 - self.sk_d = keymat[:pos+prf_key_trunc] - pos += prf_key_trunc - - self.sk_ai = keymat[pos:pos+integ_key_len] - pos += integ_key_len - self.sk_ar = keymat[pos:pos+integ_key_len] - pos += integ_key_len - - self.sk_ei = keymat[pos:pos+encr_key_len + salt_size] - pos += encr_key_len + salt_size - self.sk_er = keymat[pos:pos+encr_key_len + salt_size] - pos += encr_key_len + salt_size - - self.sk_pi = keymat[pos:pos+tr_prf_key_len] - pos += tr_prf_key_len - self.sk_pr = keymat[pos:pos+tr_prf_key_len] - - def generate_authmsg(self, prf, packet): - if self.is_initiator: - id = self.i_id - nonce = self.r_nonce - key = self.sk_pi - else: - id = self.r_id - nonce = self.i_nonce - key = self.sk_pr - data = bytes([self.id_type, 0, 0, 0]) + id - id_hash = self.calc_prf(prf, key, data) - return packet + nonce + id_hash - - def auth_init(self): - prf = self.ike_prf_alg.mod() - if self.is_initiator: - packet = self.init_req_packet - else: - packet = self.init_resp_packet - authmsg = self.generate_authmsg(prf, raw(packet)) - if self.auth_method == 'shared-key': - psk = self.calc_prf(prf, self.auth_data, KEY_PAD) - self.auth_data = self.calc_prf(prf, psk, authmsg) - elif self.auth_method == 'rsa-sig': - self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(), - hashes.SHA1()) - else: - raise TypeError('unknown auth method type!') - - def encrypt(self, data, aad=None): - data = self.ike_crypto_alg.pad(data) - return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad) - - @property - def peer_authkey(self): - if self.is_initiator: - return self.sk_ar - return self.sk_ai - - @property - def my_authkey(self): - if self.is_initiator: - return self.sk_ai - return self.sk_ar - - @property - def my_cryptokey(self): - if self.is_initiator: - return self.sk_ei - return self.sk_er - - @property - def peer_cryptokey(self): - if self.is_initiator: - return self.sk_er - return self.sk_ei - - def concat(self, alg, key_len): - return alg + '-' + str(key_len * 8) - - @property - def vpp_ike_cypto_alg(self): - return self.concat(self.ike_crypto, self.ike_crypto_key_len) - - @property - def vpp_esp_cypto_alg(self): - return self.concat(self.esp_crypto, self.esp_crypto_key_len) - - def verify_hmac(self, ikemsg): - integ_trunc = self.ike_integ_alg.trunc_len - exp_hmac = ikemsg[-integ_trunc:] - data = ikemsg[:-integ_trunc] - computed_hmac = self.compute_hmac(self.ike_integ_alg.mod(), - self.peer_authkey, data) - self.test.assertEqual(computed_hmac[:integ_trunc], exp_hmac) - - def compute_hmac(self, integ, key, data): - h = self.ike_integ_alg.mac(key, integ, backend=default_backend()) - h.update(data) - return h.finalize() - - def decrypt(self, data, aad=None, icv=None): - return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv) - - def hmac_and_decrypt(self, ike): - ep = ike[ikev2.IKEv2_payload_Encrypted] - if self.ike_crypto == 'AES-GCM-16ICV': - aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2()) - ct = ep.load[:-GCM_ICV_SIZE] - tag = ep.load[-GCM_ICV_SIZE:] - plain = self.decrypt(ct, raw(ike)[:aad_len], tag) - else: - self.verify_hmac(raw(ike)) - integ_trunc = self.ike_integ_alg.trunc_len - - # remove ICV and decrypt payload - ct = ep.load[:-integ_trunc] - plain = self.decrypt(ct) - # remove padding - pad_len = plain[-1] - return plain[:-pad_len - 1] - - def build_ts_addr(self, ts, version): - return {'starting_address_v' + version: ts['start_addr'], - 'ending_address_v' + version: ts['end_addr']} - - def generate_ts(self, is_ip4): - c = self.child_sas[0] - ts_data = {'IP_protocol_ID': 0, - 'start_port': 0, - 'end_port': 0xffff} - if is_ip4: - ts_data.update(self.build_ts_addr(c.local_ts, '4')) - ts1 = ikev2.IPv4TrafficSelector(**ts_data) - ts_data.update(self.build_ts_addr(c.remote_ts, '4')) - ts2 = ikev2.IPv4TrafficSelector(**ts_data) - else: - ts_data.update(self.build_ts_addr(c.local_ts, '6')) - ts1 = ikev2.IPv6TrafficSelector(**ts_data) - ts_data.update(self.build_ts_addr(c.remote_ts, '6')) - ts2 = ikev2.IPv6TrafficSelector(**ts_data) - - if self.is_initiator: - return ([ts1], [ts2]) - return ([ts2], [ts1]) - - def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh): - if crypto not in CRYPTO_ALGOS: - raise TypeError('unsupported encryption algo %r' % crypto) - self.ike_crypto = crypto - self.ike_crypto_alg = CRYPTO_ALGOS[crypto] - self.ike_crypto_key_len = crypto_key_len - - if integ not in AUTH_ALGOS: - raise TypeError('unsupported auth algo %r' % integ) - self.ike_integ = None if integ == 'NULL' else integ - self.ike_integ_alg = AUTH_ALGOS[integ] - - if prf not in PRF_ALGOS: - raise TypeError('unsupported prf algo %r' % prf) - self.ike_prf = prf - self.ike_prf_alg = PRF_ALGOS[prf] - self.ike_dh = dh - self.ike_group = DH[self.ike_dh] - - def set_esp_props(self, crypto, crypto_key_len, integ): - self.esp_crypto_key_len = crypto_key_len - if crypto not in CRYPTO_ALGOS: - raise TypeError('unsupported encryption algo %r' % crypto) - self.esp_crypto = crypto - self.esp_crypto_alg = CRYPTO_ALGOS[crypto] - - if integ not in AUTH_ALGOS: - raise TypeError('unsupported auth algo %r' % integ) - self.esp_integ = None if integ == 'NULL' else integ - self.esp_integ_alg = AUTH_ALGOS[integ] - - def crypto_attr(self, key_len): - if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']: - return (0x800e << 16 | key_len << 3, 12) - else: - raise Exception('unsupported attribute type') - - def ike_crypto_attr(self): - return self.crypto_attr(self.ike_crypto_key_len) - - def esp_crypto_attr(self): - return self.crypto_attr(self.esp_crypto_key_len) - - def compute_nat_sha1(self, ip, port, rspi=None): - if rspi is None: - rspi = self.rspi - data = self.ispi + rspi + ip + (port).to_bytes(2, 'big') - digest = hashes.Hash(hashes.SHA1(), backend=default_backend()) - digest.update(data) - return digest.finalize() - - -class IkePeer(VppTestCase): - """ common class for initiator and responder """ - - @classmethod - def setUpClass(cls): - import scapy.contrib.ikev2 as _ikev2 - globals()['ikev2'] = _ikev2 - super(IkePeer, cls).setUpClass() - cls.create_pg_interfaces(range(2)) - for i in cls.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - - @classmethod - def tearDownClass(cls): - super(IkePeer, cls).tearDownClass() - - def tearDown(self): - super(IkePeer, self).tearDown() - if self.del_sa_from_responder: - self.initiate_del_sa_from_responder() - else: - self.initiate_del_sa_from_initiator() - r = self.vapi.ikev2_sa_dump() - self.assertEqual(len(r), 0) - sas = self.vapi.ipsec_sa_dump() - self.assertEqual(len(sas), 0) - self.p.remove_vpp_config() - self.assertIsNone(self.p.query_vpp_config()) - - def setUp(self): - super(IkePeer, self).setUp() - self.config_tc() - self.p.add_vpp_config() - self.assertIsNotNone(self.p.query_vpp_config()) - if self.sa.is_initiator: - self.sa.generate_dh_data() - self.vapi.cli('ikev2 set logging level 4') - self.vapi.cli('event-lo clear') - - def assert_counter(self, count, name, version='ip4'): - node_name = '/err/ikev2-%s/' % version + name - self.assertEqual(count, self.statistics.get_err_counter(node_name)) - - def create_rekey_request(self): - sa, first_payload = self.generate_auth_payload(is_rekey=True) - header = ikev2.IKEv2( - init_SPI=self.sa.ispi, - resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), - flags='Initiator', exch_type='CREATE_CHILD_SA') - - ike_msg = self.encrypt_ike_msg(header, sa, first_payload) - return self.create_packet(self.pg0, ike_msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) - - def create_empty_request(self): - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - id=self.sa.new_msg_id(), flags='Initiator', - exch_type='INFORMATIONAL', - next_payload='Encrypted') - - msg = self.encrypt_ike_msg(header, b'', None) - return self.create_packet(self.pg0, msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) - - def create_packet(self, src_if, msg, sport=500, dport=500, natt=False, - use_ip6=False): - if use_ip6: - src_ip = src_if.remote_ip6 - dst_ip = src_if.local_ip6 - ip_layer = IPv6 - else: - src_ip = src_if.remote_ip4 - dst_ip = src_if.local_ip4 - ip_layer = IP - res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - ip_layer(src=src_ip, dst=dst_ip) / - UDP(sport=sport, dport=dport)) - if natt: - # insert non ESP marker - res = res / Raw(b'\x00' * 4) - return res / msg - - def verify_udp(self, udp): - self.assertEqual(udp.sport, self.sa.sport) - self.assertEqual(udp.dport, self.sa.dport) - - def get_ike_header(self, packet): - try: - ih = packet[ikev2.IKEv2] - ih = self.verify_and_remove_non_esp_marker(ih) - except IndexError as e: - # this is a workaround for getting IKEv2 layer as both ikev2 and - # ipsec register for port 4500 - esp = packet[ESP] - ih = self.verify_and_remove_non_esp_marker(esp) - self.assertEqual(ih.version, 0x20) - self.assertNotIn('Version', ih.flags) - return ih - - def verify_and_remove_non_esp_marker(self, packet): - if self.sa.natt: - # if we are in nat traversal mode check for non esp marker - # and remove it - data = raw(packet) - self.assertEqual(data[:4], b'\x00' * 4) - return ikev2.IKEv2(data[4:]) - else: - return packet - - def encrypt_ike_msg(self, header, plain, first_payload): - if self.sa.ike_crypto == 'AES-GCM-16ICV': - data = self.sa.ike_crypto_alg.pad(raw(plain)) - plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\ - len(ikev2.IKEv2_payload_Encrypted()) - tlen = plen + len(ikev2.IKEv2()) - - # prepare aad data - sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload, - length=plen) - header.length = tlen - res = header / sk_p - encr = self.sa.encrypt(raw(plain), raw(res)) - sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload, - length=plen, load=encr) - res = header / sk_p - else: - encr = self.sa.encrypt(raw(plain)) - trunc_len = self.sa.ike_integ_alg.trunc_len - plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len - tlen = plen + len(ikev2.IKEv2()) - - sk_p = ikev2.IKEv2_payload_Encrypted(next_payload=first_payload, - length=plen, load=encr) - header.length = tlen - res = header / sk_p - - integ_data = raw(res) - hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(), - self.sa.my_authkey, integ_data) - res = res / Raw(hmac_data[:trunc_len]) - assert(len(res) == tlen) - return res - - def verify_udp_encap(self, ipsec_sa): - e = VppEnum.vl_api_ipsec_sad_flags_t - if self.sa.udp_encap or self.sa.natt: - self.assertIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags) - else: - self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags) - - def verify_ipsec_sas(self, is_rekey=False): - sas = self.vapi.ipsec_sa_dump() - if is_rekey: - # after rekey there is a short period of time in which old - # inbound SA is still present - sa_count = 3 - else: - sa_count = 2 - self.assertEqual(len(sas), sa_count) - if self.sa.is_initiator: - if is_rekey: - sa0 = sas[0].entry - sa1 = sas[2].entry - else: - sa0 = sas[0].entry - sa1 = sas[1].entry - else: - if is_rekey: - sa0 = sas[2].entry - sa1 = sas[0].entry - else: - sa1 = sas[0].entry - sa0 = sas[1].entry - - c = self.sa.child_sas[0] - - self.verify_udp_encap(sa0) - self.verify_udp_encap(sa1) - vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg] - self.assertEqual(sa0.crypto_algorithm, vpp_crypto_alg) - self.assertEqual(sa1.crypto_algorithm, vpp_crypto_alg) - - if self.sa.esp_integ is None: - vpp_integ_alg = 0 - else: - vpp_integ_alg = self.vpp_enums[self.sa.esp_integ] - self.assertEqual(sa0.integrity_algorithm, vpp_integ_alg) - self.assertEqual(sa1.integrity_algorithm, vpp_integ_alg) - - # verify crypto keys - self.assertEqual(sa0.crypto_key.length, len(c.sk_er)) - self.assertEqual(sa1.crypto_key.length, len(c.sk_ei)) - self.assertEqual(sa0.crypto_key.data[:len(c.sk_er)], c.sk_er) - self.assertEqual(sa1.crypto_key.data[:len(c.sk_ei)], c.sk_ei) - - # verify integ keys - if vpp_integ_alg: - self.assertEqual(sa0.integrity_key.length, len(c.sk_ar)) - self.assertEqual(sa1.integrity_key.length, len(c.sk_ai)) - self.assertEqual(sa0.integrity_key.data[:len(c.sk_ar)], c.sk_ar) - self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai) - else: - self.assertEqual(sa0.salt.to_bytes(4, 'little'), c.salt_er) - self.assertEqual(sa1.salt.to_bytes(4, 'little'), c.salt_ei) - - def verify_keymat(self, api_keys, keys, name): - km = getattr(keys, name) - api_km = getattr(api_keys, name) - api_km_len = getattr(api_keys, name + '_len') - self.assertEqual(len(km), api_km_len) - self.assertEqual(km, api_km[:api_km_len]) - - def verify_id(self, api_id, exp_id): - self.assertEqual(api_id.type, IDType.value(exp_id.type)) - self.assertEqual(api_id.data_len, exp_id.data_len) - self.assertEqual(bytes(api_id.data, 'ascii'), exp_id.type) - - def verify_ike_sas(self): - r = self.vapi.ikev2_sa_dump() - self.assertEqual(len(r), 1) - sa = r[0].sa - self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big')) - self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big')) - if self.ip6: - if self.sa.is_initiator: - self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6)) - self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6)) - else: - self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6)) - self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6)) - else: - if self.sa.is_initiator: - self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4)) - self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4)) - else: - self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4)) - self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4)) - self.verify_keymat(sa.keys, self.sa, 'sk_d') - self.verify_keymat(sa.keys, self.sa, 'sk_ai') - self.verify_keymat(sa.keys, self.sa, 'sk_ar') - self.verify_keymat(sa.keys, self.sa, 'sk_ei') - self.verify_keymat(sa.keys, self.sa, 'sk_er') - self.verify_keymat(sa.keys, self.sa, 'sk_pi') - self.verify_keymat(sa.keys, self.sa, 'sk_pr') - - self.assertEqual(sa.i_id.type, self.sa.id_type) - self.assertEqual(sa.r_id.type, self.sa.id_type) - self.assertEqual(sa.i_id.data_len, len(self.sa.i_id)) - self.assertEqual(sa.r_id.data_len, len(self.sa.r_id)) - self.assertEqual(bytes(sa.i_id.data, 'ascii'), self.sa.i_id) - self.assertEqual(bytes(sa.r_id.data, 'ascii'), self.sa.r_id) - - r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index) - self.assertEqual(len(r), 1) - csa = r[0].child_sa - self.assertEqual(csa.sa_index, sa.sa_index) - c = self.sa.child_sas[0] - if hasattr(c, 'sk_ai'): - self.verify_keymat(csa.keys, c, 'sk_ai') - self.verify_keymat(csa.keys, c, 'sk_ar') - self.verify_keymat(csa.keys, c, 'sk_ei') - self.verify_keymat(csa.keys, c, 'sk_er') - self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi) - self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi) - - tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) - tsi = tsi[0] - tsr = tsr[0] - r = self.vapi.ikev2_traffic_selector_dump( - is_initiator=True, sa_index=sa.sa_index, - child_sa_index=csa.child_sa_index) - self.assertEqual(len(r), 1) - ts = r[0].ts - self.verify_ts(r[0].ts, tsi[0], True) - - r = self.vapi.ikev2_traffic_selector_dump( - is_initiator=False, sa_index=sa.sa_index, - child_sa_index=csa.child_sa_index) - self.assertEqual(len(r), 1) - self.verify_ts(r[0].ts, tsr[0], False) - - n = self.vapi.ikev2_nonce_get(is_initiator=True, - sa_index=sa.sa_index) - self.verify_nonce(n, self.sa.i_nonce) - n = self.vapi.ikev2_nonce_get(is_initiator=False, - sa_index=sa.sa_index) - self.verify_nonce(n, self.sa.r_nonce) - - def verify_nonce(self, api_nonce, nonce): - self.assertEqual(api_nonce.data_len, len(nonce)) - self.assertEqual(api_nonce.nonce, nonce) - - def verify_ts(self, api_ts, ts, is_initiator): - if is_initiator: - self.assertTrue(api_ts.is_local) - else: - self.assertFalse(api_ts.is_local) - - if self.p.ts_is_ip4: - self.assertEqual(api_ts.start_addr, - IPv4Address(ts.starting_address_v4)) - self.assertEqual(api_ts.end_addr, - IPv4Address(ts.ending_address_v4)) - else: - self.assertEqual(api_ts.start_addr, - IPv6Address(ts.starting_address_v6)) - self.assertEqual(api_ts.end_addr, - IPv6Address(ts.ending_address_v6)) - self.assertEqual(api_ts.start_port, ts.start_port) - self.assertEqual(api_ts.end_port, ts.end_port) - self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID) - - -class TemplateInitiator(IkePeer): - """ initiator test template """ - - def initiate_del_sa_from_initiator(self): - ispi = int.from_bytes(self.sa.ispi, 'little') - self.pg0.enable_capture() - self.pg_start() - self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi) - capture = self.pg0.get_capture(1) - ih = self.get_ike_header(capture[0]) - self.assertNotIn('Response', ih.flags) - self.assertIn('Initiator', ih.flags) - self.assertEqual(ih.init_SPI, self.sa.ispi) - self.assertEqual(ih.resp_SPI, self.sa.rspi) - plain = self.sa.hmac_and_decrypt(ih) - d = ikev2.IKEv2_payload_Delete(plain) - self.assertEqual(d.proto, 1) # proto=IKEv2 - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Response', exch_type='INFORMATIONAL', - id=ih.id, next_payload='Encrypted') - resp = self.encrypt_ike_msg(header, b'', None) - self.send_and_assert_no_replies(self.pg0, resp) - - def verify_del_sa(self, packet): - ih = self.get_ike_header(packet) - self.assertEqual(ih.id, self.sa.msg_id) - self.assertEqual(ih.exch_type, 37) # exchange informational - self.assertIn('Response', ih.flags) - self.assertIn('Initiator', ih.flags) - plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') - - def initiate_del_sa_from_responder(self): - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - exch_type='INFORMATIONAL', - id=self.sa.new_msg_id()) - del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2') - ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete') - packet = self.create_packet(self.pg0, ike_msg, - self.sa.sport, self.sa.dport, - self.sa.natt, self.ip6) - self.pg0.add_stream(packet) - self.pg0.enable_capture() - self.pg_start() - capture = self.pg0.get_capture(1) - self.verify_del_sa(capture[0]) - - @staticmethod - def find_notify_payload(packet, notify_type): - n = packet[ikev2.IKEv2_payload_Notify] - while n is not None: - if n.type == notify_type: - return n - n = n.payload - return None - - def verify_nat_detection(self, packet): - if self.ip6: - iph = packet[IPv6] - else: - iph = packet[IP] - udp = packet[UDP] - - # NAT_DETECTION_SOURCE_IP - s = self.find_notify_payload(packet, 16388) - self.assertIsNotNone(s) - src_sha = self.sa.compute_nat_sha1( - inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8) - self.assertEqual(s.load, src_sha) - - # NAT_DETECTION_DESTINATION_IP - s = self.find_notify_payload(packet, 16389) - self.assertIsNotNone(s) - dst_sha = self.sa.compute_nat_sha1( - inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8) - self.assertEqual(s.load, dst_sha) - - def verify_sa_init_request(self, packet): - udp = packet[UDP] - self.sa.dport = udp.sport - ih = packet[ikev2.IKEv2] - self.assertNotEqual(ih.init_SPI, 8 * b'\x00') - self.assertEqual(ih.exch_type, 34) # SA_INIT - self.sa.ispi = ih.init_SPI - self.assertEqual(ih.resp_SPI, 8 * b'\x00') - self.assertIn('Initiator', ih.flags) - self.assertNotIn('Response', ih.flags) - self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load - self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load - - prop = packet[ikev2.IKEv2_payload_Proposal] - self.assertEqual(prop.proto, 1) # proto = ikev2 - self.assertEqual(prop.proposal, 1) - self.assertEqual(prop.trans[0].transform_type, 1) # encryption - self.assertEqual(prop.trans[0].transform_id, - self.p.ike_transforms['crypto_alg']) - self.assertEqual(prop.trans[1].transform_type, 2) # prf - self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256" - self.assertEqual(prop.trans[2].transform_type, 4) # dh - self.assertEqual(prop.trans[2].transform_id, - self.p.ike_transforms['dh_group']) - - self.verify_nat_detection(packet) - self.sa.set_ike_props( - crypto='AES-GCM-16ICV', crypto_key_len=32, - integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr') - self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32, - integ='SHA2-256-128') - self.sa.generate_dh_data() - self.sa.complete_dh_data() - self.sa.calc_keys() - - def update_esp_transforms(self, trans, sa): - while trans: - if trans.transform_type == 1: # ecryption - sa.esp_crypto = CRYPTO_IDS[trans.transform_id] - elif trans.transform_type == 3: # integrity - sa.esp_integ = INTEG_IDS[trans.transform_id] - trans = trans.payload - - def verify_sa_auth_req(self, packet): - udp = packet[UDP] - self.sa.dport = udp.sport - ih = self.get_ike_header(packet) - self.assertEqual(ih.resp_SPI, self.sa.rspi) - self.assertEqual(ih.init_SPI, self.sa.ispi) - self.assertEqual(ih.exch_type, 35) # IKE_AUTH - self.assertIn('Initiator', ih.flags) - self.assertNotIn('Response', ih.flags) - - udp = packet[UDP] - self.verify_udp(udp) - self.assertEqual(ih.id, self.sa.msg_id + 1) - self.sa.msg_id += 1 - plain = self.sa.hmac_and_decrypt(ih) - idi = ikev2.IKEv2_payload_IDi(plain) - idr = ikev2.IKEv2_payload_IDr(idi.payload) - self.assertEqual(idi.load, self.sa.i_id) - self.assertEqual(idr.load, self.sa.r_id) - prop = idi[ikev2.IKEv2_payload_Proposal] - c = self.sa.child_sas[0] - c.ispi = prop.SPI - self.update_esp_transforms( - prop[ikev2.IKEv2_payload_Transform], self.sa) - - def send_init_response(self): - tr_attr = self.sa.ike_crypto_attr() - trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', - transform_id=self.sa.ike_crypto, length=tr_attr[1], - key_length=tr_attr[0]) / - ikev2.IKEv2_payload_Transform(transform_type='Integrity', - transform_id=self.sa.ike_integ) / - ikev2.IKEv2_payload_Transform(transform_type='PRF', - transform_id=self.sa.ike_prf_alg.name) / - ikev2.IKEv2_payload_Transform(transform_type='GroupDesc', - transform_id=self.sa.ike_dh)) - props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2', - trans_nb=4, trans=trans)) - - src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) - if self.sa.natt: - dst_address = b'\x0a\x0a\x0a\x0a' - else: - dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4) - src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) - dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport) - - self.sa.init_resp_packet = ( - ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - exch_type='IKE_SA_INIT', flags='Response') / - ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) / - ikev2.IKEv2_payload_KE(next_payload='Nonce', - group=self.sa.ike_dh, - load=self.sa.my_dh_pub_key) / - ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce, - next_payload='Notify') / - ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_SOURCE_IP', load=src_nat, - next_payload='Notify') / ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)) - - ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet, - self.sa.sport, self.sa.dport, - False, self.ip6) - self.pg_send(self.pg0, ike_msg) - capture = self.pg0.get_capture(1) - self.verify_sa_auth_req(capture[0]) - - def initiate_sa_init(self): - self.pg0.enable_capture() - self.pg_start() - self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name) - - capture = self.pg0.get_capture(1) - self.verify_sa_init_request(capture[0]) - self.send_init_response() - - def send_auth_response(self): - tr_attr = self.sa.esp_crypto_attr() - trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', - transform_id=self.sa.esp_crypto, length=tr_attr[1], - key_length=tr_attr[0]) / - ikev2.IKEv2_payload_Transform(transform_type='Integrity', - transform_id=self.sa.esp_integ) / - ikev2.IKEv2_payload_Transform( - transform_type='Extended Sequence Number', - transform_id='No ESN') / - ikev2.IKEv2_payload_Transform( - transform_type='Extended Sequence Number', - transform_id='ESN')) - - c = self.sa.child_sas[0] - props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP', - SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans)) - - tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) - plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr', - IDtype=self.sa.id_type, load=self.sa.i_id) / - ikev2.IKEv2_payload_IDr(next_payload='AUTH', - IDtype=self.sa.id_type, load=self.sa.r_id) / - ikev2.IKEv2_payload_AUTH(next_payload='SA', - auth_type=AuthMethod.value(self.sa.auth_method), - load=self.sa.auth_data) / - ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) / - ikev2.IKEv2_payload_TSi(next_payload='TSr', - number_of_TSs=len(tsi), - traffic_selector=tsi) / - ikev2.IKEv2_payload_TSr(next_payload='Notify', - number_of_TSs=len(tsr), - traffic_selector=tsr) / - ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')) - - header = ikev2.IKEv2( - init_SPI=self.sa.ispi, - resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), - flags='Response', exch_type='IKE_AUTH') - - ike_msg = self.encrypt_ike_msg(header, plain, 'IDi') - packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) - self.pg_send(self.pg0, packet) - - def test_initiator(self): - self.initiate_sa_init() - self.sa.auth_init() - self.sa.calc_child_keys() - self.send_auth_response() - self.verify_ike_sas() - - -class TemplateResponder(IkePeer): - """ responder test template """ - - def initiate_del_sa_from_responder(self): - self.pg0.enable_capture() - self.pg_start() - self.vapi.ikev2_initiate_del_ike_sa( - ispi=int.from_bytes(self.sa.ispi, 'little')) - capture = self.pg0.get_capture(1) - ih = self.get_ike_header(capture[0]) - self.assertNotIn('Response', ih.flags) - self.assertNotIn('Initiator', ih.flags) - self.assertEqual(ih.exch_type, 37) # INFORMATIONAL - plain = self.sa.hmac_and_decrypt(ih) - d = ikev2.IKEv2_payload_Delete(plain) - self.assertEqual(d.proto, 1) # proto=IKEv2 - self.assertEqual(ih.init_SPI, self.sa.ispi) - self.assertEqual(ih.resp_SPI, self.sa.rspi) - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Initiator+Response', - exch_type='INFORMATIONAL', - id=ih.id, next_payload='Encrypted') - resp = self.encrypt_ike_msg(header, b'', None) - self.send_and_assert_no_replies(self.pg0, resp) - - def verify_del_sa(self, packet): - ih = self.get_ike_header(packet) - self.assertEqual(ih.id, self.sa.msg_id) - self.assertEqual(ih.exch_type, 37) # exchange informational - self.assertIn('Response', ih.flags) - self.assertNotIn('Initiator', ih.flags) - self.assertEqual(ih.next_payload, 46) # Encrypted - self.assertEqual(ih.init_SPI, self.sa.ispi) - self.assertEqual(ih.resp_SPI, self.sa.rspi) - plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') - - def initiate_del_sa_from_initiator(self): - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Initiator', exch_type='INFORMATIONAL', - id=self.sa.new_msg_id()) - del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2') - ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete') - packet = self.create_packet(self.pg0, ike_msg, - self.sa.sport, self.sa.dport, - self.sa.natt, self.ip6) - self.pg0.add_stream(packet) - self.pg0.enable_capture() - self.pg_start() - capture = self.pg0.get_capture(1) - self.verify_del_sa(capture[0]) - - def send_sa_init_req(self): - tr_attr = self.sa.ike_crypto_attr() - trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', - transform_id=self.sa.ike_crypto, length=tr_attr[1], - key_length=tr_attr[0]) / - ikev2.IKEv2_payload_Transform(transform_type='Integrity', - transform_id=self.sa.ike_integ) / - ikev2.IKEv2_payload_Transform(transform_type='PRF', - transform_id=self.sa.ike_prf_alg.name) / - ikev2.IKEv2_payload_Transform(transform_type='GroupDesc', - transform_id=self.sa.ike_dh)) - - props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2', - trans_nb=4, trans=trans)) - - next_payload = None if self.ip6 else 'Notify' - - self.sa.init_req_packet = ( - ikev2.IKEv2(init_SPI=self.sa.ispi, - flags='Initiator', exch_type='IKE_SA_INIT') / - ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) / - ikev2.IKEv2_payload_KE(next_payload='Nonce', - group=self.sa.ike_dh, - load=self.sa.my_dh_pub_key) / - ikev2.IKEv2_payload_Nonce(next_payload=next_payload, - load=self.sa.i_nonce)) - - if not self.ip6: - if self.sa.i_natt: - src_address = b'\x0a\x0a\x0a\x01' - else: - src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4) - - if self.sa.r_natt: - dst_address = b'\x0a\x0a\x0a\x0a' - else: - dst_address = inet_pton(socket.AF_INET, self.pg0.local_ip4) - - src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport) - dst_nat = self.sa.compute_nat_sha1(dst_address, self.sa.dport) - nat_src_detection = ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_SOURCE_IP', load=src_nat, - next_payload='Notify') - nat_dst_detection = ikev2.IKEv2_payload_Notify( - type='NAT_DETECTION_DESTINATION_IP', load=dst_nat) - self.sa.init_req_packet = (self.sa.init_req_packet / - nat_src_detection / - nat_dst_detection) - - ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet, - self.sa.sport, self.sa.dport, - self.sa.natt, self.ip6) - self.pg0.add_stream(ike_msg) - self.pg0.enable_capture() - self.pg_start() - capture = self.pg0.get_capture(1) - self.verify_sa_init(capture[0]) - - def generate_auth_payload(self, last_payload=None, is_rekey=False): - tr_attr = self.sa.esp_crypto_attr() - last_payload = last_payload or 'Notify' - trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', - transform_id=self.sa.esp_crypto, length=tr_attr[1], - key_length=tr_attr[0]) / - ikev2.IKEv2_payload_Transform(transform_type='Integrity', - transform_id=self.sa.esp_integ) / - ikev2.IKEv2_payload_Transform( - transform_type='Extended Sequence Number', - transform_id='No ESN') / - ikev2.IKEv2_payload_Transform( - transform_type='Extended Sequence Number', - transform_id='ESN')) - - c = self.sa.child_sas[0] - props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP', - SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans)) - - tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) - plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA', - auth_type=AuthMethod.value(self.sa.auth_method), - load=self.sa.auth_data) / - ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) / - ikev2.IKEv2_payload_TSi(next_payload='TSr', - number_of_TSs=len(tsi), traffic_selector=tsi) / - ikev2.IKEv2_payload_TSr(next_payload=last_payload, - number_of_TSs=len(tsr), traffic_selector=tsr)) - - if is_rekey: - first_payload = 'Nonce' - plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, - next_payload='SA') / plain / - ikev2.IKEv2_payload_Notify(type='REKEY_SA', - proto='ESP', SPI=c.ispi)) - else: - first_payload = 'IDi' - ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr', - IDtype=self.sa.id_type, load=self.sa.i_id) / - ikev2.IKEv2_payload_IDr(next_payload='AUTH', - IDtype=self.sa.id_type, load=self.sa.r_id)) - plain = ids / plain - return plain, first_payload - - def send_sa_auth(self): - plain, first_payload = self.generate_auth_payload( - last_payload='Notify') - plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT') - header = ikev2.IKEv2( - init_SPI=self.sa.ispi, - resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), - flags='Initiator', exch_type='IKE_AUTH') - - ike_msg = self.encrypt_ike_msg(header, plain, first_payload) - packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) - self.pg0.add_stream(packet) - self.pg0.enable_capture() - self.pg_start() - capture = self.pg0.get_capture(1) - self.verify_sa_auth_resp(capture[0]) - - def verify_sa_init(self, packet): - ih = self.get_ike_header(packet) - - self.assertEqual(ih.id, self.sa.msg_id) - self.assertEqual(ih.exch_type, 34) - self.assertIn('Response', ih.flags) - self.assertEqual(ih.init_SPI, self.sa.ispi) - self.assertNotEqual(ih.resp_SPI, 0) - self.sa.rspi = ih.resp_SPI - try: - sa = ih[ikev2.IKEv2_payload_SA] - self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load - self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load - except IndexError as e: - self.logger.error("unexpected reply: SA/Nonce/KE payload found!") - self.logger.error(ih.show()) - raise - self.sa.complete_dh_data() - self.sa.calc_keys() - self.sa.auth_init() - - def verify_sa_auth_resp(self, packet): - ike = self.get_ike_header(packet) - udp = packet[UDP] - self.verify_udp(udp) - self.assertEqual(ike.id, self.sa.msg_id) - plain = self.sa.hmac_and_decrypt(ike) - idr = ikev2.IKEv2_payload_IDr(plain) - prop = idr[ikev2.IKEv2_payload_Proposal] - self.assertEqual(prop.SPIsize, 4) - self.sa.child_sas[0].rspi = prop.SPI - self.sa.calc_child_keys() - - IKE_NODE_SUFFIX = 'ip4' - - def verify_counters(self): - self.assert_counter(2, 'processed', self.IKE_NODE_SUFFIX) - self.assert_counter(1, 'init_sa_req', self.IKE_NODE_SUFFIX) - self.assert_counter(1, 'ike_auth_req', self.IKE_NODE_SUFFIX) - - r = self.vapi.ikev2_sa_dump() - s = r[0].sa.stats - self.assertEqual(1, s.n_sa_auth_req) - self.assertEqual(1, s.n_sa_init_req) - - def test_responder(self): - self.send_sa_init_req() - self.send_sa_auth() - self.verify_ipsec_sas() - self.verify_ike_sas() - self.verify_counters() - - -class Ikev2Params(object): - def config_params(self, params={}): - ec = VppEnum.vl_api_ipsec_crypto_alg_t - ei = VppEnum.vl_api_ipsec_integ_alg_t - self.vpp_enums = { - 'AES-CBC-128': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_128, - 'AES-CBC-192': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_192, - 'AES-CBC-256': ec.IPSEC_API_CRYPTO_ALG_AES_CBC_256, - 'AES-GCM-16ICV-128': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_128, - 'AES-GCM-16ICV-192': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_192, - 'AES-GCM-16ICV-256': ec.IPSEC_API_CRYPTO_ALG_AES_GCM_256, - - 'HMAC-SHA1-96': ei.IPSEC_API_INTEG_ALG_SHA1_96, - 'SHA2-256-128': ei.IPSEC_API_INTEG_ALG_SHA_256_128, - 'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192, - 'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256} - - dpd_disabled = True if 'dpd_disabled' not in params else\ - params['dpd_disabled'] - if dpd_disabled: - self.vapi.cli('ikev2 dpd disable') - self.del_sa_from_responder = False if 'del_sa_from_responder'\ - not in params else params['del_sa_from_responder'] - i_natt = False if 'i_natt' not in params else params['i_natt'] - r_natt = False if 'r_natt' not in params else params['r_natt'] - self.p = Profile(self, 'pr1') - self.ip6 = False if 'ip6' not in params else params['ip6'] - - if 'auth' in params and params['auth'] == 'rsa-sig': - auth_method = 'rsa-sig' - work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/' - self.vapi.ikev2_set_local_key( - key_file=work_dir + params['server-key']) - - client_file = work_dir + params['client-cert'] - server_pem = open(work_dir + params['server-cert']).read() - client_priv = open(work_dir + params['client-key']).read() - client_priv = load_pem_private_key(str.encode(client_priv), None, - default_backend()) - self.peer_cert = x509.load_pem_x509_certificate( - str.encode(server_pem), - default_backend()) - self.p.add_auth(method='rsa-sig', data=str.encode(client_file)) - auth_data = None - else: - auth_data = b'$3cr3tpa$$w0rd' - self.p.add_auth(method='shared-key', data=auth_data) - auth_method = 'shared-key' - client_priv = None - - is_init = True if 'is_initiator' not in params else\ - params['is_initiator'] - - idr = {'id_type': 'fqdn', 'data': b'vpp.home'} - idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'} - if is_init: - self.p.add_local_id(**idr) - self.p.add_remote_id(**idi) - else: - self.p.add_local_id(**idi) - self.p.add_remote_id(**idr) - - loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\ - 'loc_ts' not in params else params['loc_ts'] - rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\ - 'rem_ts' not in params else params['rem_ts'] - self.p.add_local_ts(**loc_ts) - self.p.add_remote_ts(**rem_ts) - if 'responder' in params: - self.p.add_responder(params['responder']) - if 'ike_transforms' in params: - self.p.add_ike_transforms(params['ike_transforms']) - if 'esp_transforms' in params: - self.p.add_esp_transforms(params['esp_transforms']) - - udp_encap = False if 'udp_encap' not in params else\ - params['udp_encap'] - if udp_encap: - self.p.set_udp_encap(True) - - if 'responder_hostname' in params: - hn = params['responder_hostname'] - self.p.add_responder_hostname(hn) - - # configure static dns record - self.vapi.dns_name_server_add_del( - is_ip6=0, is_add=1, - server_address=IPv4Address(u'8.8.8.8').packed) - self.vapi.dns_enable_disable(enable=1) - - cmd = "dns cache add {} {}".format(hn['hostname'], - self.pg0.remote_ip4) - self.vapi.cli(cmd) - - self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'], - is_initiator=is_init, - id_type=self.p.local_id['id_type'], - i_natt=i_natt, r_natt=r_natt, - priv_key=client_priv, auth_method=auth_method, - auth_data=auth_data, udp_encap=udp_encap, - local_ts=self.p.remote_ts, remote_ts=self.p.local_ts) - if is_init: - ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\ - params['ike-crypto'] - ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\ - params['ike-integ'] - ike_dh = '2048MODPgr' if 'ike-dh' not in params else\ - params['ike-dh'] - - esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\ - params['esp-crypto'] - esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\ - params['esp-integ'] - - self.sa.set_ike_props( - crypto=ike_crypto[0], crypto_key_len=ike_crypto[1], - integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh) - self.sa.set_esp_props( - crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], - integ=esp_integ) - - -class TestApi(VppTestCase): - """ Test IKEV2 API """ - @classmethod - def setUpClass(cls): - super(TestApi, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(TestApi, cls).tearDownClass() - - def tearDown(self): - super(TestApi, self).tearDown() - self.p1.remove_vpp_config() - self.p2.remove_vpp_config() - r = self.vapi.ikev2_profile_dump() - self.assertEqual(len(r), 0) - - def configure_profile(self, cfg): - p = Profile(self, cfg['name']) - p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1]) - p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1]) - p.add_local_ts(**cfg['loc_ts']) - p.add_remote_ts(**cfg['rem_ts']) - p.add_responder(cfg['responder']) - p.add_ike_transforms(cfg['ike_ts']) - p.add_esp_transforms(cfg['esp_ts']) - p.add_auth(**cfg['auth']) - p.set_udp_encap(cfg['udp_encap']) - p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port']) - if 'lifetime_data' in cfg: - p.set_lifetime_data(cfg['lifetime_data']) - if 'tun_itf' in cfg: - p.set_tunnel_interface(cfg['tun_itf']) - if 'natt_disabled' in cfg and cfg['natt_disabled']: - p.disable_natt() - p.add_vpp_config() - return p - - def test_profile_api(self): - """ test profile dump API """ - loc_ts4 = { - 'proto': 8, - 'start_port': 1, - 'end_port': 19, - 'start_addr': '3.3.3.2', - 'end_addr': '3.3.3.3', - } - rem_ts4 = { - 'proto': 9, - 'start_port': 10, - 'end_port': 119, - 'start_addr': '4.5.76.80', - 'end_addr': '2.3.4.6', - } - - loc_ts6 = { - 'proto': 8, - 'start_port': 1, - 'end_port': 19, - 'start_addr': 'ab::1', - 'end_addr': 'ab::4', - } - rem_ts6 = { - 'proto': 9, - 'start_port': 10, - 'end_port': 119, - 'start_addr': 'cd::12', - 'end_addr': 'cd::13', - } - - conf = { - 'p1': { - 'name': 'p1', - 'natt_disabled': True, - 'loc_id': ('fqdn', b'vpp.home'), - 'rem_id': ('fqdn', b'roadwarrior.example.com'), - 'loc_ts': loc_ts4, - 'rem_ts': rem_ts4, - 'responder': {'sw_if_index': 0, 'addr': '5.6.7.8'}, - 'ike_ts': { - 'crypto_alg': 20, - 'crypto_key_size': 32, - 'integ_alg': 1, - 'dh_group': 1}, - 'esp_ts': { - 'crypto_alg': 13, - 'crypto_key_size': 24, - 'integ_alg': 2}, - 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'}, - 'udp_encap': True, - 'ipsec_over_udp_port': 4501, - 'lifetime_data': { - 'lifetime': 123, - 'lifetime_maxdata': 20192, - 'lifetime_jitter': 9, - 'handover': 132}, - }, - 'p2': { - 'name': 'p2', - 'loc_id': ('ip4-addr', b'192.168.2.1'), - 'rem_id': ('ip6-addr', b'abcd::1'), - 'loc_ts': loc_ts6, - 'rem_ts': rem_ts6, - 'responder': {'sw_if_index': 4, 'addr': 'def::10'}, - 'ike_ts': { - 'crypto_alg': 12, - 'crypto_key_size': 16, - 'integ_alg': 3, - 'dh_group': 3}, - 'esp_ts': { - 'crypto_alg': 9, - 'crypto_key_size': 24, - 'integ_alg': 4}, - 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'}, - 'udp_encap': False, - 'ipsec_over_udp_port': 4600, - 'tun_itf': 0} - } - self.p1 = self.configure_profile(conf['p1']) - self.p2 = self.configure_profile(conf['p2']) - - r = self.vapi.ikev2_profile_dump() - self.assertEqual(len(r), 2) - self.verify_profile(r[0].profile, conf['p1']) - self.verify_profile(r[1].profile, conf['p2']) - - def verify_id(self, api_id, cfg_id): - self.assertEqual(api_id.type, IDType.value(cfg_id[0])) - self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1]) - - def verify_ts(self, api_ts, cfg_ts): - self.assertEqual(api_ts.protocol_id, cfg_ts['proto']) - self.assertEqual(api_ts.start_port, cfg_ts['start_port']) - self.assertEqual(api_ts.end_port, cfg_ts['end_port']) - self.assertEqual(api_ts.start_addr, - ip_address(text_type(cfg_ts['start_addr']))) - self.assertEqual(api_ts.end_addr, - ip_address(text_type(cfg_ts['end_addr']))) - - def verify_responder(self, api_r, cfg_r): - self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index']) - self.assertEqual(api_r.addr, ip_address(cfg_r['addr'])) - - def verify_transforms(self, api_ts, cfg_ts): - self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg']) - self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size']) - self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg']) - - def verify_ike_transforms(self, api_ts, cfg_ts): - self.verify_transforms(api_ts, cfg_ts) - self.assertEqual(api_ts.dh_group, cfg_ts['dh_group']) - - def verify_esp_transforms(self, api_ts, cfg_ts): - self.verify_transforms(api_ts, cfg_ts) - - def verify_auth(self, api_auth, cfg_auth): - self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method'])) - self.assertEqual(api_auth.data, cfg_auth['data']) - self.assertEqual(api_auth.data_len, len(cfg_auth['data'])) - - def verify_lifetime_data(self, p, ld): - self.assertEqual(p.lifetime, ld['lifetime']) - self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata']) - self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter']) - self.assertEqual(p.handover, ld['handover']) - - def verify_profile(self, ap, cp): - self.assertEqual(ap.name, cp['name']) - self.assertEqual(ap.udp_encap, cp['udp_encap']) - self.verify_id(ap.loc_id, cp['loc_id']) - self.verify_id(ap.rem_id, cp['rem_id']) - self.verify_ts(ap.loc_ts, cp['loc_ts']) - self.verify_ts(ap.rem_ts, cp['rem_ts']) - self.verify_responder(ap.responder, cp['responder']) - self.verify_ike_transforms(ap.ike_ts, cp['ike_ts']) - self.verify_esp_transforms(ap.esp_ts, cp['esp_ts']) - self.verify_auth(ap.auth, cp['auth']) - natt_dis = False if 'natt_disabled' not in cp else cp['natt_disabled'] - self.assertTrue(natt_dis == ap.natt_disabled) - - if 'lifetime_data' in cp: - self.verify_lifetime_data(ap, cp['lifetime_data']) - self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port']) - if 'tun_itf' in cp: - self.assertEqual(ap.tun_itf, cp['tun_itf']) - else: - self.assertEqual(ap.tun_itf, 0xffffffff) - - -@tag_fixme_vpp_workers -class TestResponderBehindNAT(TemplateResponder, Ikev2Params): - """ test responder - responder behind NAT """ - - IKE_NODE_SUFFIX = 'ip4-natt' - - def config_tc(self): - self.config_params({'r_natt': True}) - - -@tag_fixme_vpp_workers -class TestInitiatorNATT(TemplateInitiator, Ikev2Params): - """ test ikev2 initiator - NAT traversal (intitiator behind NAT) """ - - def config_tc(self): - self.config_params({ - 'i_natt': True, - 'is_initiator': False, # seen from test case perspective - # thus vpp is initiator - 'responder': {'sw_if_index': self.pg0.sw_if_index, - 'addr': self.pg0.remote_ip4}, - 'ike-crypto': ('AES-GCM-16ICV', 32), - 'ike-integ': 'NULL', - 'ike-dh': '3072MODPgr', - 'ike_transforms': { - 'crypto_alg': 20, # "aes-gcm-16" - 'crypto_key_size': 256, - 'dh_group': 15, # "modp-3072" - }, - 'esp_transforms': { - 'crypto_alg': 12, # "aes-cbc" - 'crypto_key_size': 256, - # "hmac-sha2-256-128" - 'integ_alg': 12}}) - - -@tag_fixme_vpp_workers -class TestInitiatorPsk(TemplateInitiator, Ikev2Params): - """ test ikev2 initiator - pre shared key auth """ - - def config_tc(self): - self.config_params({ - 'is_initiator': False, # seen from test case perspective - # thus vpp is initiator - 'ike-crypto': ('AES-GCM-16ICV', 32), - 'ike-integ': 'NULL', - 'ike-dh': '3072MODPgr', - 'ike_transforms': { - 'crypto_alg': 20, # "aes-gcm-16" - 'crypto_key_size': 256, - 'dh_group': 15, # "modp-3072" - }, - 'esp_transforms': { - 'crypto_alg': 12, # "aes-cbc" - 'crypto_key_size': 256, - # "hmac-sha2-256-128" - 'integ_alg': 12}, - 'responder_hostname': {'hostname': 'vpp.responder.org', - 'sw_if_index': self.pg0.sw_if_index}}) - - -@tag_fixme_vpp_workers -class TestInitiatorRequestWindowSize(TestInitiatorPsk): - """ test initiator - request window size (1) """ - - def rekey_respond(self, req, update_child_sa_data): - ih = self.get_ike_header(req) - plain = self.sa.hmac_and_decrypt(ih) - sa = ikev2.IKEv2_payload_SA(plain) - if update_child_sa_data: - prop = sa[ikev2.IKEv2_payload_Proposal] - self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load - self.sa.r_nonce = self.sa.i_nonce - self.sa.child_sas[0].ispi = prop.SPI - self.sa.child_sas[0].rspi = prop.SPI - self.sa.calc_child_keys() - - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Response', exch_type=36, - id=ih.id, next_payload='Encrypted') - resp = self.encrypt_ike_msg(header, sa, 'SA') - packet = self.create_packet(self.pg0, resp, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) - self.send_and_assert_no_replies(self.pg0, packet) - - def test_initiator(self): - super(TestInitiatorRequestWindowSize, self).test_initiator() - self.pg0.enable_capture() - self.pg_start() - ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little') - self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) - self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) - capture = self.pg0.get_capture(2) - - # reply in reverse order - self.rekey_respond(capture[1], True) - self.rekey_respond(capture[0], False) - - # verify that only the second request was accepted - self.verify_ike_sas() - self.verify_ipsec_sas(is_rekey=True) - - -@tag_fixme_vpp_workers -class TestInitiatorRekey(TestInitiatorPsk): - """ test ikev2 initiator - rekey """ - - def rekey_from_initiator(self): - ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little') - self.pg0.enable_capture() - self.pg_start() - self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) - capture = self.pg0.get_capture(1) - ih = self.get_ike_header(capture[0]) - self.assertEqual(ih.exch_type, 36) # CHILD_SA - self.assertNotIn('Response', ih.flags) - self.assertIn('Initiator', ih.flags) - plain = self.sa.hmac_and_decrypt(ih) - sa = ikev2.IKEv2_payload_SA(plain) - prop = sa[ikev2.IKEv2_payload_Proposal] - self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load - self.sa.r_nonce = self.sa.i_nonce - # update new responder SPI - self.sa.child_sas[0].ispi = prop.SPI - self.sa.child_sas[0].rspi = prop.SPI - self.sa.calc_child_keys() - header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - flags='Response', exch_type=36, - id=ih.id, next_payload='Encrypted') - resp = self.encrypt_ike_msg(header, sa, 'SA') - packet = self.create_packet(self.pg0, resp, self.sa.sport, - self.sa.dport, self.sa.natt, self.ip6) - self.send_and_assert_no_replies(self.pg0, packet) - - def test_initiator(self): - super(TestInitiatorRekey, self).test_initiator() - self.rekey_from_initiator() - self.verify_ike_sas() - self.verify_ipsec_sas(is_rekey=True) - - -@tag_fixme_vpp_workers -class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params): - """ test ikev2 initiator - delete IKE SA from responder """ - - def config_tc(self): - self.config_params({ - 'del_sa_from_responder': True, - 'is_initiator': False, # seen from test case perspective - # thus vpp is initiator - 'responder': {'sw_if_index': self.pg0.sw_if_index, - 'addr': self.pg0.remote_ip4}, - 'ike-crypto': ('AES-GCM-16ICV', 32), - 'ike-integ': 'NULL', - 'ike-dh': '3072MODPgr', - 'ike_transforms': { - 'crypto_alg': 20, # "aes-gcm-16" - 'crypto_key_size': 256, - 'dh_group': 15, # "modp-3072" - }, - 'esp_transforms': { - 'crypto_alg': 12, # "aes-cbc" - 'crypto_key_size': 256, - # "hmac-sha2-256-128" - 'integ_alg': 12}}) - - -@tag_fixme_vpp_workers -class TestResponderInitBehindNATT(TemplateResponder, Ikev2Params): - """ test ikev2 responder - initiator behind NAT """ - - IKE_NODE_SUFFIX = 'ip4-natt' - - def config_tc(self): - self.config_params( - {'i_natt': True}) - - -@tag_fixme_vpp_workers -class TestResponderPsk(TemplateResponder, Ikev2Params): - """ test ikev2 responder - pre shared key auth """ - def config_tc(self): - self.config_params() - - -@tag_fixme_vpp_workers -class TestResponderDpd(TestResponderPsk): - """ - Dead peer detection test - """ - def config_tc(self): - self.config_params({'dpd_disabled': False}) - - def tearDown(self): - pass - - def test_responder(self): - self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1) - super(TestResponderDpd, self).test_responder() - self.pg0.enable_capture() - self.pg_start() - # capture empty request but don't reply - capture = self.pg0.get_capture(expected_count=1, timeout=5) - ih = self.get_ike_header(capture[0]) - self.assertEqual(ih.exch_type, 37) # INFORMATIONAL - plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') - # wait for SA expiration - time.sleep(3) - ike_sas = self.vapi.ikev2_sa_dump() - self.assertEqual(len(ike_sas), 0) - ipsec_sas = self.vapi.ipsec_sa_dump() - self.assertEqual(len(ipsec_sas), 0) - - -@tag_fixme_vpp_workers -class TestResponderRekey(TestResponderPsk): - """ test ikev2 responder - rekey """ - - def rekey_from_initiator(self): - packet = self.create_rekey_request() - self.pg0.add_stream(packet) - self.pg0.enable_capture() - self.pg_start() - capture = self.pg0.get_capture(1) - ih = self.get_ike_header(capture[0]) - plain = self.sa.hmac_and_decrypt(ih) - sa = ikev2.IKEv2_payload_SA(plain) - prop = sa[ikev2.IKEv2_payload_Proposal] - self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load - # update new responder SPI - self.sa.child_sas[0].rspi = prop.SPI - - def test_responder(self): - super(TestResponderRekey, self).test_responder() - self.rekey_from_initiator() - self.sa.calc_child_keys() - self.verify_ike_sas() - self.verify_ipsec_sas(is_rekey=True) - self.assert_counter(1, 'rekey_req', 'ip4') - r = self.vapi.ikev2_sa_dump() - self.assertEqual(r[0].sa.stats.n_rekey_req, 1) - - -class TestResponderVrf(TestResponderPsk, Ikev2Params): - """ test ikev2 responder - non-default table id """ - - @classmethod - def setUpClass(cls): - import scapy.contrib.ikev2 as _ikev2 - globals()['ikev2'] = _ikev2 - super(IkePeer, cls).setUpClass() - cls.create_pg_interfaces(range(1)) - cls.vapi.cli("ip table add 1") - cls.vapi.cli("set interface ip table pg0 1") - for i in cls.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - - def config_tc(self): - self.config_params({'dpd_disabled': False}) - - def test_responder(self): - self.vapi.ikev2_profile_set_liveness(period=2, max_retries=1) - super(TestResponderVrf, self).test_responder() - self.pg0.enable_capture() - self.pg_start() - capture = self.pg0.get_capture(expected_count=1, timeout=5) - ih = self.get_ike_header(capture[0]) - self.assertEqual(ih.exch_type, 37) # INFORMATIONAL - plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') - - -@tag_fixme_vpp_workers -class TestResponderRsaSign(TemplateResponder, Ikev2Params): - """ test ikev2 responder - cert based auth """ - def config_tc(self): - self.config_params({ - 'udp_encap': True, - 'auth': 'rsa-sig', - 'server-key': 'server-key.pem', - 'client-key': 'client-key.pem', - 'client-cert': 'client-cert.pem', - 'server-cert': 'server-cert.pem'}) - - -@tag_fixme_vpp_workers -class Test_IKE_AES_CBC_128_SHA256_128_MODP2048_ESP_AES_CBC_192_SHA_384_192\ - (TemplateResponder, Ikev2Params): - """ - IKE:AES_CBC_128_SHA256_128,DH=modp2048 ESP:AES_CBC_192_SHA_384_192 - """ - def config_tc(self): - self.config_params({ - 'ike-crypto': ('AES-CBC', 16), - 'ike-integ': 'SHA2-256-128', - 'esp-crypto': ('AES-CBC', 24), - 'esp-integ': 'SHA2-384-192', - 'ike-dh': '2048MODPgr'}) - - -@tag_fixme_vpp_workers -class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\ - (TemplateResponder, Ikev2Params): - - """ - IKE:AES_CBC_128_SHA256_128,DH=modp3072 ESP:AES_GCM_16 - """ - def config_tc(self): - self.config_params({ - 'ike-crypto': ('AES-CBC', 32), - 'ike-integ': 'SHA2-256-128', - 'esp-crypto': ('AES-GCM-16ICV', 32), - 'esp-integ': 'NULL', - 'ike-dh': '3072MODPgr'}) - - -@tag_fixme_vpp_workers -class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params): - """ - IKE:AES_GCM_16_256 - """ - - IKE_NODE_SUFFIX = 'ip6' - - def config_tc(self): - self.config_params({ - 'del_sa_from_responder': True, - 'ip6': True, - 'natt': True, - 'ike-crypto': ('AES-GCM-16ICV', 32), - 'ike-integ': 'NULL', - 'ike-dh': '2048MODPgr', - 'loc_ts': {'start_addr': 'ab:cd::0', - 'end_addr': 'ab:cd::10'}, - 'rem_ts': {'start_addr': '11::0', - 'end_addr': '11::100'}}) - - -@tag_fixme_vpp_workers -class TestInitiatorKeepaliveMsg(TestInitiatorPsk): - """ - Test for keep alive messages - """ - - def send_empty_req_from_responder(self): - packet = self.create_empty_request() - self.pg0.add_stream(packet) - self.pg0.enable_capture() - self.pg_start() - capture = self.pg0.get_capture(1) - ih = self.get_ike_header(capture[0]) - self.assertEqual(ih.id, self.sa.msg_id) - plain = self.sa.hmac_and_decrypt(ih) - self.assertEqual(plain, b'') - self.assert_counter(1, 'keepalive', 'ip4') - r = self.vapi.ikev2_sa_dump() - self.assertEqual(1, r[0].sa.stats.n_keepalives) - - def test_initiator(self): - super(TestInitiatorKeepaliveMsg, self).test_initiator() - self.send_empty_req_from_responder() - - -class TestMalformedMessages(TemplateResponder, Ikev2Params): - """ malformed packet test """ - - def tearDown(self): - pass - - def config_tc(self): - self.config_params() - - def create_ike_init_msg(self, length=None, payload=None): - msg = ikev2.IKEv2(length=length, init_SPI='\x11' * 8, - flags='Initiator', exch_type='IKE_SA_INIT') - if payload is not None: - msg /= payload - return self.create_packet(self.pg0, msg, self.sa.sport, - self.sa.dport) - - def verify_bad_packet_length(self): - ike_msg = self.create_ike_init_msg(length=0xdead) - self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count) - self.assert_counter(self.pkt_count, 'bad_length') - - def verify_bad_sa_payload_length(self): - p = ikev2.IKEv2_payload_SA(length=0xdead) - ike_msg = self.create_ike_init_msg(payload=p) - self.send_and_assert_no_replies(self.pg0, ike_msg * self.pkt_count) - self.assert_counter(self.pkt_count, 'malformed_packet') - - def test_responder(self): - self.pkt_count = 254 - self.verify_bad_packet_length() - self.verify_bad_sa_payload_length() - - -if __name__ == '__main__': - unittest.main(testRunner=VppTestRunner) diff --git a/src/plugins/ikev2/test/vpp_ikev2.py b/src/plugins/ikev2/test/vpp_ikev2.py deleted file mode 100644 index de2081268ee..00000000000 --- a/src/plugins/ikev2/test/vpp_ikev2.py +++ /dev/null @@ -1,179 +0,0 @@ -from ipaddress import IPv4Address, AddressValueError -from vpp_object import VppObject -from vpp_papi import VppEnum - - -class AuthMethod: - v = {'rsa-sig': 1, - 'shared-key': 2} - - @staticmethod - def value(key): return AuthMethod.v[key] - - -class IDType: - v = {'ip4-addr': 1, - 'fqdn': 2, - 'ip6-addr': 5} - - @staticmethod - def value(key): return IDType.v[key] - - -class Profile(VppObject): - """ IKEv2 profile """ - def __init__(self, test, profile_name): - self.test = test - self.vapi = test.vapi - self.profile_name = profile_name - self.udp_encap = False - self.natt = True - - def disable_natt(self): - self.natt = False - - def add_auth(self, method, data, is_hex=False): - if isinstance(method, int): - m = method - elif isinstance(method, str): - m = AuthMethod.value(method) - else: - raise Exception('unsupported type {}'.format(method)) - self.auth = {'auth_method': m, - 'data': data, - 'is_hex': is_hex} - - def add_local_id(self, id_type, data): - if isinstance(id_type, str): - t = IDType.value(id_type) - self.local_id = {'id_type': t, - 'data': data, - 'is_local': True} - - def add_remote_id(self, id_type, data): - if isinstance(id_type, str): - t = IDType.value(id_type) - self.remote_id = {'id_type': t, - 'data': data, - 'is_local': False} - - def add_local_ts(self, start_addr, end_addr, start_port=0, end_port=0xffff, - proto=0, is_ip4=True): - self.ts_is_ip4 = is_ip4 - self.local_ts = {'is_local': True, - 'protocol_id': proto, - 'start_port': start_port, - 'end_port': end_port, - 'start_addr': start_addr, - 'end_addr': end_addr} - - def add_remote_ts(self, start_addr, end_addr, start_port=0, - end_port=0xffff, proto=0): - try: - IPv4Address(start_addr) - is_ip4 = True - except AddressValueError: - is_ip4 = False - self.ts_is_ip4 = is_ip4 - self.remote_ts = {'is_local': False, - 'protocol_id': proto, - 'start_port': start_port, - 'end_port': end_port, - 'start_addr': start_addr, - 'end_addr': end_addr} - - def add_responder_hostname(self, hn): - self.responder_hostname = hn - - def add_responder(self, responder): - self.responder = responder - - def add_ike_transforms(self, tr): - self.ike_transforms = tr - - def add_esp_transforms(self, tr): - self.esp_transforms = tr - - def set_udp_encap(self, udp_encap): - self.udp_encap = udp_encap - - def set_lifetime_data(self, data): - self.lifetime_data = data - - def set_ipsec_over_udp_port(self, port): - self.ipsec_udp_port = {'is_set': 1, - 'port': port} - - def set_tunnel_interface(self, sw_if_index): - self.tun_itf = sw_if_index - - def object_id(self): - return 'ikev2-profile-%s' % self.profile_name - - def remove_vpp_config(self): - self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=False) - - def add_vpp_config(self): - self.vapi.ikev2_profile_add_del(name=self.profile_name, is_add=True) - if hasattr(self, 'auth'): - self.vapi.ikev2_profile_set_auth(name=self.profile_name, - data_len=len(self.auth['data']), - **self.auth) - if hasattr(self, 'local_id'): - self.vapi.ikev2_profile_set_id(name=self.profile_name, - data_len=len(self.local_id - ['data']), - **self.local_id) - if hasattr(self, 'remote_id'): - self.vapi.ikev2_profile_set_id(name=self.profile_name, - data_len=len(self.remote_id - ['data']), - **self.remote_id) - if hasattr(self, 'local_ts'): - self.vapi.ikev2_profile_set_ts(name=self.profile_name, - ts=self.local_ts) - - if hasattr(self, 'remote_ts'): - self.vapi.ikev2_profile_set_ts(name=self.profile_name, - ts=self.remote_ts) - - if hasattr(self, 'responder'): - self.vapi.ikev2_set_responder(name=self.profile_name, - responder=self.responder) - - if hasattr(self, 'responder_hostname'): - print(self.responder_hostname) - self.vapi.ikev2_set_responder_hostname(name=self.profile_name, - **self.responder_hostname) - - if hasattr(self, 'ike_transforms'): - self.vapi.ikev2_set_ike_transforms(name=self.profile_name, - tr=self.ike_transforms) - - if hasattr(self, 'esp_transforms'): - self.vapi.ikev2_set_esp_transforms(name=self.profile_name, - tr=self.esp_transforms) - - if self.udp_encap: - self.vapi.ikev2_profile_set_udp_encap(name=self.profile_name) - - if hasattr(self, 'lifetime_data'): - self.vapi.ikev2_set_sa_lifetime(name=self.profile_name, - **self.lifetime_data) - - if hasattr(self, 'ipsec_udp_port'): - self.vapi.ikev2_profile_set_ipsec_udp_port(name=self.profile_name, - **self.ipsec_udp_port) - if hasattr(self, 'tun_itf'): - self.vapi.ikev2_set_tunnel_interface(name=self.profile_name, - sw_if_index=self.tun_itf) - - if not self.natt: - self.vapi.ikev2_profile_disable_natt(name=self.profile_name) - - def query_vpp_config(self): - res = self.vapi.ikev2_profile_dump() - for r in res: - if r.profile.name == self.profile_name: - return r.profile - return None |