From a7b963df2758d7c25de366db1999ca1024e12d30 Mon Sep 17 00:00:00 2001 From: Filip Tehlar Date: Wed, 8 Jul 2020 13:25:34 +0000 Subject: ikev2: add support for AES-GCM cipher in IKE Type: feature Ticket: VPP-1920 Change-Id: I6e30f3594cb30553f3ca5a35e0a4f679325aacec Signed-off-by: Filip Tehlar --- src/plugins/ikev2/test/test_ikev2.py | 174 +++++++++++++++++++++++++---------- 1 file changed, 126 insertions(+), 48 deletions(-) (limited to 'src/plugins/ikev2/test/test_ikev2.py') diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py index 918b5cef624..77698d6a5ed 100644 --- a/src/plugins/ikev2/test/test_ikev2.py +++ b/src/plugins/ikev2/test/test_ikev2.py @@ -19,6 +19,9 @@ from vpp_papi import VppEnum KEY_PAD = b"Key Pad for IKEv2" +SALT_SIZE = 4 +GCM_ICV_SIZE = 16 +GCM_IV_SIZE = 8 # defined in rfc3526 @@ -65,19 +68,47 @@ class CryptoAlgo(object): if self.cipher is not None: self.bs = self.cipher.block_size // 8 - def encrypt(self, data, key): - iv = os.urandom(self.bs) - encryptor = Cipher(self.cipher(key), self.mode(iv), - default_backend()).encryptor() - return iv + encryptor.update(data) + encryptor.finalize() - - def decrypt(self, data, key, icv=None): - iv = data[:self.bs] - ct = data[self.bs:] - decryptor = Cipher(algorithms.AES(key), - modes.CBC(iv), - default_backend()).decryptor() - return decryptor.update(ct) + decryptor.finalize() + if self.name == 'AES-GCM-16ICV': + self.iv_len = GCM_IV_SIZE + else: + self.iv_len = self.bs + + def encrypt(self, data, key, aad=None): + iv = os.urandom(self.iv_len) + if aad is None: + encryptor = Cipher(self.cipher(key), self.mode(iv), + default_backend()).encryptor() + return iv + encryptor.update(data) + encryptor.finalize() + else: + salt = key[-SALT_SIZE:] + nonce = salt + iv + encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce), + default_backend()).encryptor() + encryptor.authenticate_additional_data(aad) + data = encryptor.update(data) + encryptor.finalize() + data += encryptor.tag[:GCM_ICV_SIZE] + return iv + data + + def decrypt(self, data, key, aad=None, icv=None): + if aad is None: + iv = data[:self.iv_len] + ct = data[self.iv_len:] + decryptor = Cipher(algorithms.AES(key), + self.mode(iv), + default_backend()).decryptor() + return decryptor.update(ct) + decryptor.finalize() + else: + salt = key[-SALT_SIZE:] + nonce = salt + data[:GCM_IV_SIZE] + ct = data[GCM_IV_SIZE:] + key = key[:-SALT_SIZE] + decryptor = Cipher(algorithms.AES(key), + self.mode(nonce, icv, len(icv)), + default_backend()).decryptor() + decryptor.authenticate_additional_data(aad) + pt = decryptor.update(ct) + decryptor.finalize() + pad_len = pt[-1] + 1 + return pt[:-pad_len] def pad(self, data): pad_len = (len(data) // self.bs + 1) * self.bs - len(data) @@ -241,7 +272,7 @@ class IKEv2SA(object): return r def calc_prf(self, prf, key, data): - h = self.ike_integ_alg.mac(key, prf, backend=default_backend()) + h = self.ike_prf_alg.mac(key, prf, backend=default_backend()) h.update(data) return h.finalize() @@ -258,10 +289,16 @@ class IKEv2SA(object): encr_key_len = self.ike_crypto_key_len tr_prf_key_len = self.ike_prf_alg.key_len integ_key_len = self.ike_integ_alg.key_len + if integ_key_len == 0: + salt_size = 4 + else: + salt_size = 0 + l = (prf_key_trunc + integ_key_len * 2 + encr_key_len * 2 + - tr_prf_key_len * 2) + tr_prf_key_len * 2 + + salt_size * 2) keymat = self.calc_prfplus(prf, self.skeyseed, s, l) pos = 0 @@ -273,10 +310,10 @@ class IKEv2SA(object): self.sk_ar = keymat[pos:pos+integ_key_len] pos += integ_key_len - self.sk_ei = keymat[pos:pos+encr_key_len] - pos += encr_key_len - self.sk_er = keymat[pos:pos+encr_key_len] - pos += encr_key_len + self.sk_ei = keymat[pos:pos+encr_key_len + salt_size] + pos += encr_key_len + salt_size + self.sk_er = keymat[pos:pos+encr_key_len + salt_size] + pos += encr_key_len + salt_size self.sk_pi = keymat[pos:pos+tr_prf_key_len] pos += tr_prf_key_len @@ -303,9 +340,9 @@ class IKEv2SA(object): else: raise TypeError('unknown auth method type!') - def encrypt(self, data): + def encrypt(self, data, aad=None): data = self.ike_crypto_alg.pad(data) - return self.ike_crypto_alg.encrypt(data, self.my_cryptokey) + return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad) @property def peer_authkey(self): @@ -355,17 +392,23 @@ class IKEv2SA(object): h.update(data) return h.finalize() - def decrypt(self, data): - return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey) + def decrypt(self, data, aad=None, icv=None): + return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv) def hmac_and_decrypt(self, ike): ep = ike[ikev2.IKEv2_payload_Encrypted] - self.verify_hmac(raw(ike)) - integ_trunc = self.ike_integ_alg.trunc_len + if self.ike_crypto == 'AES-GCM-16ICV': + aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2()) + ct = ep.load[:-GCM_ICV_SIZE] + tag = ep.load[-GCM_ICV_SIZE:] + return self.decrypt(ct, raw(ike)[:aad_len], tag) + else: + self.verify_hmac(raw(ike)) + integ_trunc = self.ike_integ_alg.trunc_len - # remove ICV and decrypt payload - ct = ep.load[:-integ_trunc] - return self.decrypt(ct) + # remove ICV and decrypt payload + ct = ep.load[:-integ_trunc] + return self.decrypt(ct) def generate_ts(self): c = self.child_sas[0] @@ -388,7 +431,7 @@ class IKEv2SA(object): if integ not in AUTH_ALGOS: raise TypeError('unsupported auth algo %r' % integ) - self.ike_integ = integ + self.ike_integ = None if integ == 'NULL' else integ self.ike_integ_alg = AUTH_ALGOS[integ] if prf not in PRF_ALGOS: @@ -411,7 +454,7 @@ class IKEv2SA(object): self.esp_integ_alg = AUTH_ALGOS[integ] def crypto_attr(self, key_len): - if self.ike_crypto in ['AES-CBC', 'AES-GCM']: + if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']: return (0x800e << 16 | key_len << 3, 12) else: raise Exception('unsupported attribute type') @@ -542,24 +585,47 @@ class TemplateResponder(VppTestCase): number_of_TSs=len(tsr), traffic_selector=tsr) / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')) - encr = self.sa.encrypt(raw(plain)) - - trunc_len = self.sa.ike_integ_alg.trunc_len - plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len - tlen = plen + len(ikev2.IKEv2()) - - sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi', - length=plen, load=encr) - sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, - length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1)) - sa_auth /= sk_p - - integ_data = raw(sa_auth) - hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(), - self.sa.my_authkey, integ_data) - sa_auth = sa_auth / Raw(hmac_data[:trunc_len]) - assert(len(sa_auth) == tlen) + if self.sa.ike_crypto == 'AES-GCM-16ICV': + data = self.sa.ike_crypto_alg.pad(raw(plain)) + plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\ + len(ikev2.IKEv2_payload_Encrypted()) + tlen = plen + len(ikev2.IKEv2()) + + # prepare aad data + sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi', + length=plen) + sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, id=1, + length=tlen, flags='Initiator', exch_type='IKE_AUTH')) + sa_auth /= sk_p + + encr = self.sa.encrypt(raw(plain), raw(sa_auth)) + sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi', + length=plen, load=encr) + sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, id=1, + length=tlen, flags='Initiator', exch_type='IKE_AUTH')) + sa_auth /= sk_p + else: + encr = self.sa.encrypt(raw(plain)) + trunc_len = self.sa.ike_integ_alg.trunc_len + plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len + tlen = plen + len(ikev2.IKEv2()) + + sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi', + length=plen, load=encr) + sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, id=1, + length=tlen, flags='Initiator', exch_type='IKE_AUTH')) + sa_auth /= sk_p + + integ_data = raw(sa_auth) + hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(), + self.sa.my_authkey, integ_data) + sa_auth = sa_auth / Raw(hmac_data[:trunc_len]) + + assert(len(sa_auth) == tlen) packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport, self.sa.dport, self.sa.natt) self.pg0.add_stream(packet) @@ -590,8 +656,9 @@ class TemplateResponder(VppTestCase): sa = ih[ikev2.IKEv2_payload_SA] self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load - except AttributeError as e: + except IndexError as e: self.logger.error("unexpected reply: SA/Nonce/KE payload found!") + self.logger.error(ih.show()) raise self.sa.complete_dh_data() self.sa.calc_keys() @@ -783,5 +850,16 @@ class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\ 'ike-dh': '3072MODPgr'}) +class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params): + """ + IKE:AES_GCM_16_256 + """ + def config_tc(self): + self.config_params({ + 'ike-crypto': ('AES-GCM-16ICV', 32), + 'ike-integ': 'NULL', + 'ike-dh': '2048MODPgr'}) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg