summaryrefslogtreecommitdiffstats
path: root/src/plugins/ikev2/test/test_ikev2.py
diff options
context:
space:
mode:
authorFilip Tehlar <ftehlar@cisco.com>2020-10-10 04:39:11 +0000
committerBeno�t Ganne <bganne@cisco.com>2020-10-21 16:11:28 +0000
commitedf2900ac633ae0d8575b04094b1bca40e1a221f (patch)
treec6afab1f6a9162d07aee9835628b90a030f2a33d /src/plugins/ikev2/test/test_ikev2.py
parent6ba4e41d33ffda2596d9d4b3a1d7fdd3c9a6b870 (diff)
ikev2: support sending requests from responder
Type: improvement Ticket: VPP-1894 Change-Id: I5a24a48416bca2ffbd346cdaa813fb25801e6c9b Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
Diffstat (limited to 'src/plugins/ikev2/test/test_ikev2.py')
-rw-r--r--src/plugins/ikev2/test/test_ikev2.py140
1 files changed, 122 insertions, 18 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py
index f75a517f824..ec68658f564 100644
--- a/src/plugins/ikev2/test/test_ikev2.py
+++ b/src/plugins/ikev2/test/test_ikev2.py
@@ -113,9 +113,7 @@ class CryptoAlgo(object):
self.mode(nonce, icv, len(icv)),
default_backend()).decryptor()
decryptor.authenticate_additional_data(aad)
- pt = decryptor.update(ct) + decryptor.finalize()
- pad_len = pt[-1] + 1
- return pt[:-pad_len]
+ return decryptor.update(ct) + decryptor.finalize()
def pad(self, data):
pad_len = (len(data) // self.bs + 1) * self.bs - len(data)
@@ -435,14 +433,17 @@ class IKEv2SA(object):
aad_len = len(ikev2.IKEv2_payload_Encrypted()) + len(ikev2.IKEv2())
ct = ep.load[:-GCM_ICV_SIZE]
tag = ep.load[-GCM_ICV_SIZE:]
- return self.decrypt(ct, raw(ike)[:aad_len], tag)
+ plain = self.decrypt(ct, raw(ike)[:aad_len], tag)
else:
self.verify_hmac(raw(ike))
integ_trunc = self.ike_integ_alg.trunc_len
# remove ICV and decrypt payload
ct = ep.load[:-integ_trunc]
- return self.decrypt(ct)
+ plain = self.decrypt(ct)
+ # remove padding
+ pad_len = plain[-1]
+ return plain[:-pad_len - 1]
def build_ts_addr(self, ts, version):
return {'starting_address_v' + version: ts['start_addr'],
@@ -540,6 +541,19 @@ class IkePeer(VppTestCase):
def tearDownClass(cls):
super(IkePeer, cls).tearDownClass()
+ def tearDown(self):
+ super(IkePeer, self).tearDown()
+ if self.del_sa_from_responder:
+ self.initiate_del_sa_from_responder()
+ else:
+ self.initiate_del_sa_from_initiator()
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(len(r), 0)
+ sas = self.vapi.ipsec_sa_dump()
+ self.assertEqual(len(sas), 0)
+ self.p.remove_vpp_config()
+ self.assertIsNone(self.p.query_vpp_config())
+
def setUp(self):
super(IkePeer, self).setUp()
self.config_tc()
@@ -580,6 +594,7 @@ class IkePeer(VppTestCase):
esp = packet[ESP]
ih = self.verify_and_remove_non_esp_marker(esp)
self.assertEqual(ih.version, 0x20)
+ self.assertNotIn('Version', ih.flags)
return ih
def verify_and_remove_non_esp_marker(self, packet):
@@ -775,8 +790,49 @@ class IkePeer(VppTestCase):
class TemplateInitiator(IkePeer):
""" initiator test template """
- def tearDown(self):
- super(TemplateInitiator, self).tearDown()
+ def initiate_del_sa_from_initiator(self):
+ ispi = int.from_bytes(self.sa.ispi, 'little')
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_del_ike_sa(ispi=ispi)
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertNotIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ plain = self.sa.hmac_and_decrypt(ih)
+ d = ikev2.IKEv2_payload_Delete(plain)
+ self.assertEqual(d.proto, 1) # proto=IKEv2
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Response', exch_type='INFORMATIONAL',
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, b'', None)
+ self.send_and_assert_no_replies(self.pg0, resp)
+
+ def verify_del_sa(self, packet):
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 37) # exchange informational
+ self.assertIn('Response', ih.flags)
+ self.assertIn('Initiator', ih.flags)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
+
+ def initiate_del_sa_from_responder(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ exch_type='INFORMATIONAL',
+ id=self.sa.new_msg_id())
+ del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+ ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+ packet = self.create_packet(self.pg0, ike_msg,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_del_sa(capture[0])
@staticmethod
def find_notify_payload(packet, notify_type):
@@ -946,22 +1002,41 @@ class TemplateInitiator(IkePeer):
class TemplateResponder(IkePeer):
""" responder test template """
- def tearDown(self):
- super(TemplateResponder, self).tearDown()
- if self.sa.is_initiator:
- self.initiate_del_sa()
- r = self.vapi.ikev2_sa_dump()
- self.assertEqual(len(r), 0)
-
- self.p.remove_vpp_config()
- self.assertIsNone(self.p.query_vpp_config())
+ def initiate_del_sa_from_responder(self):
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_del_ike_sa(
+ ispi=int.from_bytes(self.sa.ispi, 'little'))
+ capture = self.pg0.get_capture(1)
+ ih = self.get_ike_header(capture[0])
+ self.assertNotIn('Response', ih.flags)
+ self.assertNotIn('Initiator', ih.flags)
+ self.assertEqual(ih.exch_type, 37) # INFORMATIONAL
+ plain = self.sa.hmac_and_decrypt(ih)
+ d = ikev2.IKEv2_payload_Delete(plain)
+ self.assertEqual(d.proto, 1) # proto=IKEv2
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Initiator+Response',
+ exch_type='INFORMATIONAL',
+ id=ih.id, next_payload='Encrypted')
+ resp = self.encrypt_ike_msg(header, b'', None)
+ self.send_and_assert_no_replies(self.pg0, resp)
def verify_del_sa(self, packet):
ih = self.get_ike_header(packet)
self.assertEqual(ih.id, self.sa.msg_id)
self.assertEqual(ih.exch_type, 37) # exchange informational
+ self.assertIn('Response', ih.flags)
+ self.assertNotIn('Initiator', ih.flags)
+ self.assertEqual(ih.next_payload, 46) # Encrypted
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ plain = self.sa.hmac_and_decrypt(ih)
+ self.assertEqual(plain, b'')
- def initiate_del_sa(self):
+ def initiate_del_sa_from_initiator(self):
header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
flags='Initiator', exch_type='INFORMATIONAL',
id=self.sa.new_msg_id())
@@ -1081,7 +1156,7 @@ class TemplateResponder(IkePeer):
self.assertEqual(ih.id, self.sa.msg_id)
self.assertEqual(ih.exch_type, 34)
- self.assertTrue('Response' in ih.flags)
+ self.assertIn('Response', ih.flags)
self.assertEqual(ih.init_SPI, self.sa.ispi)
self.assertNotEqual(ih.resp_SPI, 0)
self.sa.rspi = ih.resp_SPI
@@ -1129,6 +1204,8 @@ class Ikev2Params(object):
'SHA2-384-192': ei.IPSEC_API_INTEG_ALG_SHA_384_192,
'SHA2-512-256': ei.IPSEC_API_INTEG_ALG_SHA_512_256}
+ self.del_sa_from_responder = False if 'del_sa_from_responder'\
+ not in params else params['del_sa_from_responder']
is_natt = 'natt' in params and params['natt'] or False
self.p = Profile(self, 'pr1')
self.ip6 = False if 'ip6' not in params else params['ip6']
@@ -1392,8 +1469,34 @@ class TestApi(VppTestCase):
class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
""" test ikev2 initiator - pre shared key auth """
+
+ def config_tc(self):
+ self.config_params({
+ 'is_initiator': False, # seen from test case perspective
+ # thus vpp is initiator
+ 'responder': {'sw_if_index': self.pg0.sw_if_index,
+ 'addr': self.pg0.remote_ip4},
+ 'ike-crypto': ('AES-GCM-16ICV', 32),
+ 'ike-integ': 'NULL',
+ 'ike-dh': '3072MODPgr',
+ 'ike_transforms': {
+ 'crypto_alg': 20, # "aes-gcm-16"
+ 'crypto_key_size': 256,
+ 'dh_group': 15, # "modp-3072"
+ },
+ 'esp_transforms': {
+ 'crypto_alg': 12, # "aes-cbc"
+ 'crypto_key_size': 256,
+ # "hmac-sha2-256-128"
+ 'integ_alg': 12}})
+
+
+class TestInitiatorDelSAFromResponder(TemplateInitiator, Ikev2Params):
+ """ test ikev2 initiator - delete IKE SA from responder """
+
def config_tc(self):
self.config_params({
+ 'del_sa_from_responder': True,
'is_initiator': False, # seen from test case perspective
# thus vpp is initiator
'responder': {'sw_if_index': self.pg0.sw_if_index,
@@ -1471,6 +1574,7 @@ class Test_IKE_AES_GCM_16_256(TemplateResponder, Ikev2Params):
"""
def config_tc(self):
self.config_params({
+ 'del_sa_from_responder': True,
'ip6': True,
'natt': True,
'ike-crypto': ('AES-GCM-16ICV', 32),