aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/ikev2/test/test_ikev2.py
diff options
context:
space:
mode:
authorFilip Tehlar <ftehlar@cisco.com>2020-06-23 20:35:58 +0000
committerBenoƮt Ganne <bganne@cisco.com>2020-06-30 08:15:01 +0000
commitbfeae8c57e5da3c8c36291d8d6834cffe4db7408 (patch)
tree973f40bcc212f052a7819d1875279f0cc6f9ad86 /src/plugins/ikev2/test/test_ikev2.py
parentcc1085647b2ae36e6c086d65b4e81b9f1cf9fc9a (diff)
tests: ikev2: add nat traversal & cert based auth test
Type: test Change-Id: I3e8e451c5deaf04f519a471369370c383d9cda3b Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
Diffstat (limited to 'src/plugins/ikev2/test/test_ikev2.py')
-rw-r--r--src/plugins/ikev2/test/test_ikev2.py210
1 files changed, 163 insertions, 47 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py
index d2d82ba58e4..f0053fd55b8 100644
--- a/src/plugins/ikev2/test/test_ikev2.py
+++ b/src/plugins/ikev2/test/test_ikev2.py
@@ -1,17 +1,20 @@
import os
+from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
-from cryptography.hazmat.primitives.asymmetric import dh
+from cryptography.hazmat.primitives.asymmetric import dh, padding
+from cryptography.hazmat.primitives.serialization import load_pem_private_key
from cryptography.hazmat.primitives.ciphers import (
Cipher,
algorithms,
modes,
)
+from scapy.layers.ipsec import ESP
from scapy.layers.inet import IP, UDP, Ether
from scapy.packet import raw, Raw
from scapy.utils import long_converter
from framework import VppTestCase, VppTestRunner
-from vpp_ikev2 import Profile, IDType
+from vpp_ikev2 import Profile, IDType, AuthMethod
KEY_PAD = b"Key Pad for IKEv2"
@@ -100,9 +103,17 @@ class IKEv2SA(object):
def __init__(self, test, is_initiator=True, spi=b'\x04' * 8,
i_id=None, r_id=None, id_type='fqdn', nonce=None,
auth_data=None, local_ts=None, remote_ts=None,
- auth_method='shared-key'):
+ auth_method='shared-key', priv_key=None, natt=False):
+ self.natt = natt
+ if natt:
+ self.sport = 4500
+ self.dport = 4500
+ else:
+ self.sport = 500
+ self.dport = 500
self.dh_params = None
self.test = test
+ self.priv_key = priv_key
self.is_initiator = is_initiator
nonce = nonce or os.urandom(32)
self.auth_data = auth_data
@@ -248,8 +259,14 @@ class IKEv2SA(object):
def auth_init(self):
prf = self.ike_prf_alg.mod()
authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
- psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
- self.auth_data = self.calc_prf(prf, psk, authmsg)
+ if self.auth_method == 'shared-key':
+ psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
+ self.auth_data = self.calc_prf(prf, psk, authmsg)
+ elif self.auth_method == 'rsa-sig':
+ self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(),
+ hashes.SHA1())
+ else:
+ raise TypeError('unknown auth method type!')
def encrypt(self, data):
data = self.ike_crypto_alg.pad(data)
@@ -359,15 +376,21 @@ class IKEv2SA(object):
def esp_crypto_attr(self):
return self.crypto_attr(self.esp_crypto_key_len)
+ def compute_nat_sha1(self, ip, port):
+ data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big')
+ digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
+ digest.update(data)
+ return digest.finalize()
-class TestResponder(VppTestCase):
+
+class TemplateResponder(VppTestCase):
""" responder test """
@classmethod
def setUpClass(cls):
import scapy.contrib.ikev2 as _ikev2
globals()['ikev2'] = _ikev2
- super(TestResponder, cls).setUpClass()
+ super(TemplateResponder, cls).setUpClass()
cls.create_pg_interfaces(range(2))
for i in cls.pg_interfaces:
i.admin_up()
@@ -376,40 +399,24 @@ class TestResponder(VppTestCase):
@classmethod
def tearDownClass(cls):
- super(TestResponder, cls).tearDownClass()
+ super(TemplateResponder, cls).tearDownClass()
def setUp(self):
- super(TestResponder, self).setUp()
+ super(TemplateResponder, self).setUp()
self.config_tc()
-
- def config_tc(self):
- self.p = Profile(self, 'pr1')
- self.p.add_auth(method='shared-key', data=b'$3cr3tpa$$w0rd')
- self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
- self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
- self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
- self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
self.p.add_vpp_config()
-
- self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
- r_id=self.p.local_id['data'],
- is_initiator=True, auth_data=self.p.auth['data'],
- id_type=self.p.local_id['id_type'],
- local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
-
- self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
- integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
- dh='2048MODPgr')
- self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
- integ='HMAC-SHA1-96')
self.sa.generate_dh_data()
- def create_ike_msg(self, src_if, msg, sport=500, dport=500):
- return (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
- UDP(sport=sport, dport=dport) / msg)
+ def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False):
+ res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ IP(src=src_if.remote_ip4, dst=src_if.local_ip4) /
+ UDP(sport=sport, dport=dport))
+ if natt:
+ # insert non ESP marker
+ res = res / Raw(b'\x00' * 4)
+ return res / msg
- def send_sa_init(self):
+ def send_sa_init(self, behind_nat=False):
tr_attr = self.sa.ike_crypto_attr()
trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
transform_id=self.sa.ike_crypto, length=tr_attr[1],
@@ -424,6 +431,11 @@ class TestResponder(VppTestCase):
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
trans_nb=4, trans=trans))
+ if behind_nat:
+ next_payload = 'Notify'
+ else:
+ next_payload = None
+
self.sa.init_req_packet = (
ikev2.IKEv2(init_SPI=self.sa.ispi,
flags='Initiator', exch_type='IKE_SA_INIT') /
@@ -431,9 +443,20 @@ class TestResponder(VppTestCase):
ikev2.IKEv2_payload_KE(next_payload='Nonce',
group=self.sa.ike_dh,
load=self.sa.dh_pub_key()) /
- ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce))
-
- ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet)
+ ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
+ load=self.sa.i_nonce))
+
+ if behind_nat:
+ src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01',
+ self.sa.sport)
+ nat_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP',
+ load=src_nat)
+ self.sa.init_req_packet = self.sa.init_req_packet / nat_detection
+
+ ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt)
self.pg0.add_stream(ike_msg)
self.pg0.enable_capture()
self.pg_start()
@@ -463,7 +486,8 @@ class TestResponder(VppTestCase):
ikev2.IKEv2_payload_IDr(next_payload='AUTH',
IDtype=self.sa.id_type, load=self.sa.r_id) /
ikev2.IKEv2_payload_AUTH(next_payload='SA',
- auth_type=2, load=self.sa.auth_data) /
+ auth_type=AuthMethod.value(self.sa.auth_method),
+ load=self.sa.auth_data) /
ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
ikev2.IKEv2_payload_TSi(next_payload='TSr',
number_of_TSs=len(tsi),
@@ -490,15 +514,27 @@ class TestResponder(VppTestCase):
sa_auth = sa_auth / Raw(hmac_data[:trunc_len])
assert(len(sa_auth) == tlen)
- packet = self.create_ike_msg(self.pg0, sa_auth)
+ packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport,
+ self.sa.dport, self.sa.natt)
self.pg0.add_stream(packet)
self.pg0.enable_capture()
self.pg_start()
capture = self.pg0.get_capture(1)
self.verify_sa_auth(capture[0])
+ def get_ike_header(self, packet):
+ try:
+ ih = packet[ikev2.IKEv2]
+ except IndexError as e:
+ # this is a workaround for getting IKEv2 layer as both ikev2 and
+ # ipsec register for port 4500
+ esp = packet[ESP]
+ ih = self.verify_and_remove_non_esp_marker(esp)
+ return ih
+
def verify_sa_init(self, packet):
- ih = packet[ikev2.IKEv2]
+ ih = self.get_ike_header(packet)
+
self.assertEqual(ih.exch_type, 34)
self.assertTrue('Response' in ih.flags)
self.assertEqual(ih.init_SPI, self.sa.ispi)
@@ -515,13 +551,24 @@ class TestResponder(VppTestCase):
self.sa.calc_keys()
self.sa.auth_init()
+ def verify_and_remove_non_esp_marker(self, packet):
+ if self.sa.natt:
+ # if we are in nat traversal mode check for non esp marker
+ # and remove it
+ data = raw(packet)
+ self.assertEqual(data[:4], b'\x00' * 4)
+ return ikev2.IKEv2(data[4:])
+ else:
+ return packet
+
+ def verify_udp(self, udp):
+ self.assertEqual(udp.sport, self.sa.sport)
+ self.assertEqual(udp.dport, self.sa.dport)
+
def verify_sa_auth(self, packet):
- try:
- ike = packet[ikev2.IKEv2]
- ep = packet[ikev2.IKEv2_payload_Encrypted]
- except KeyError as e:
- self.logger.error("unexpected reply: no IKEv2/Encrypt payload!")
- raise
+ ike = self.get_ike_header(packet)
+ udp = packet[UDP]
+ self.verify_udp(udp)
plain = self.sa.hmac_and_decrypt(ike)
self.sa.calc_child_keys()
@@ -545,10 +592,79 @@ class TestResponder(VppTestCase):
self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai)
def test_responder(self):
- self.send_sa_init()
+ self.send_sa_init(self.sa.natt)
self.send_sa_auth()
self.verify_child_sas()
+class Ikev2Params(object):
+ def config_params(self, params={}):
+ is_natt = 'natt' in params and params['natt'] or False
+ self.p = Profile(self, 'pr1')
+
+ if 'auth' in params and params['auth'] == 'rsa-sig':
+ auth_method = 'rsa-sig'
+ work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/'
+ self.vapi.ikev2_set_local_key(
+ key_file=work_dir + params['server-key'])
+
+ client_file = work_dir + params['client-cert']
+ server_pem = open(work_dir + params['server-cert']).read()
+ client_priv = open(work_dir + params['client-key']).read()
+ client_priv = load_pem_private_key(str.encode(client_priv), None,
+ default_backend())
+ self.peer_cert = x509.load_pem_x509_certificate(
+ str.encode(server_pem),
+ default_backend())
+ self.p.add_auth(method='rsa-sig', data=str.encode(client_file))
+ auth_data = None
+ else:
+ auth_data = b'$3cr3tpa$$w0rd'
+ self.p.add_auth(method='shared-key', data=auth_data)
+ auth_method = 'shared-key'
+ client_priv = None
+
+ self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
+ self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
+ self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff)
+ self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff)
+
+ self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
+ r_id=self.p.local_id['data'],
+ id_type=self.p.local_id['id_type'], natt=is_natt,
+ priv_key=client_priv, auth_method=auth_method,
+ auth_data=auth_data,
+ local_ts=self.p.remote_ts, remote_ts=self.p.local_ts)
+
+ self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32,
+ integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256',
+ dh='2048MODPgr')
+ self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32,
+ integ='HMAC-SHA1-96')
+
+
+class TestResponderNATT(TemplateResponder, Ikev2Params):
+ """ test ikev2 responder - nat traversal """
+ def config_tc(self):
+ self.config_params(
+ {'natt': True})
+
+
+class TestResponderPsk(TemplateResponder, Ikev2Params):
+ """ test ikev2 responder - pre shared key auth """
+ def config_tc(self):
+ self.config_params()
+
+
+class TestResponderRsaSign(TemplateResponder, Ikev2Params):
+ """ test ikev2 responder - cert based auth """
+ def config_tc(self):
+ self.config_params({
+ 'auth': 'rsa-sig',
+ 'server-key': 'server-key.pem',
+ 'client-key': 'client-key.pem',
+ 'client-cert': 'client-cert.pem',
+ 'server-cert': 'server-cert.pem'})
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)