From f6deabd4fc887b12d2335111e58354a87390813c Mon Sep 17 00:00:00 2001 From: Filip Tehlar Date: Fri, 30 Oct 2020 05:28:11 +0000 Subject: ikev2: fix reply during rekey Type: fix Change-Id: If87f4b8ae92508215fe91178958fe2ddb91e5a35 Signed-off-by: Filip Tehlar (cherry picked from commit 68ad6258374201ba8f0dc052e6f44d6250555249) --- src/plugins/ikev2/ikev2.c | 2 +- src/plugins/ikev2/test/test_ikev2.py | 234 ++++++++++++++++++++++++++++------- 2 files changed, 192 insertions(+), 44 deletions(-) (limited to 'src') diff --git a/src/plugins/ikev2/ikev2.c b/src/plugins/ikev2/ikev2.c index 0e6b1c61f88..206e1db0e2b 100644 --- a/src/plugins/ikev2/ikev2.c +++ b/src/plugins/ikev2/ikev2.c @@ -3152,7 +3152,7 @@ ikev2_node_internal (vlib_main_t * vm, ikev2_create_tunnel_interface (vm, sa0, child, p[0], child - sa0->childs, 1); } - if (sa0->is_initiator) + if (ike_hdr_is_response (ike0)) { vec_free (sa0->rekey); } diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py index ec68658f564..91cec8e9a62 100644 --- a/src/plugins/ikev2/test/test_ikev2.py +++ b/src/plugins/ikev2/test/test_ikev2.py @@ -151,10 +151,28 @@ PRF_ALGOS = { hashes.SHA256, 32), } +CRYPTO_IDS = { + 12: 'AES-CBC', + 20: 'AES-GCM-16ICV', +} + +INTEG_IDS = { + 2: 'HMAC-SHA1-96', + 12: 'SHA2-256-128', + 13: 'SHA2-384-192', + 14: 'SHA2-512-256', +} + class IKEv2ChildSA(object): - def __init__(self, local_ts, remote_ts, spi=None): - self.spi = spi or os.urandom(4) + def __init__(self, local_ts, remote_ts, is_initiator): + spi = os.urandom(4) + if is_initiator: + self.ispi = spi + self.rspi = None + else: + self.rspi = spi + self.ispi = None self.local_ts = local_ts self.remote_ts = remote_ts @@ -193,7 +211,8 @@ class IKEv2SA(object): self.rspi = spi self.ispi = 8 * b'\x00' self.r_nonce = nonce - self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)] + self.child_sas = [IKEv2ChildSA(local_ts, remote_ts, + self.is_initiator)] def new_msg_id(self): self.msg_id += 1 @@ -559,9 +578,11 @@ class IkePeer(VppTestCase): self.config_tc() self.p.add_vpp_config() self.assertIsNotNone(self.p.query_vpp_config()) - self.sa.generate_dh_data() + if self.sa.is_initiator: + self.sa.generate_dh_data() self.vapi.cli('ikev2 set logging level 4') self.vapi.cli('event-lo clear') + self.vapi.cli('ikev2 dpd disable') def create_packet(self, src_if, msg, sport=500, dport=500, natt=False, use_ip6=False): @@ -641,16 +662,30 @@ class IkePeer(VppTestCase): assert(len(res) == tlen) return res - def verify_ipsec_sas(self): + def verify_ipsec_sas(self, is_rekey=False): sas = self.vapi.ipsec_sa_dump() - self.assertEqual(len(sas), 2) + if is_rekey: + # after rekey there is a short period of time in which old + # inbound SA is still present + sa_count = 3 + else: + sa_count = 2 + self.assertEqual(len(sas), sa_count) e = VppEnum.vl_api_ipsec_sad_flags_t if self.sa.is_initiator: - sa0 = sas[0].entry - sa1 = sas[1].entry + if is_rekey: + sa0 = sas[0].entry + sa1 = sas[2].entry + else: + sa0 = sas[0].entry + sa1 = sas[1].entry else: - sa1 = sas[0].entry - sa0 = sas[1].entry + if is_rekey: + sa0 = sas[2].entry + sa1 = sas[0].entry + else: + sa1 = sas[0].entry + sa0 = sas[1].entry c = self.sa.child_sas[0] @@ -738,6 +773,8 @@ class IkePeer(VppTestCase): self.verify_keymat(csa.keys, c, 'sk_ar') self.verify_keymat(csa.keys, c, 'sk_ei') self.verify_keymat(csa.keys, c, 'sk_er') + self.assertEqual(csa.i_spi.to_bytes(4, 'big'), c.ispi) + self.assertEqual(csa.r_spi.to_bytes(4, 'big'), c.rspi) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) tsi = tsi[0] @@ -888,9 +925,23 @@ class TemplateInitiator(IkePeer): self.p.ike_transforms['dh_group']) self.verify_nat_detection(packet) + self.sa.set_ike_props( + crypto='AES-GCM-16ICV', crypto_key_len=32, + integ='NULL', prf='PRF_HMAC_SHA2_256', dh='3072MODPgr') + self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32, + integ='SHA2-256-128') + self.sa.generate_dh_data() self.sa.complete_dh_data() self.sa.calc_keys() + def update_esp_transforms(self, trans, sa): + while trans: + if trans.transform_type == 1: # ecryption + sa.esp_crypto = CRYPTO_IDS[trans.transform_id] + elif trans.transform_type == 3: # integrity + sa.esp_integ = INTEG_IDS[trans.transform_id] + trans = trans.payload + def verify_sa_auth_req(self, packet): ih = self.get_ike_header(packet) self.assertEqual(ih.resp_SPI, self.sa.rspi) @@ -908,6 +959,11 @@ class TemplateInitiator(IkePeer): idr = ikev2.IKEv2_payload_IDr(idi.payload) self.assertEqual(idi.load, self.sa.i_id) self.assertEqual(idr.load, self.sa.r_id) + prop = idi[ikev2.IKEv2_payload_Proposal] + c = self.sa.child_sas[0] + c.ispi = prop.SPI + self.update_esp_transforms( + prop[ikev2.IKEv2_payload_Transform], self.sa) def send_init_response(self): tr_attr = self.sa.ike_crypto_attr() @@ -961,8 +1017,9 @@ class TemplateInitiator(IkePeer): transform_type='Extended Sequence Number', transform_id='ESN')) + c = self.sa.child_sas[0] props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP', - SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans)) + SPIsize=4, SPI=c.rspi, trans_nb=4, trans=trans)) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr', @@ -1103,8 +1160,9 @@ class TemplateResponder(IkePeer): capture = self.pg0.get_capture(1) self.verify_sa_init(capture[0]) - def send_sa_auth(self): + def generate_auth_payload(self, last_payload=None, is_rekey=False): tr_attr = self.sa.esp_crypto_attr() + last_payload = last_payload or 'Notify' trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', transform_id=self.sa.esp_crypto, length=tr_attr[1], key_length=tr_attr[0]) / @@ -1117,32 +1175,45 @@ class TemplateResponder(IkePeer): transform_type='Extended Sequence Number', transform_id='ESN')) + c = self.sa.child_sas[0] props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP', - SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans)) + SPIsize=4, SPI=c.ispi, trans_nb=4, trans=trans)) tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4) - plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr', - IDtype=self.sa.id_type, load=self.sa.i_id) / - ikev2.IKEv2_payload_IDr(next_payload='AUTH', - IDtype=self.sa.id_type, load=self.sa.r_id) / - ikev2.IKEv2_payload_AUTH(next_payload='SA', + plain = (ikev2.IKEv2_payload_AUTH(next_payload='SA', auth_type=AuthMethod.value(self.sa.auth_method), load=self.sa.auth_data) / ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) / ikev2.IKEv2_payload_TSi(next_payload='TSr', - number_of_TSs=len(tsi), - traffic_selector=tsi) / - ikev2.IKEv2_payload_TSr(next_payload='Notify', - number_of_TSs=len(tsr), - traffic_selector=tsr) / - ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT')) + number_of_TSs=len(tsi), traffic_selector=tsi) / + ikev2.IKEv2_payload_TSr(next_payload=last_payload, + number_of_TSs=len(tsr), traffic_selector=tsr)) + + if is_rekey: + first_payload = 'Nonce' + plain = (ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce, + next_payload='SA') / plain / + ikev2.IKEv2_payload_Notify(type='REKEY_SA', + proto='ESP', SPI=c.ispi)) + else: + first_payload = 'IDi' + ids = (ikev2.IKEv2_payload_IDi(next_payload='IDr', + IDtype=self.sa.id_type, load=self.sa.i_id) / + ikev2.IKEv2_payload_IDr(next_payload='AUTH', + IDtype=self.sa.id_type, load=self.sa.r_id)) + plain = ids / plain + return plain, first_payload + def send_sa_auth(self): + plain, first_payload = self.generate_auth_payload( + last_payload='Notify') + plain = plain / ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT') header = ikev2.IKEv2( init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), flags='Initiator', exch_type='IKE_AUTH') - ike_msg = self.encrypt_ike_msg(header, plain, 'IDi') + ike_msg = self.encrypt_ike_msg(header, plain, first_payload) packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6) self.pg0.add_stream(packet) @@ -1178,6 +1249,10 @@ class TemplateResponder(IkePeer): self.verify_udp(udp) self.assertEqual(ike.id, self.sa.msg_id) plain = self.sa.hmac_and_decrypt(ike) + idr = ikev2.IKEv2_payload_IDr(plain) + prop = idr[ikev2.IKEv2_payload_Proposal] + self.assertEqual(prop.SPIsize, 4) + self.sa.child_sas[0].rspi = prop.SPI self.sa.calc_child_keys() def test_responder(self): @@ -1264,24 +1339,25 @@ class Ikev2Params(object): auth_data=auth_data, local_ts=self.p.remote_ts, remote_ts=self.p.local_ts) - ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\ - params['ike-crypto'] - ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\ - params['ike-integ'] - ike_dh = '2048MODPgr' if 'ike-dh' not in params else\ - params['ike-dh'] - - esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\ - params['esp-crypto'] - esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\ - params['esp-integ'] - - self.sa.set_ike_props( - crypto=ike_crypto[0], crypto_key_len=ike_crypto[1], - integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh) - self.sa.set_esp_props( - crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], - integ=esp_integ) + if is_init: + ike_crypto = ('AES-CBC', 32) if 'ike-crypto' not in params else\ + params['ike-crypto'] + ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\ + params['ike-integ'] + ike_dh = '2048MODPgr' if 'ike-dh' not in params else\ + params['ike-dh'] + + esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\ + params['esp-crypto'] + esp_integ = 'HMAC-SHA1-96' if 'esp-integ' not in params else\ + params['esp-integ'] + + self.sa.set_ike_props( + crypto=ike_crypto[0], crypto_key_len=ike_crypto[1], + integ=ike_integ, prf='PRF_HMAC_SHA2_256', dh=ike_dh) + self.sa.set_esp_props( + crypto=esp_crypto[0], crypto_key_len=esp_crypto[1], + integ=esp_integ) class TestApi(VppTestCase): @@ -1491,6 +1567,44 @@ class TestInitiatorPsk(TemplateInitiator, Ikev2Params): 'integ_alg': 12}}) +class TestInitiatorRekey(TestInitiatorPsk): + """ test ikev2 initiator - rekey """ + + def rekey_from_initiator(self): + ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little') + self.pg0.enable_capture() + self.pg_start() + self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi) + capture = self.pg0.get_capture(1) + ih = self.get_ike_header(capture[0]) + self.assertEqual(ih.exch_type, 36) # CHILD_SA + self.assertNotIn('Response', ih.flags) + self.assertIn('Initiator', ih.flags) + plain = self.sa.hmac_and_decrypt(ih) + sa = ikev2.IKEv2_payload_SA(plain) + prop = sa[ikev2.IKEv2_payload_Proposal] + nonce = sa[ikev2.IKEv2_payload_Nonce] + self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load + self.sa.r_nonce = self.sa.i_nonce + # update new responder SPI + self.sa.child_sas[0].ispi = prop.SPI + self.sa.child_sas[0].rspi = prop.SPI + self.sa.calc_child_keys() + header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi, + flags='Response', exch_type=36, + id=ih.id, next_payload='Encrypted') + resp = self.encrypt_ike_msg(header, sa, 'SA') + packet = self.create_packet(self.pg0, resp, self.sa.sport, + self.sa.dport, self.sa.natt, self.ip6) + self.send_and_assert_no_replies(self.pg0, packet) + + def test_initiator(self): + super(TestInitiatorRekey, self).test_initiator() + self.rekey_from_initiator() + self.verify_ike_sas() + self.verify_ipsec_sas(is_rekey=True) + + class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params): """ test ikev2 initiator - delete IKE SA from responder """ @@ -1529,6 +1643,40 @@ class TestResponderPsk(TemplateResponder, Ikev2Params): self.config_params() +class TestResponderRekey(TestResponderPsk): + """ test ikev2 responder - rekey """ + + def rekey_from_initiator(self): + sa, first_payload = self.generate_auth_payload(is_rekey=True) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(), + flags='Initiator', exch_type='CREATE_CHILD_SA') + + ike_msg = self.encrypt_ike_msg(header, sa, first_payload) + packet = self.create_packet(self.pg0, ike_msg, self.sa.sport, + self.sa.dport, self.sa.natt, self.ip6) + self.pg0.add_stream(packet) + self.pg0.enable_capture() + self.pg_start() + capture = self.pg0.get_capture(1) + ih = self.get_ike_header(capture[0]) + plain = self.sa.hmac_and_decrypt(ih) + sa = ikev2.IKEv2_payload_SA(plain) + prop = sa[ikev2.IKEv2_payload_Proposal] + nonce = sa[ikev2.IKEv2_payload_Nonce] + self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load + # update new responder SPI + self.sa.child_sas[0].rspi = prop.SPI + + def test_responder(self): + super(TestResponderRekey, self).test_responder() + self.rekey_from_initiator() + self.sa.calc_child_keys() + self.verify_ike_sas() + self.verify_ipsec_sas(is_rekey=True) + + class TestResponderRsaSign(TemplateResponder, Ikev2Params): """ test ikev2 responder - cert based auth """ def config_tc(self): -- cgit 1.2.3-korg