aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilip Tehlar <ftehlar@cisco.com>2020-10-30 05:28:11 +0000
committerBeno�t Ganne <bganne@cisco.com>2020-11-04 08:07:55 +0000
commit68ad6258374201ba8f0dc052e6f44d6250555249 (patch)
tree3b984145b8b4fcf9750adcd6f6298b7c3bcfe20a
parent761f8f0eaaf43f38fdd9d160ba19ff833de7d210 (diff)
ikev2: fix reply during rekey
Type: fix Change-Id: If87f4b8ae92508215fe91178958fe2ddb91e5a35 Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
-rw-r--r--src/plugins/ikev2/ikev2.c2
-rw-r--r--src/plugins/ikev2/test/test_ikev2.py234
2 files changed, 192 insertions, 44 deletions
diff --git a/src/plugins/ikev2/ikev2.c b/src/plugins/ikev2/ikev2.c
index 5103da0486c..ae48b44f36b 100644
--- a/src/plugins/ikev2/ikev2.c
+++ b/src/plugins/ikev2/ikev2.c
@@ -3155,7 +3155,7 @@ ikev2_node_internal (vlib_main_t * vm,
ikev2_create_tunnel_interface (vm, sa0, child, p[0],
child - sa0->childs, 1);
}
- if (sa0->is_initiator)
+ if (ike_hdr_is_response (ike0))
{
vec_free (sa0->rekey);
}
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py
index ec68658f564..91cec8e9a62 100644
--- a/src/plugins/ikev2/test/test_ikev2.py
+++ b/src/plugins/ikev2/test/test_ikev2.py
@@ -151,10 +151,28 @@ PRF_ALGOS = {
hashes.SHA256, 32),
}
+CRYPTO_IDS = {
+ 12: 'AES-CBC',
+ 20: 'AES-GCM-16ICV',
+}
+
+INTEG_IDS = {
+ 2: 'HMAC-SHA1-96',
+ 12: 'SHA2-256-128',
+ 13: 'SHA2-384-192',
+ 14: 'SHA2-512-256',
+}
+
class IKEv2ChildSA(object):
- def __init__(self, local_ts, remote_ts, spi=None):
- self.spi = spi or os.urandom(4)
+ def __init__(self, local_ts, remote_ts, is_initiator):
+ spi = os.urandom(4)
+ if is_initiator:
+ self.ispi = spi
+ self.rspi = None
+ else:
+ self.rspi = spi
+ self.ispi = None
self.local_ts = local_ts
self.remote_ts = remote_ts
@@ -193,7 +211,8 @@ class IKEv2SA(object):
self.rspi = spi
self.ispi = 8 * b'\x00'
self.r_nonce = nonce
- self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
+ self.child_sas = [IKEv2ChildSA(local_ts, remote_ts,
+ self.is_initiator)]
def new_msg_id(self):
self.msg_id += 1
@@ -559,9 +578,11 @@ class IkePeer(VppTestCase):
self.config_tc()
self.p.add_vpp_config()
self.assertIsNotNone(self.p.query_vpp_config())
- self.sa.generate_dh_data()
+ if self.sa.is_initiator:
+ self.sa.generate_dh_data()
self.vapi.cli('ikev2 set logging level 4')
self.vapi.cli('event-lo clear')
+ self.vapi.cli('ikev2 dpd disable')
def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
use_ip6=False):
@@ -641,16 +662,30 @@ class IkePeer(VppTestCase):
assert(len(res) == tlen)
return res
- def verify_ipsec_sas(self):
+ def verify_ipsec_sas(self, is_rekey=False):
sas = self.vapi.ipsec_sa_dump()
- self.assertEqual(len(sas), 2)
+ if is_rekey:
+ # after rekey there is a short period of time in which old
+ # inbound SA is still present
+ sa_count = 3
+ else:
+ sa_count = 2
+ self.assertEqual(len(sas), sa_count)
e = VppEnum.vl_api_ipsec_sad_flags_t
if self.sa.is_initiator:
- sa0 = sas[0].entry
- sa1 = sas[1].entry
+ if is_rekey:
+ sa0 = sas[0].entry
+ sa1 = sas[2].entry
+ else:
+ sa0 = sas[0].entry
+ sa1 = sas[1].entry
else:
- sa1 = sas[0].entry
- sa0 = sas[1].entry
+ if is_rekey:
+ sa0 = sas[2].entry
+ sa1 = sas[0].entry
+ else:
+ sa1 = sas[0].entry
+ sa0 = sas[1].entry
c = self.sa.child_sas[0]
@@ -738,6 +773,8 @@ class IkePeer(VppTestCase):
self.verify_keymat(csa.keys, c, 'sk_ar')
self.verify_keymat(csa.keys, c, 'sk_ei')
self.verify_keymat(csa.keys, c, 'sk_er')
+ self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi)
+ self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi)
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
tsi = tsi[0]
@@ -888,9 +925,23 @@ class TemplateInitiator(IkePeer):
self.p.ike_transforms['dh_group'])
self.verify_nat_detection(packet)
+ self.sa.set_ike_props(
+ crypto='AES-GCM-16ICV', crypto_key_len=32,
+ integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr')
+ self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
+ integ='SHA2-256-128')
+ self.sa.generate_dh_data()
self.sa.complete_dh_data()
self.sa.calc_keys()
+ def update_esp_transforms(self, trans, sa):
+ while trans:
+ if trans.transform_type == 1: # ecryption
+ sa.esp_crypto = CRYPTO_IDS[trans.transform_id]
+ elif trans.transform_type == 3: # integrity
+ sa.esp_integ = INTEG_IDS[trans.transform_id]
+ trans = trans.payload
+
def verify_sa_auth_req(self, packet):
ih = self.get_ike_header(packet)
self.assertEqual(ih.resp_SPI, self.sa.rspi)
@@ -908,6 +959,11 @@ class TemplateInitiator(IkePeer):
idr = ikev2.IKEv2_payload_IDr(idi.payload)
self.assertEqual(idi.load, self.sa.i_id)
self.assertEqual(idr.load, self.sa.r_id)
+ prop = idi[ikev2.IKEv2_payload_Proposal]
+ c = self.sa.child_sas[0]
+ c.ispi = prop.SPI
+ self.update_esp_transforms(
+ prop[ikev2.IKEv2_payload_Transform], self.sa)
def send_init_response(self):
tr_attr = self.sa.ike_crypto_attr()
@@ -961,8 +1017,9 @@ class TemplateInitiator(IkePeer):
transform_type='Extended Sequence Number',
transform_id='ESN'))
+ c = self.sa.child_sas[0]
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
- SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
+ SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans))
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
@@ -1103,8 +1160,9 @@ class TemplateResponder(IkePeer):
capture = self.pg0.get_capture(1)
self.verify_sa_init(capture[0])
- def send_sa_auth(self):
+ def generate_auth_payload(self, last_payload=None, is_rekey=False):
tr_attr = self.sa.esp_crypto_attr()
+ last_payload = last_payload or 'Notify'
trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
transform_id=self.sa.esp_crypto, length=tr_attr[1],
key_length=tr_attr[0]) /
@@ -1117,32 +1175,45 @@ class TemplateResponder(IkePeer):
transform_type='Extended Sequence Number',
transform_id='ESN'))
+ c = self.sa.child_sas[0]
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
- SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
+ SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans))
tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
- plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
- IDtype=self.sa.id_type, load=self.sa.i_id) /
- ikev2.IKEv2_payload_IDr(next_payload='AUTH',
- IDtype=self.sa.id_type, load=self.sa.r_id) /
- ikev2.IKEv2_payload_AUTH(next_payload='SA',
+ plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA',
auth_type=AuthMethod.value(self.sa.auth_method),
load=self.sa.auth_data) /
ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
ikev2.IKEv2_payload_TSi(next_payload='TSr',
- number_of_TSs=len(tsi),
- traffic_selector=tsi) /
- ikev2.IKEv2_payload_TSr(next_payload='Notify',
- number_of_TSs=len(tsr),
- traffic_selector=tsr) /
- ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
+ number_of_TSs=len(tsi), traffic_selector=tsi) /
+ ikev2.IKEv2_payload_TSr(next_payload=last_payload,
+ number_of_TSs=len(tsr), traffic_selector=tsr))
+
+ if is_rekey:
+ first_payload = 'Nonce'
+ plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce,
+ next_payload='SA') / plain /
+ ikev2.IKEv2_payload_Notify(type='REKEY_SA',
+ proto='ESP', SPI=c.ispi))
+ else:
+ first_payload = 'IDi'
+ ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
+ IDtype=self.sa.id_type, load=self.sa.i_id) /
+ ikev2.IKEv2_payload_IDr(next_payload='AUTH',
+ IDtype=self.sa.id_type, load=self.sa.r_id))
+ plain = ids / plain
+ return plain, first_payload
+ def send_sa_auth(self):
+ plain, first_payload = self.generate_auth_payload(
+ last_payload='Notify')
+ plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')
header = ikev2.IKEv2(
init_SPI=self.sa.ispi,
resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
flags='Initiator', exch_type='IKE_AUTH')
- ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
+ ike_msg = self.encrypt_ike_msg(header, plain, first_payload)
packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
self.sa.dport, self.sa.natt, self.ip6)
self.pg0.add_stream(packet)
@@ -1178,6 +1249,10 @@ class TemplateResponder(IkePeer):
self.verify_udp(udp)
self.assertEqual(ike.id, self.sa.msg_id)
plain = self.sa.hmac_and_decrypt(ike)
+ idr = ikev2.IKEv2_payload_IDr(plain)
+ prop = idr[ikev2.IKEv2_payload_Proposal]
+ self.assertEqual(prop.SPIsize, 4)
+ self.sa.child_sas[0].rspi = prop.SPI
self.sa.calc_child_keys()
def test_responder(self):
@@ -1264,24 +1339,25 @@ class Ikev2Params(object):
auth_data=auth_data,
local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
- ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
- params['ike-crypto']
- ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
- params['ike-integ']
- ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
- params['ike-dh']
-
- esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
- params['esp-crypto']
- esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
- params['esp-integ']
-
- self.sa.set_ike_props(
- crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
- integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
- self.sa.set_esp_props(
- crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
- integ=esp_integ)
+ if is_init:
+ ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\
+ params['ike-crypto']
+ ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
+ params['ike-integ']
+ ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
+ params['ike-dh']
+
+ esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
+ params['esp-crypto']
+ esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\
+ params['esp-integ']
+
+ self.sa.set_ike_props(
+ crypto=ike_crypto[0], crypto_key_len=ike_crypto[1],
+ integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh)
+ self.sa.set_esp_props(
+ crypto=esp_crypto[0], crypto_key_len=esp_crypto[1],
+ integ=esp_integ)
class TestApi(VppTestCase):
@@ -1491,6 +1567,44 @@ class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
'integ_alg': 12}})
+class TestInitiatorRekey(TestInitiatorPsk):
+ """ test ikev2 initiator - rekey """
+
+ def rekey_from_initiator(self):
+ ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertEqual(ih.exch_type, 36) # CHILD_SA
+ self.assertNotIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ nonce = sa[ikev2.IKEv2_payload_Nonce]
+ self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ self.sa.r_nonce = self.sa.i_nonce
+ # update new responder SPI
+ self.sa.child_sas[0].ispi = prop.SPI
+ self.sa.child_sas[0].rspi = prop.SPI
+ self.sa.calc_child_keys()
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type=36,
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, sa, 'SA')
+ packet = self.create_packet(self.pg0, resp, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.send_and_assert_no_replies(self.pg0, packet)
+
+ def test_initiator(self):
+ super(TestInitiatorRekey, self).test_initiator()
+ self.rekey_from_initiator()
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(is_rekey=True)
+
+
class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - delete IKE SA from responder """
@@ -1529,6 +1643,40 @@ class TestResponderPsk(TemplateResponder, Ikev2Params):
self.config_params()
+class TestResponderRekey(TestResponderPsk):
+ """ test ikev2 responder - rekey """
+
+ def rekey_from_initiator(self):
+ sa, first_payload = self.generate_auth_payload(is_rekey=True)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Initiator', exch_type='CREATE_CHILD_SA')
+
+ ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
+ packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ nonce = sa[ikev2.IKEv2_payload_Nonce]
+ self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ # update new responder SPI
+ self.sa.child_sas[0].rspi = prop.SPI
+
+ def test_responder(self):
+ super(TestResponderRekey, self).test_responder()
+ self.rekey_from_initiator()
+ self.sa.calc_child_keys()
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(is_rekey=True)
+
+
class TestResponderRsaSign(TemplateResponder, Ikev2Params):
""" test ikev2 responder - cert based auth """
def config_tc(self):