aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/ikev2/test
diff options
context:
space:
mode:
authorFilip Tehlar <ftehlar@cisco.com>2020-07-08 13:25:34 +0000
committerBenoƮt Ganne <bganne@cisco.com>2020-07-15 16:12:16 +0000
commita7b963df2758d7c25de366db1999ca1024e12d30 (patch)
tree5cbf77b9e1248239b193324dc1af17df15724bd1 /src/plugins/ikev2/test
parent8046fdc10b14fd161ee81d0a25cfa79793ef698b (diff)
ikev2: add support for AES-GCM cipher in IKE
Type: feature Ticket: VPP-1920 Change-Id: I6e30f3594cb30553f3ca5a35e0a4f679325aacec Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
Diffstat (limited to 'src/plugins/ikev2/test')
-rw-r--r--src/plugins/ikev2/test/test_ikev2.py174
1 files changed, 126 insertions, 48 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py
index 918b5cef624..77698d6a5ed 100644
--- a/src/plugins/ikev2/test/test_ikev2.py
+++ b/src/plugins/ikev2/test/test_ikev2.py
@@ -19,6 +19,9 @@ from vpp_papi import VppEnum
KEY_PAD = b"Key Pad for IKEv2"
+SALT_SIZE = 4
+GCM_ICV_SIZE = 16
+GCM_IV_SIZE = 8
# defined in rfc3526
@@ -65,19 +68,47 @@ class CryptoAlgo(object):
if self.cipher is not None:
self.bs = self.cipher.block_size // 8
- def encrypt(self, data, key):
- iv = os.urandom(self.bs)
- encryptor = Cipher(self.cipher(key), self.mode(iv),
- default_backend()).encryptor()
- return iv + encryptor.update(data) + encryptor.finalize()
-
- def decrypt(self, data, key, icv=None):
- iv = data[:self.bs]
- ct = data[self.bs:]
- decryptor = Cipher(algorithms.AES(key),
- modes.CBC(iv),
- default_backend()).decryptor()
- return decryptor.update(ct) + decryptor.finalize()
+ if self.name == 'AES-GCM-16ICV':
+ self.iv_len = GCM_IV_SIZE
+ else:
+ self.iv_len = self.bs
+
+ def encrypt(self, data, key, aad=None):
+ iv = os.urandom(self.iv_len)
+ if aad is None:
+ encryptor = Cipher(self.cipher(key), self.mode(iv),
+ default_backend()).encryptor()
+ return iv + encryptor.update(data) + encryptor.finalize()
+ else:
+ salt = key[-SALT_SIZE:]
+ nonce = salt + iv
+ encryptor = Cipher(self.cipher(key[:-SALT_SIZE]), self.mode(nonce),
+ default_backend()).encryptor()
+ encryptor.authenticate_additional_data(aad)
+ data = encryptor.update(data) + encryptor.finalize()
+ data += encryptor.tag[:GCM_ICV_SIZE]
+ return iv + data
+
+ def decrypt(self, data, key, aad=None, icv=None):
+ if aad is None:
+ iv = data[:self.iv_len]
+ ct = data[self.iv_len:]
+ decryptor = Cipher(algorithms.AES(key),
+ self.mode(iv),
+ default_backend()).decryptor()
+ return decryptor.update(ct) + decryptor.finalize()
+ else:
+ salt = key[-SALT_SIZE:]
+ nonce = salt + data[:GCM_IV_SIZE]
+ ct = data[GCM_IV_SIZE:]
+ key = key[:-SALT_SIZE]
+ decryptor = Cipher(algorithms.AES(key),
+ self.mode(nonce, icv, len(icv)),
+ default_backend()).decryptor()
+ decryptor.authenticate_additional_data(aad)
+ pt = decryptor.update(ct) + decryptor.finalize()
+ pad_len = pt[-1] + 1
+ return pt[:-pad_len]
def pad(self, data):
pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
@@ -241,7 +272,7 @@ class IKEv2SA(object):
return r
def calc_prf(self, prf, key, data):
- h = self.ike_integ_alg.mac(key, prf, backend=default_backend())
+ h = self.ike_prf_alg.mac(key, prf, backend=default_backend())
h.update(data)
return h.finalize()
@@ -258,10 +289,16 @@ class IKEv2SA(object):
encr_key_len = self.ike_crypto_key_len
tr_prf_key_len = self.ike_prf_alg.key_len
integ_key_len = self.ike_integ_alg.key_len
+ if integ_key_len == 0:
+ salt_size = 4
+ else:
+ salt_size = 0
+
l = (prf_key_trunc +
integ_key_len * 2 +
encr_key_len * 2 +
- tr_prf_key_len * 2)
+ tr_prf_key_len * 2 +
+ salt_size * 2)
keymat = self.calc_prfplus(prf, self.skeyseed, s, l)
pos = 0
@@ -273,10 +310,10 @@ class IKEv2SA(object):
self.sk_ar = keymat[pos:pos+integ_key_len]
pos += integ_key_len
- self.sk_ei = keymat[pos:pos+encr_key_len]
- pos += encr_key_len
- self.sk_er = keymat[pos:pos+encr_key_len]
- pos += encr_key_len
+ self.sk_ei = keymat[pos:pos+encr_key_len + salt_size]
+ pos += encr_key_len + salt_size
+ self.sk_er = keymat[pos:pos+encr_key_len + salt_size]
+ pos += encr_key_len + salt_size
self.sk_pi = keymat[pos:pos+tr_prf_key_len]
pos += tr_prf_key_len
@@ -303,9 +340,9 @@ class IKEv2SA(object):
else:
raise TypeError('unknown auth method type!')
- def encrypt(self, data):
+ def encrypt(self, data, aad=None):
data = self.ike_crypto_alg.pad(data)
- return self.ike_crypto_alg.encrypt(data, self.my_cryptokey)
+ return self.ike_crypto_alg.encrypt(data, self.my_cryptokey, aad)
@property
def peer_authkey(self):
@@ -355,17 +392,23 @@ class IKEv2SA(object):
h.update(data)
return h.finalize()
- def decrypt(self, data):
- return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey)
+ def decrypt(self, data, aad=None, icv=None):
+ return self.ike_crypto_alg.decrypt(data, self.peer_cryptokey, aad, icv)
def hmac_and_decrypt(self, ike):
ep = ike[ikev2.IKEv2_payload_Encrypted]
- self.verify_hmac(raw(ike))
- integ_trunc = self.ike_integ_alg.trunc_len
+ if self.ike_crypto == 'AES-GCM-16ICV':
+ aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
+ ct = ep.load[:-GCM_ICV_SIZE]
+ tag = ep.load[-GCM_ICV_SIZE:]
+ return self.decrypt(ct, raw(ike)[:aad_len], tag)
+ else:
+ self.verify_hmac(raw(ike))
+ integ_trunc = self.ike_integ_alg.trunc_len
- # remove ICV and decrypt payload
- ct = ep.load[:-integ_trunc]
- return self.decrypt(ct)
+ # remove ICV and decrypt payload
+ ct = ep.load[:-integ_trunc]
+ return self.decrypt(ct)
def generate_ts(self):
c = self.child_sas[0]
@@ -388,7 +431,7 @@ class IKEv2SA(object):
if integ not in AUTH_ALGOS:
raise TypeError('unsupported auth algo %r' % integ)
- self.ike_integ = integ
+ self.ike_integ = None if integ == 'NULL' else integ
self.ike_integ_alg = AUTH_ALGOS[integ]
if prf not in PRF_ALGOS:
@@ -411,7 +454,7 @@ class IKEv2SA(object):
self.esp_integ_alg = AUTH_ALGOS[integ]
def crypto_attr(self, key_len):
- if self.ike_crypto in ['AES-CBC', 'AES-GCM']:
+ if self.ike_crypto in ['AES-CBC', 'AES-GCM-16ICV']:
return (0x800e << 16 | key_len << 3, 12)
else:
raise Exception('unsupported attribute type')
@@ -542,24 +585,47 @@ class TemplateResponder(VppTestCase):
number_of_TSs=len(tsr),
traffic_selector=tsr) /
ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
- encr = self.sa.encrypt(raw(plain))
-
- trunc_len = self.sa.ike_integ_alg.trunc_len
- plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
- tlen = plen + len(ikev2.IKEv2())
-
- sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
- length=plen, load=encr)
- sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- length=tlen, flags='Initiator', exch_type='IKE_AUTH', id=1))
- sa_auth /= sk_p
-
- integ_data = raw(sa_auth)
- hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
- self.sa.my_authkey, integ_data)
- sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
- assert(len(sa_auth) == tlen)
+ if self.sa.ike_crypto == 'AES-GCM-16ICV':
+ data = self.sa.ike_crypto_alg.pad(raw(plain))
+ plen = len(data) + GCM_IV_SIZE + GCM_ICV_SIZE +\
+ len(ikev2.IKEv2_payload_Encrypted())
+ tlen = plen + len(ikev2.IKEv2())
+
+ # prepare aad data
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
+ length=plen)
+ sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=1,
+ length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
+ sa_auth /= sk_p
+
+ encr = self.sa.encrypt(raw(plain), raw(sa_auth))
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
+ length=plen, load=encr)
+ sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=1,
+ length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
+ sa_auth /= sk_p
+ else:
+ encr = self.sa.encrypt(raw(plain))
+ trunc_len = self.sa.ike_integ_alg.trunc_len
+ plen = len(encr) + len(ikev2.IKEv2_payload_Encrypted()) + trunc_len
+ tlen = plen + len(ikev2.IKEv2())
+
+ sk_p = ikev2.IKEv2_payload_Encrypted(next_payload='IDi',
+ length=plen, load=encr)
+ sa_auth = (ikev2.IKEv2(init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=1,
+ length=tlen, flags='Initiator', exch_type='IKE_AUTH'))
+ sa_auth /= sk_p
+
+ integ_data = raw(sa_auth)
+ hmac_data = self.sa.compute_hmac(self.sa.ike_integ_alg.mod(),
+ self.sa.my_authkey, integ_data)
+ sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
+
+ assert(len(sa_auth) == tlen)
packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
self.sa.dport, self.sa.natt)
self.pg0.add_stream(packet)
@@ -590,8 +656,9 @@ class TemplateResponder(VppTestCase):
sa = ih[ikev2.IKEv2_payload_SA]
self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
- except AttributeError as e:
+ except IndexError as e:
self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
+ self.logger.error(ih.show())
raise
self.sa.complete_dh_data()
self.sa.calc_keys()
@@ -783,5 +850,16 @@ class TestAES_CBC_128_SHA256_128_MODP3072_ESP_AES_GCM_16\
'ike-dh': '3072MODPgr'})
+class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
+ """
+ IKE:AES_GCM_16_256
+ """
+ def config_tc(self):
+ self.config_params({
+ 'ike-crypto': ('AES-GCM-16ICV', 32),
+ 'ike-integ': 'NULL',
+ 'ike-dh': '2048MODPgr'})
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)