From 7e6ffba672875f1070348753890d023695d73be6 Mon Sep 17 00:00:00 2001 From: Atzm Watanabe Date: Tue, 9 Aug 2022 14:00:03 +0900 Subject: ikev2: do not accept rekey until old SA is deleted Type: fix Signed-off-by: Atzm Watanabe Change-Id: I11b6107492004a45104857dc2dae01b9a5a01e3b --- test/test_ikev2.py | 48 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 9 deletions(-) (limited to 'test') diff --git a/test/test_ikev2.py b/test/test_ikev2.py index ac77a4163a1..3c588719581 100644 --- a/test/test_ikev2.py +++ b/test/test_ikev2.py @@ -752,14 +752,15 @@ class IkePeer(VppTestCase): else: self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags) - def verify_ipsec_sas(self, is_rekey=False): + def verify_ipsec_sas(self, is_rekey=False, sa_count=None): sas = self.vapi.ipsec_sa_dump() - if is_rekey: - # after rekey there is a short period of time in which old - # inbound SA is still present - sa_count = 3 - else: - sa_count = 2 + if sa_count is None: + if is_rekey: + # after rekey there is a short period of time in which old + # inbound SA is still present + sa_count = 3 + else: + sa_count = 2 self.assertEqual(len(sas), sa_count) if self.sa.is_initiator: if is_rekey: @@ -2078,12 +2079,15 @@ class TestResponderDpd(TestResponderPsk): class TestResponderRekey(TestResponderPsk): """test ikev2 responder - rekey""" - def rekey_from_initiator(self): + def send_rekey_from_initiator(self): packet = self.create_rekey_request() self.pg0.add_stream(packet) self.pg0.enable_capture() self.pg_start() capture = self.pg0.get_capture(1) + return capture + + def process_rekey_response(self, capture): ih = self.get_ike_header(capture[0]) plain = self.sa.hmac_and_decrypt(ih) sa = ikev2.IKEv2_payload_SA(plain) @@ -2094,7 +2098,7 @@ class TestResponderRekey(TestResponderPsk): def test_responder(self): super(TestResponderRekey, self).test_responder() - self.rekey_from_initiator() + self.process_rekey_response(self.send_rekey_from_initiator()) self.sa.calc_child_keys() self.verify_ike_sas() self.verify_ipsec_sas(is_rekey=True) @@ -2103,6 +2107,32 @@ class TestResponderRekey(TestResponderPsk): self.assertEqual(r[0].sa.stats.n_rekey_req, 1) +@tag_fixme_vpp_workers +class TestResponderRekeyRepeat(TestResponderRekey): + """test ikev2 responder - rekey repeat""" + + def test_responder(self): + super(TestResponderRekeyRepeat, self).test_responder() + # rekey request is not accepted until old IPsec SA is expired + capture = self.send_rekey_from_initiator() + ih = self.get_ike_header(capture[0]) + plain = self.sa.hmac_and_decrypt(ih) + notify = ikev2.IKEv2_payload_Notify(plain) + self.assertEqual(notify.type, 43) + self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3) + # rekey request is accepted after old IPsec SA was expired + for _ in range(50): + if len(self.vapi.ipsec_sa_dump()) != 3: + break + time.sleep(0.2) + else: + self.fail("old IPsec SA not expired") + self.process_rekey_response(self.send_rekey_from_initiator()) + self.sa.calc_child_keys() + self.verify_ike_sas() + self.verify_ipsec_sas(sa_count=3) + + class TestResponderVrf(TestResponderPsk, Ikev2Params): """test ikev2 responder - non-default table id""" -- cgit 1.2.3-korg