aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorAtzm Watanabe <atzmism@gmail.com>2022-08-18 17:57:53 +0900
committerBeno�t Ganne <bganne@cisco.com>2024-02-09 14:19:31 +0000
commitd4f405a70f28f6e5399a503c91da7ae8f90f94af (patch)
treea148ea35049fc74757e2f91fc1720bed015517e1 /test
parentd9b4d9fb1ff1319c5e24800780a24e15a24cbb2f (diff)
ikev2: accept rekey request for IKE SA
RFC 7296 describes the way to rekey IKE SAs: to rekey an IKE SA, establish a new equivalent IKE SA with the peer to whom the old IKE SA is shared using a CREATE_CHILD_SA within the existing IKE SA. An IKE SA so created inherits all of the original IKE SA's Child SAs, and the new IKE SA is used for all control messages needed to maintain those Child SAs. Type: improvement Signed-off-by: Atzm Watanabe <atzmism@gmail.com> Change-Id: Icdf43b67c38bf183913a28a08a85236ba16343af
Diffstat (limited to 'test')
-rw-r--r--test/test_ikev2.py176
1 files changed, 152 insertions, 24 deletions
diff --git a/test/test_ikev2.py b/test/test_ikev2.py
index 30ee2b98110..d2b4c691f85 100644
--- a/test/test_ikev2.py
+++ b/test/test_ikev2.py
@@ -361,14 +361,21 @@ class IKEv2SA(object):
h.update(data)
return h.finalize()
- def calc_keys(self):
+ def calc_keys(self, sk_d=None):
prf = self.ike_prf_alg.mod()
- # SKEYSEED = prf(Ni | Nr, g^ir)
- s = self.i_nonce + self.r_nonce
- self.skeyseed = self.calc_prf(prf, s, self.dh_shared_secret)
+ if sk_d is None:
+ # SKEYSEED = prf(Ni | Nr, g^ir)
+ self.skeyseed = self.calc_prf(
+ prf, self.i_nonce + self.r_nonce, self.dh_shared_secret
+ )
+ else:
+ # SKEYSEED = prf(SK_d (old), g^ir (new) | Ni | Nr)
+ self.skeyseed = self.calc_prf(
+ prf, sk_d, self.dh_shared_secret + self.i_nonce + self.r_nonce
+ )
# calculate S = Ni | Nr | SPIi SPIr
- s = s + self.ispi + self.rspi
+ s = self.i_nonce + self.r_nonce + self.ispi + self.rspi
prf_key_trunc = self.ike_prf_alg.trunc_len
encr_key_len = self.ike_crypto_key_len
@@ -585,6 +592,45 @@ class IKEv2SA(object):
digest.update(data)
return digest.finalize()
+ def clone(self, test, **kwargs):
+ if "spi" not in kwargs:
+ kwargs["spi"] = self.ispi if self.is_initiator else self.rspi
+ if "nonce" not in kwargs:
+ kwargs["nonce"] = self.i_nonce if self.is_initiator else self.r_nonce
+ if self.child_sas:
+ if "local_ts" not in kwargs:
+ kwargs["local_ts"] = self.child_sas[0].local_ts
+ if "remote_ts" not in kwargs:
+ kwargs["remote_ts"] = self.child_sas[0].remote_ts
+ sa = type(self)(
+ test,
+ is_initiator=self.is_initiator,
+ i_id=self.i_id,
+ r_id=self.r_id,
+ id_type=self.id_type,
+ auth_data=self.auth_data,
+ auth_method=self.auth_method,
+ priv_key=self.priv_key,
+ i_natt=self.i_natt,
+ r_natt=self.r_natt,
+ udp_encap=self.udp_encap,
+ **kwargs,
+ )
+ if sa.is_initiator:
+ sa.set_ike_props(
+ crypto=self.ike_crypto,
+ crypto_key_len=self.ike_crypto_key_len,
+ integ=self.ike_integ,
+ prf=self.ike_prf,
+ dh=self.ike_dh,
+ )
+ sa.set_esp_props(
+ crypto=self.esp_crypto,
+ crypto_key_len=self.esp_crypto_key_len,
+ integ=self.esp_integ,
+ )
+ return sa
+
@unittest.skipIf("ikev2" in config.excluded_plugins, "Exclude IKEv2 plugin tests")
class IkePeer(VppTestCase):
@@ -650,6 +696,20 @@ class IkePeer(VppTestCase):
self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
)
+ def create_sa_rekey_request(self, **kwargs):
+ sa = self.generate_sa_init_payload(**kwargs)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(),
+ flags="Initiator",
+ exch_type="CREATE_CHILD_SA",
+ )
+ ike_msg = self.encrypt_ike_msg(header, sa, "SA")
+ return self.create_packet(
+ self.pg0, ike_msg, self.sa.sport, self.sa.dport, self.sa.natt, self.ip6
+ )
+
def create_empty_request(self):
header = ikev2.IKEv2(
init_SPI=self.sa.ispi,
@@ -830,10 +890,15 @@ class IkePeer(VppTestCase):
self.assertEqual(api_id.data_len, exp_id.data_len)
self.assertEqual(bytes(api_id.data, "ascii"), exp_id.type)
- def verify_ike_sas(self):
+ def verify_ike_sas(self, is_rekey=False):
r = self.vapi.ikev2_sa_dump()
- self.assertEqual(len(r), 1)
- sa = r[0].sa
+ if is_rekey:
+ sa_count = 2
+ sa = r[1].sa
+ else:
+ sa_count = 1
+ sa = r[0].sa
+ self.assertEqual(len(r), sa_count)
self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, "big"))
self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, "big"))
if self.ip6:
@@ -865,7 +930,16 @@ class IkePeer(VppTestCase):
self.assertEqual(bytes(sa.i_id.data, "ascii"), self.sa.i_id)
self.assertEqual(bytes(sa.r_id.data, "ascii"), self.idr)
+ n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.i_nonce)
+ n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
+ self.verify_nonce(n, self.sa.r_nonce)
+
r = self.vapi.ikev2_child_sa_dump(sa_index=sa.sa_index)
+ if is_rekey:
+ self.assertEqual(len(r), 0)
+ return
+
self.assertEqual(len(r), 1)
csa = r[0].child_sa
self.assertEqual(csa.sa_index, sa.sa_index)
@@ -894,11 +968,6 @@ class IkePeer(VppTestCase):
self.assertEqual(len(r), 1)
self.verify_ts(r[0].ts, tsr[0], False)
- n = self.vapi.ikev2_nonce_get(is_initiator=True, sa_index=sa.sa_index)
- self.verify_nonce(n, self.sa.i_nonce)
- n = self.vapi.ikev2_nonce_get(is_initiator=False, sa_index=sa.sa_index)
- self.verify_nonce(n, self.sa.r_nonce)
-
def verify_nonce(self, api_nonce, nonce):
self.assertEqual(api_nonce.data_len, len(nonce))
self.assertEqual(api_nonce.nonce, nonce)
@@ -1281,7 +1350,9 @@ class TemplateResponder(IkePeer):
capture = self.pg0.get_capture(1)
self.verify_del_sa(capture[0])
- def send_sa_init_req(self):
+ def generate_sa_init_payload(
+ self, spi=None, dh_pub_key=None, nonce=None, next_payload=None
+ ):
tr_attr = self.sa.ike_crypto_attr()
trans = (
ikev2.IKEv2_payload_Transform(
@@ -1301,23 +1372,36 @@ class TemplateResponder(IkePeer):
)
)
+ if spi is None:
+ pargs = {}
+ else:
+ pargs = {"SPI": spi, "SPIsize": len(spi)}
props = ikev2.IKEv2_payload_Proposal(
- proposal=1, proto="IKEv2", trans_nb=4, trans=trans
+ proposal=1,
+ proto="IKEv2",
+ trans_nb=4,
+ trans=trans,
+ **pargs,
)
- next_payload = None if self.ip6 else "Notify"
-
- self.sa.init_req_packet = (
- ikev2.IKEv2(
- init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
- )
- / ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
+ return (
+ ikev2.IKEv2_payload_SA(next_payload="KE", prop=props)
/ ikev2.IKEv2_payload_KE(
- next_payload="Nonce", group=self.sa.ike_dh, load=self.sa.my_dh_pub_key
+ next_payload="Nonce",
+ group=self.sa.ike_dh,
+ load=self.sa.my_dh_pub_key if dh_pub_key is None else dh_pub_key,
+ )
+ / ikev2.IKEv2_payload_Nonce(
+ next_payload=next_payload,
+ load=self.sa.i_nonce if nonce is None else nonce,
)
- / ikev2.IKEv2_payload_Nonce(next_payload=next_payload, load=self.sa.i_nonce)
)
+ def send_sa_init_req(self):
+ self.sa.init_req_packet = ikev2.IKEv2(
+ init_SPI=self.sa.ispi, flags="Initiator", exch_type="IKE_SA_INIT"
+ ) / self.generate_sa_init_payload(next_payload=None if self.ip6 else "Notify")
+
if not self.ip6:
if self.sa.i_natt:
src_address = b"\x0a\x0a\x0a\x01"
@@ -2186,6 +2270,50 @@ class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
WITH_KEX = True
+@tag_fixme_vpp_workers
+class TestResponderRekeySA(TestResponderPsk):
+ """test ikev2 responder - rekey IKE SA"""
+
+ def send_rekey_from_initiator(self, newsa):
+ packet = self.create_sa_rekey_request(
+ spi=newsa.ispi,
+ dh_pub_key=newsa.my_dh_pub_key,
+ nonce=newsa.i_nonce,
+ )
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ return capture
+
+ def process_rekey_response(self, newsa, capture):
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ newsa.rspi = prop.SPI
+ newsa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ newsa.r_dh_data = sa[ikev2.IKEv2_payload_KE].load
+ newsa.complete_dh_data()
+ newsa.calc_keys(sk_d=self.sa.sk_d)
+ newsa.child_sas = self.sa.child_sas
+ self.sa.child_sas = []
+
+ def test_responder(self):
+ super(TestResponderRekeySA, self).test_responder()
+ newsa = self.sa.clone(self, spi=os.urandom(8))
+ newsa.generate_dh_data()
+ capture = self.send_rekey_from_initiator(newsa)
+ self.process_rekey_response(newsa, capture)
+ self.verify_ike_sas(is_rekey=True)
+ self.assert_counter(1, "rekey_req", "ip4")
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(r[1].sa.stats.n_rekey_req, 1)
+ self.initiate_del_sa_from_initiator()
+ self.sa = newsa
+ self.verify_ike_sas()
+
+
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class TestResponderVrf(TestResponderPsk, Ikev2Params):