aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_ikev2.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ikev2.py')
-rw-r--r--test/test_ikev2.py48
1 files changed, 39 insertions, 9 deletions
diff --git a/test/test_ikev2.py b/test/test_ikev2.py
index ac77a4163a1..3c588719581 100644
--- a/test/test_ikev2.py
+++ b/test/test_ikev2.py
@@ -752,14 +752,15 @@ class IkePeer(VppTestCase):
else:
self.assertNotIn(e.IPSEC_API_SAD_FLAG_UDP_ENCAP, ipsec_sa.flags)
- def verify_ipsec_sas(self, is_rekey=False):
+ def verify_ipsec_sas(self, is_rekey=False, sa_count=None):
sas = self.vapi.ipsec_sa_dump()
- if is_rekey:
- # after rekey there is a short period of time in which old
- # inbound SA is still present
- sa_count = 3
- else:
- sa_count = 2
+ if sa_count is None:
+ if is_rekey:
+ # after rekey there is a short period of time in which old
+ # inbound SA is still present
+ sa_count = 3
+ else:
+ sa_count = 2
self.assertEqual(len(sas), sa_count)
if self.sa.is_initiator:
if is_rekey:
@@ -2078,12 +2079,15 @@ class TestResponderDpd(TestResponderPsk):
class TestResponderRekey(TestResponderPsk):
"""test ikev2 responder - rekey"""
- def rekey_from_initiator(self):
+ def send_rekey_from_initiator(self):
packet = self.create_rekey_request()
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
capture = self.pg0.get_capture(1)
+ return capture
+
+ def process_rekey_response(self, capture):
ih = self.get_ike_header(capture[0])
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
@@ -2094,7 +2098,7 @@ class TestResponderRekey(TestResponderPsk):
def test_responder(self):
super(TestResponderRekey, self).test_responder()
- self.rekey_from_initiator()
+ self.process_rekey_response(self.send_rekey_from_initiator())
self.sa.calc_child_keys()
self.verify_ike_sas()
self.verify_ipsec_sas(is_rekey=True)
@@ -2103,6 +2107,32 @@ class TestResponderRekey(TestResponderPsk):
self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
+@tag_fixme_vpp_workers
+class TestResponderRekeyRepeat(TestResponderRekey):
+ """test ikev2 responder - rekey repeat"""
+
+ def test_responder(self):
+ super(TestResponderRekeyRepeat, self).test_responder()
+ # rekey request is not accepted until old IPsec SA is expired
+ capture = self.send_rekey_from_initiator()
+ ih = self.get_ike_header(capture[0])
+ plain = self.sa.hmac_and_decrypt(ih)
+ notify = ikev2.IKEv2_payload_Notify(plain)
+ self.assertEqual(notify.type, 43)
+ self.assertEqual(len(self.vapi.ipsec_sa_dump()), 3)
+ # rekey request is accepted after old IPsec SA was expired
+ for _ in range(50):
+ if len(self.vapi.ipsec_sa_dump()) != 3:
+ break
+ time.sleep(0.2)
+ else:
+ self.fail("old IPsec SA not expired")
+ self.process_rekey_response(self.send_rekey_from_initiator())
+ self.sa.calc_child_keys()
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(sa_count=3)
+
+
class TestResponderVrf(TestResponderPsk, Ikev2Params):
"""test ikev2 responder - non-default table id"""