summaryrefslogtreecommitdiffstats
path: root/src/plugins/ikev2/test
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/ikev2/test')
-rw-r--r--src/plugins/ikev2/test/test_ikev2.py177
-rw-r--r--src/plugins/ikev2/test/vpp_ikev2.py64
2 files changed, 234 insertions, 7 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py
index 77698d6a5ed..02d1bde8f3d 100644
--- a/src/plugins/ikev2/test/test_ikev2.py
+++ b/src/plugins/ikev2/test/test_ikev2.py
@@ -9,6 +9,7 @@ from cryptography.hazmat.primitives.ciphers import (
algorithms,
modes,
)
+from ipaddress import IPv4Address
from scapy.layers.ipsec import ESP
from scapy.layers.inet import IP, UDP, Ether
from scapy.packet import raw, Raw
@@ -494,8 +495,14 @@ class TemplateResponder(VppTestCase):
super(TemplateResponder, self).setUp()
self.config_tc()
self.p.add_vpp_config()
+ self.assertIsNotNone(self.p.query_vpp_config())
self.sa.generate_dh_data()
+ def tearDown(self):
+ super(TemplateResponder, self).tearDown()
+ self.p.remove_vpp_config()
+ self.assertIsNone(self.p.query_vpp_config())
+
def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
@@ -769,8 +776,8 @@ class Ikev2Params(object):
self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
- self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
- self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
+ self.p.add_local_ts(start_addr='10.10.10.0', end_addr='10.10.10.255')
+ self.p.add_remote_ts(start_addr='10.0.0.0', end_addr='10.0.0.255')
self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
r_id=self.p.local_id['data'],
@@ -798,6 +805,172 @@ class Ikev2Params(object):
integ=esp_integ)
+class TestApi(VppTestCase):
+ """ Test IKEV2 API """
+ @classmethod
+ def setUpClass(cls):
+ super(TestApi, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestApi, cls).tearDownClass()
+
+ def tearDown(self):
+ super(TestApi, self).tearDown()
+ self.p1.remove_vpp_config()
+ self.p2.remove_vpp_config()
+ r = self.vapi.ikev2_profile_dump()
+ self.assertEqual(len(r), 0)
+
+ def configure_profile(self, cfg):
+ p = Profile(self, cfg['name'])
+ p.add_local_id(id_type=cfg['loc_id'][0], data=cfg['loc_id'][1])
+ p.add_remote_id(id_type=cfg['rem_id'][0], data=cfg['rem_id'][1])
+ p.add_local_ts(**cfg['loc_ts'])
+ p.add_remote_ts(**cfg['rem_ts'])
+ p.add_responder(cfg['responder'])
+ p.add_ike_transforms(cfg['ike_ts'])
+ p.add_esp_transforms(cfg['esp_ts'])
+ p.add_auth(**cfg['auth'])
+ p.set_udp_encap(cfg['udp_encap'])
+ p.set_ipsec_over_udp_port(cfg['ipsec_over_udp_port'])
+ if 'lifetime_data' in cfg:
+ p.set_lifetime_data(cfg['lifetime_data'])
+ if 'tun_itf' in cfg:
+ p.set_tunnel_interface(cfg['tun_itf'])
+ p.add_vpp_config()
+ return p
+
+ def test_profile_api(self):
+ """ test profile dump API """
+ loc_ts = {
+ 'proto': 8,
+ 'start_port': 1,
+ 'end_port': 19,
+ 'start_addr': '3.3.3.2',
+ 'end_addr': '3.3.3.3',
+ }
+ rem_ts = {
+ 'proto': 9,
+ 'start_port': 10,
+ 'end_port': 119,
+ 'start_addr': '4.5.76.80',
+ 'end_addr': '2.3.4.6',
+ }
+
+ conf = {
+ 'p1': {
+ 'name': 'p1',
+ 'loc_id': ('fqdn', b'vpp.home'),
+ 'rem_id': ('fqdn', b'roadwarrior.example.com'),
+ 'loc_ts': loc_ts,
+ 'rem_ts': rem_ts,
+ 'responder': {'sw_if_index': 0, 'ip4': '5.6.7.8'},
+ 'ike_ts': {
+ 'crypto_alg': 20,
+ 'crypto_key_size': 32,
+ 'integ_alg': 1,
+ 'dh_group': 1},
+ 'esp_ts': {
+ 'crypto_alg': 13,
+ 'crypto_key_size': 24,
+ 'integ_alg': 2},
+ 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
+ 'udp_encap': True,
+ 'ipsec_over_udp_port': 4501,
+ 'lifetime_data': {
+ 'lifetime': 123,
+ 'lifetime_maxdata': 20192,
+ 'lifetime_jitter': 9,
+ 'handover': 132},
+ },
+ 'p2': {
+ 'name': 'p2',
+ 'loc_id': ('ip4-addr', b'192.168.2.1'),
+ 'rem_id': ('ip4-addr', b'192.168.2.2'),
+ 'loc_ts': loc_ts,
+ 'rem_ts': rem_ts,
+ 'responder': {'sw_if_index': 4, 'ip4': '5.6.7.99'},
+ 'ike_ts': {
+ 'crypto_alg': 12,
+ 'crypto_key_size': 16,
+ 'integ_alg': 3,
+ 'dh_group': 3},
+ 'esp_ts': {
+ 'crypto_alg': 9,
+ 'crypto_key_size': 24,
+ 'integ_alg': 4},
+ 'auth': {'method': 'shared-key', 'data': b'sharedkeydata'},
+ 'udp_encap': False,
+ 'ipsec_over_udp_port': 4600,
+ 'tun_itf': 0}
+ }
+ self.p1 = self.configure_profile(conf['p1'])
+ self.p2 = self.configure_profile(conf['p2'])
+
+ r = self.vapi.ikev2_profile_dump()
+ self.assertEqual(len(r), 2)
+ self.verify_profile(r[0].profile, conf['p1'])
+ self.verify_profile(r[1].profile, conf['p2'])
+
+ def verify_id(self, api_id, cfg_id):
+ self.assertEqual(api_id.type, IDType.value(cfg_id[0]))
+ self.assertEqual(bytes(api_id.data, 'ascii'), cfg_id[1])
+
+ def verify_ts(self, api_ts, cfg_ts):
+ self.assertEqual(api_ts.protocol_id, cfg_ts['proto'])
+ self.assertEqual(api_ts.start_port, cfg_ts['start_port'])
+ self.assertEqual(api_ts.end_port, cfg_ts['end_port'])
+ self.assertEqual(api_ts.start_addr, IPv4Address(cfg_ts['start_addr']))
+ self.assertEqual(api_ts.end_addr, IPv4Address(cfg_ts['end_addr']))
+
+ def verify_responder(self, api_r, cfg_r):
+ self.assertEqual(api_r.sw_if_index, cfg_r['sw_if_index'])
+ self.assertEqual(api_r.ip4, IPv4Address(cfg_r['ip4']))
+
+ def verify_transforms(self, api_ts, cfg_ts):
+ self.assertEqual(api_ts.crypto_alg, cfg_ts['crypto_alg'])
+ self.assertEqual(api_ts.crypto_key_size, cfg_ts['crypto_key_size'])
+ self.assertEqual(api_ts.integ_alg, cfg_ts['integ_alg'])
+
+ def verify_ike_transforms(self, api_ts, cfg_ts):
+ self.verify_transforms(api_ts, cfg_ts)
+ self.assertEqual(api_ts.dh_group, cfg_ts['dh_group'])
+
+ def verify_esp_transforms(self, api_ts, cfg_ts):
+ self.verify_transforms(api_ts, cfg_ts)
+
+ def verify_auth(self, api_auth, cfg_auth):
+ self.assertEqual(api_auth.method, AuthMethod.value(cfg_auth['method']))
+ self.assertEqual(api_auth.data, cfg_auth['data'])
+ self.assertEqual(api_auth.data_len, len(cfg_auth['data']))
+
+ def verify_lifetime_data(self, p, ld):
+ self.assertEqual(p.lifetime, ld['lifetime'])
+ self.assertEqual(p.lifetime_maxdata, ld['lifetime_maxdata'])
+ self.assertEqual(p.lifetime_jitter, ld['lifetime_jitter'])
+ self.assertEqual(p.handover, ld['handover'])
+
+ def verify_profile(self, ap, cp):
+ self.assertEqual(ap.name, cp['name'])
+ self.assertEqual(ap.udp_encap, cp['udp_encap'])
+ self.verify_id(ap.loc_id, cp['loc_id'])
+ self.verify_id(ap.rem_id, cp['rem_id'])
+ self.verify_ts(ap.loc_ts, cp['loc_ts'])
+ self.verify_ts(ap.rem_ts, cp['rem_ts'])
+ self.verify_responder(ap.responder, cp['responder'])
+ self.verify_ike_transforms(ap.ike_ts, cp['ike_ts'])
+ self.verify_esp_transforms(ap.esp_ts, cp['esp_ts'])
+ self.verify_auth(ap.auth, cp['auth'])
+ if 'lifetime_data' in cp:
+ self.verify_lifetime_data(ap, cp['lifetime_data'])
+ self.assertEqual(ap.ipsec_over_udp_port, cp['ipsec_over_udp_port'])
+ if 'tun_itf' in cp:
+ self.assertEqual(ap.tun_itf, cp['tun_itf'])
+ else:
+ self.assertEqual(ap.tun_itf, 0xffffffff)
+
+
class TestResponderNATT(TemplateResponder, Ikev2Params):
""" test ikev2 responder - nat traversal """
def config_tc(self):
diff --git a/src/plugins/ikev2/test/vpp_ikev2.py b/src/plugins/ikev2/test/vpp_ikev2.py
index 67df1d53b5e..5a2a51eb607 100644
--- a/src/plugins/ikev2/test/vpp_ikev2.py
+++ b/src/plugins/ikev2/test/vpp_ikev2.py
@@ -24,6 +24,7 @@ class Profile(VppObject):
self.test = test
self.vapi = test.vapi
self.profile_name = profile_name
+ self.udp_encap = False
def add_auth(self, method, data, is_hex=False):
if isinstance(method, int):
@@ -53,7 +54,7 @@ class Profile(VppObject):
def add_local_ts(self, start_addr, end_addr, start_port=0, end_port=0xffff,
proto=0):
self.local_ts = {'is_local': True,
- 'proto': proto,
+ 'protocol_id': proto,
'start_port': start_port,
'end_port': end_port,
'start_addr': start_addr,
@@ -62,12 +63,34 @@ class Profile(VppObject):
def add_remote_ts(self, start_addr, end_addr, start_port=0,
end_port=0xffff, proto=0):
self.remote_ts = {'is_local': False,
- 'proto': proto,
+ 'protocol_id': proto,
'start_port': start_port,
'end_port': end_port,
'start_addr': start_addr,
'end_addr': end_addr}
+ def add_responder(self, responder):
+ self.responder = responder
+
+ def add_ike_transforms(self, tr):
+ self.ike_transforms = tr
+
+ def add_esp_transforms(self, tr):
+ self.esp_transforms = tr
+
+ def set_udp_encap(self, udp_encap):
+ self.udp_encap = udp_encap
+
+ def set_lifetime_data(self, data):
+ self.lifetime_data = data
+
+ def set_ipsec_over_udp_port(self, port):
+ self.ipsec_udp_port = {'is_set': 1,
+ 'port': port}
+
+ def set_tunnel_interface(self, sw_if_index):
+ self.tun_itf = sw_if_index
+
def object_id(self):
return 'ikev2-profile-%s' % self.profile_name
@@ -92,10 +115,41 @@ class Profile(VppObject):
**self.remote_id)
if hasattr(self, 'local_ts'):
self.vapi.ikev2_profile_set_ts(name=self.profile_name,
- **self.local_ts)
+ ts={**self.local_ts})
+
if hasattr(self, 'remote_ts'):
self.vapi.ikev2_profile_set_ts(name=self.profile_name,
- **self.remote_ts)
+ ts={**self.remote_ts})
+
+ if hasattr(self, 'responder'):
+ self.vapi.ikev2_set_responder(name=self.profile_name,
+ responder={**self.responder})
+
+ if hasattr(self, 'ike_transforms'):
+ self.vapi.ikev2_set_ike_transforms(name=self.profile_name,
+ tr={**self.ike_transforms})
+
+ if hasattr(self, 'esp_transforms'):
+ self.vapi.ikev2_set_esp_transforms(name=self.profile_name,
+ tr=self.esp_transforms)
+
+ if self.udp_encap:
+ self.vapi.ikev2_profile_set_udp_encap(name=self.profile_name)
+
+ if hasattr(self, 'lifetime_data'):
+ self.vapi.ikev2_set_sa_lifetime(name=self.profile_name,
+ **self.lifetime_data)
+
+ if hasattr(self, 'ipsec_udp_port'):
+ self.vapi.ikev2_profile_set_ipsec_udp_port(name=self.profile_name,
+ **self.ipsec_udp_port)
+ if hasattr(self, 'tun_itf'):
+ self.vapi.ikev2_set_tunnel_interface(name=self.profile_name,
+ sw_if_index=self.tun_itf)
def query_vpp_config(self):
- raise NotImplementedError()
+ res = self.vapi.ikev2_profile_dump()
+ for r in res:
+ if r.profile.name == self.profile_name:
+ return r.profile
+ return None