diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_ikev2.py | 176 |
1 files changed, 152 insertions, 24 deletions
diff --git a/test/test_ikev2.py b/test/test_ikev2.py index 30ee2b98110..d2b4c691f85 100644 --- a/test/test_ikev2.py +++ b/test/test_ikev2.py @@ -361,14 +361,21 @@ class IKEv2SA(object): h.update(data) return h.finalize() - def calc_keys(self): + def calc_keys(self, sk_d=None): prf = self.ike_prf_alg.mod() - # SKEYSEED = prf(Ni | Nr, g^ir) - s = self.i_nonce + self.r_nonce - self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret) + if sk_d is None: + # SKEYSEED = prf(Ni | Nr, g^ir) + self.skeyseed = self.calc_prf( + prf, self.i_nonce + self.r_nonce, self.dh_shared_secret + ) + else: + # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr) + self.skeyseed = self.calc_prf( + prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce + ) # calculate S = Ni | Nr | SPIi SPIr - s = s + self.ispi + self.rspi + s = self.i_nonce + self.r_nonce + self.ispi + self.rspi prf_key_trunc = self.ike_prf_alg.trunc_len encr_key_len = self.ike_crypto_key_len @@ -585,6 +592,45 @@ class IKEv2SA(object): digest.update(data) return digest.finalize() + def clone(self, test, **kwargs): + if "spi" not in kwargs: + kwargs["spi"] = self.ispi if self.is_initiator else self.rspi + if "nonce" not in kwargs: + kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce + if self.child_sas: + if "local_ts" not in kwargs: + kwargs["local_ts"] = self.child_sas[0].local_ts + if "remote_ts" not in kwargs: + kwargs["remote_ts"] = self.child_sas[0].remote_ts + sa = type(self)( + test, + is_initiator=self.is_initiator, + i_id=self.i_id, + r_id=self.r_id, + id_type=self.id_type, + auth_data=self.auth_data, + auth_method=self.auth_method, + priv_key=self.priv_key, + i_natt=self.i_natt, + r_natt=self.r_natt, + udp_encap=self.udp_encap, + **kwargs, + ) + if sa.is_initiator: + sa.set_ike_props( + crypto=self.ike_crypto, + crypto_key_len=self.ike_crypto_key_len, + integ=self.ike_integ, + prf=self.ike_prf, + dh=self.ike_dh, + ) + sa.set_esp_props( + crypto=self.esp_crypto, + crypto_key_len=self.esp_crypto_key_len, + integ=self.esp_integ, + ) + return sa + @unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests") class IkePeer(VppTestCase): @@ -650,6 +696,20 @@ class IkePeer(VppTestCase): self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 ) + def create_sa_rekey_request(self, **kwargs): + sa = self.generate_sa_init_payload(**kwargs) + header = ikev2.IKEv2( + init_SPI=self.sa.ispi, + resp_SPI=self.sa.rspi, + id=self.sa.new_msg_id(), + flags="Initiator", + exch_type="CREATE_CHILD_SA", + ) + ike_msg = self.encrypt_ike_msg(header, sa, "SA") + return self.create_packet( + self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6 + ) + def create_empty_request(self): header = ikev2.IKEv2( init_SPI=self.sa.ispi, @@ -830,10 +890,15 @@ class IkePeer(VppTestCase): self.assertEqual(api_id.data_len, exp_id.data_len) self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type) - def verify_ike_sas(self): + def verify_ike_sas(self, is_rekey=False): r = self.vapi.ikev2_sa_dump() - self.assertEqual(len(r), 1) - sa = r[0].sa + if is_rekey: + sa_count = 2 + sa = r[1].sa + else: + sa_count = 1 + sa = r[0].sa + self.assertEqual(len(r), sa_count) self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big")) self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big")) if self.ip6: @@ -865,7 +930,16 @@ class IkePeer(VppTestCase): self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id) self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr) + n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index) + self.verify_nonce(n, self.sa.i_nonce) + n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index) + self.verify_nonce(n, self.sa.r_nonce) + r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index) + if is_rekey: + self.assertEqual(len(r), 0) + return + self.assertEqual(len(r), 1) csa = r[0].child_sa self.assertEqual(csa.sa_index, sa.sa_index) @@ -894,11 +968,6 @@ class IkePeer(VppTestCase): self.assertEqual(len(r), 1) self.verify_ts(r[0].ts, tsr[0], False) - n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index) - self.verify_nonce(n, self.sa.i_nonce) - n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index) - self.verify_nonce(n, self.sa.r_nonce) - def verify_nonce(self, api_nonce, nonce): self.assertEqual(api_nonce.data_len, len(nonce)) self.assertEqual(api_nonce.nonce, nonce) @@ -1281,7 +1350,9 @@ class TemplateResponder(IkePeer): capture = self.pg0.get_capture(1) self.verify_del_sa(capture[0]) - def send_sa_init_req(self): + def generate_sa_init_payload( + self, spi=None, dh_pub_key=None, nonce=None, next_payload=None + ): tr_attr = self.sa.ike_crypto_attr() trans = ( ikev2.IKEv2_payload_Transform( @@ -1301,23 +1372,36 @@ class TemplateResponder(IkePeer): ) ) + if spi is None: + pargs = {} + else: + pargs = {"SPI": spi, "SPIsize": len(spi)} props = ikev2.IKEv2_payload_Proposal( - proposal=1, proto="IKEv2", trans_nb=4, trans=trans + proposal=1, + proto="IKEv2", + trans_nb=4, + trans=trans, + **pargs, ) - next_payload = None if self.ip6 else "Notify" - - self.sa.init_req_packet = ( - ikev2.IKEv2( - init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT" - ) - / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props) + return ( + ikev2.IKEv2_payload_SA(next_payload="KE", prop=props) / ikev2.IKEv2_payload_KE( - next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key + next_payload="Nonce", + group=self.sa.ike_dh, + load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key, + ) + / ikev2.IKEv2_payload_Nonce( + next_payload=next_payload, + load=self.sa.i_nonce if nonce is None else nonce, ) - / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce) ) + def send_sa_init_req(self): + self.sa.init_req_packet = ikev2.IKEv2( + init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT" + ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify") + if not self.ip6: if self.sa.i_natt: src_address = b"\x0a\x0a\x0a\x01" @@ -2186,6 +2270,50 @@ class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat): WITH_KEX = True +@tag_fixme_vpp_workers +class TestResponderRekeySA(TestResponderPsk): + """test ikev2 responder - rekey IKE SA""" + + def send_rekey_from_initiator(self, newsa): + packet = self.create_sa_rekey_request( + spi=newsa.ispi, + dh_pub_key=newsa.my_dh_pub_key, + nonce=newsa.i_nonce, + ) + self.pg0.add_stream(packet) + self.pg0.enable_capture() + self.pg_start() + capture = self.pg0.get_capture(1) + return capture + + def process_rekey_response(self, newsa, capture): + ih = self.get_ike_header(capture[0]) + plain = self.sa.hmac_and_decrypt(ih) + sa = ikev2.IKEv2_payload_SA(plain) + prop = sa[ikev2.IKEv2_payload_Proposal] + newsa.rspi = prop.SPI + newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load + newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load + newsa.complete_dh_data() + newsa.calc_keys(sk_d=self.sa.sk_d) + newsa.child_sas = self.sa.child_sas + self.sa.child_sas = [] + + def test_responder(self): + super(TestResponderRekeySA, self).test_responder() + newsa = self.sa.clone(self, spi=os.urandom(8)) + newsa.generate_dh_data() + capture = self.send_rekey_from_initiator(newsa) + self.process_rekey_response(newsa, capture) + self.verify_ike_sas(is_rekey=True) + self.assert_counter(1, "rekey_req", "ip4") + r = self.vapi.ikev2_sa_dump() + self.assertEqual(r[1].sa.stats.n_rekey_req, 1) + self.initiate_del_sa_from_initiator() + self.sa = newsa + self.verify_ike_sas() + + @tag_fixme_ubuntu2204 @tag_fixme_debian11 class TestResponderVrf(TestResponderPsk, Ikev2Params): |