summaryrefslogtreecommitdiffstats
path: root/src/plugins/ikev2/test/test_ikev2.py
diff options
context:
space:
mode:
authorFilip Tehlar <ftehlar@cisco.com>2020-11-19 21:34:48 +0000
committerBeno�t Ganne <bganne@cisco.com>2020-11-25 12:24:11 +0000
commit38340fa32c96e9c6cb1593f03117dd504efbd5f4 (patch)
tree450c31297b966fc409192e78f5d305483abb1fb9 /src/plugins/ikev2/test/test_ikev2.py
parent45d60492a425def70e2ee64de6c36327be429479 (diff)
ikev2: fix issue when sending multiple requests at once
Type: fix Change-Id: I8ed556de4370a03d10c56cce101cd5ea0d0aaf8b Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
Diffstat (limited to 'src/plugins/ikev2/test/test_ikev2.py')
-rw-r--r--src/plugins/ikev2/test/test_ikev2.py83
1 files changed, 64 insertions, 19 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py
index 453a47e04c8..eb2c5f988d4 100644
--- a/src/plugins/ikev2/test/test_ikev2.py
+++ b/src/plugins/ikev2/test/test_ikev2.py
@@ -586,6 +586,27 @@ class IkePeer(VppTestCase):
self.vapi.cli('ikev2 set logging level 4')
self.vapi.cli('event-lo clear')
+ def create_rekey_request(self):
+ sa, first_payload = self.generate_auth_payload(is_rekey=True)
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Initiator', exch_type='CREATE_CHILD_SA')
+
+ ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
+ return self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+
+ def create_empty_request(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ id=self.sa.new_msg_id(), flags='Initiator',
+ exch_type='INFORMATIONAL',
+ next_payload='Encrypted')
+
+ msg = self.encrypt_ike_msg(header, b'', None)
+ return self.create_packet(self.pg0, msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+
def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
use_ip6=False):
if use_ip6:
@@ -1591,6 +1612,47 @@ class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
'integ_alg': 12}})
+class TestInitiatorRequestWindowSize(TestInitiatorPsk):
+ """ test initiator - request window size (1) """
+
+ def rekey_respond(self, req, update_child_sa_data):
+ ih = self.get_ike_header(req)
+ plain = self.sa.hmac_and_decrypt(ih)
+ sa = ikev2.IKEv2_payload_SA(plain)
+ if update_child_sa_data:
+ prop = sa[ikev2.IKEv2_payload_Proposal]
+ self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
+ self.sa.r_nonce = self.sa.i_nonce
+ self.sa.child_sas[0].ispi = prop.SPI
+ self.sa.child_sas[0].rspi = prop.SPI
+ self.sa.calc_child_keys()
+
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type=36,
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, sa, 'SA')
+ packet = self.create_packet(self.pg0, resp, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.send_and_assert_no_replies(self.pg0, packet)
+
+ def test_initiator(self):
+ super(TestInitiatorRequestWindowSize, self).test_initiator()
+ self.pg0.enable_capture()
+ self.pg_start()
+ ispi = int.from_bytes(self.sa.child_sas[0].ispi, 'little')
+ self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+ self.vapi.ikev2_initiate_rekey_child_sa(ispi=ispi)
+ capture = self.pg0.get_capture(2)
+
+ # reply in reverse order
+ self.rekey_respond(capture[1], True)
+ self.rekey_respond(capture[0], False)
+
+ # verify that only the second request was accepted
+ self.verify_ike_sas()
+ self.verify_ipsec_sas(is_rekey=True)
+
+
class TestInitiatorRekey(TestInitiatorPsk):
""" test ikev2 initiator - rekey """
@@ -1607,7 +1669,6 @@ class TestInitiatorRekey(TestInitiatorPsk):
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
prop = sa[ikev2.IKEv2_payload_Proposal]
- nonce = sa[ikev2.IKEv2_payload_Nonce]
self.sa.i_nonce = sa[ikev2.IKEv2_payload_Nonce].load
self.sa.r_nonce = self.sa.i_nonce
# update new responder SPI
@@ -1700,15 +1761,7 @@ class TestResponderRekey(TestResponderPsk):
""" test ikev2 responder - rekey """
def rekey_from_initiator(self):
- sa, first_payload = self.generate_auth_payload(is_rekey=True)
- header = ikev2.IKEv2(
- init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
- flags='Initiator', exch_type='CREATE_CHILD_SA')
-
- ike_msg = self.encrypt_ike_msg(header, sa, first_payload)
- packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
+ packet = self.create_rekey_request()
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
@@ -1717,7 +1770,6 @@ class TestResponderRekey(TestResponderPsk):
plain = self.sa.hmac_and_decrypt(ih)
sa = ikev2.IKEv2_payload_SA(plain)
prop = sa[ikev2.IKEv2_payload_Proposal]
- nonce = sa[ikev2.IKEv2_payload_Nonce]
self.sa.r_nonce = sa[ikev2.IKEv2_payload_Nonce].load
# update new responder SPI
self.sa.child_sas[0].rspi = prop.SPI
@@ -1794,14 +1846,7 @@ class TestInitiatorKeepaliveMsg(TestInitiatorPsk):
"""
def send_empty_req_from_responder(self):
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- id=self.sa.new_msg_id(), flags='Initiator',
- exch_type='INFORMATIONAL',
- next_payload='Encrypted')
-
- msg = self.encrypt_ike_msg(header, b'', None)
- packet = self.create_packet(self.pg0, msg, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
+ packet = self.create_empty_request()
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()