diff options
author | Filip Tehlar <ftehlar@cisco.com> | 2020-06-23 20:35:58 +0000 |
---|---|---|
committer | BenoƮt Ganne <bganne@cisco.com> | 2020-06-30 08:15:01 +0000 |
commit | bfeae8c57e5da3c8c36291d8d6834cffe4db7408 (patch) | |
tree | 973f40bcc212f052a7819d1875279f0cc6f9ad86 /src/plugins/ikev2/test/test_ikev2.py | |
parent | cc1085647b2ae36e6c086d65b4e81b9f1cf9fc9a (diff) |
tests: ikev2: add nat traversal & cert based auth test
Type: test
Change-Id: I3e8e451c5deaf04f519a471369370c383d9cda3b
Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
Diffstat (limited to 'src/plugins/ikev2/test/test_ikev2.py')
-rw-r--r-- | src/plugins/ikev2/test/test_ikev2.py | 210 |
1 files changed, 163 insertions, 47 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py index d2d82ba58e4..f0053fd55b8 100644 --- a/src/plugins/ikev2/test/test_ikev2.py +++ b/src/plugins/ikev2/test/test_ikev2.py @@ -1,17 +1,20 @@ import os +from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, hmac -from cryptography.hazmat.primitives.asymmetric import dh +from cryptography.hazmat.primitives.asymmetric import dh, padding +from cryptography.hazmat.primitives.serialization import load_pem_private_key from cryptography.hazmat.primitives.ciphers import ( Cipher, algorithms, modes, ) +from scapy.layers.ipsec import ESP from scapy.layers.inet import IP, UDP, Ether from scapy.packet import raw, Raw from scapy.utils import long_converter from framework import VppTestCase, VppTestRunner -from vpp_ikev2 import Profile, IDType +from vpp_ikev2 import Profile, IDType, AuthMethod KEY_PAD = b"Key Pad for IKEv2" @@ -100,9 +103,17 @@ class IKEv2SA(object): def __init__(self, test, is_initiator=True, spi=b'\x04' * 8, i_id=None, r_id=None, id_type='fqdn', nonce=None, auth_data=None, local_ts=None, remote_ts=None, - auth_method='shared-key'): + auth_method='shared-key', priv_key=None, natt=False): + self.natt = natt + if natt: + self.sport = 4500 + self.dport = 4500 + else: + self.sport = 500 + self.dport = 500 self.dh_params = None self.test = test + self.priv_key = priv_key self.is_initiator = is_initiator nonce = nonce or os.urandom(32) self.auth_data = auth_data @@ -248,8 +259,14 @@ class IKEv2SA(object): def auth_init(self): prf = self.ike_prf_alg.mod() authmsg = self.generate_authmsg(prf, raw(self.init_req_packet)) - psk = self.calc_prf(prf, self.auth_data, KEY_PAD) - self.auth_data = self.calc_prf(prf, psk, authmsg) + if self.auth_method == 'shared-key': + psk = self.calc_prf(prf, self.auth_data, KEY_PAD) + self.auth_data = self.calc_prf(prf, psk, authmsg) + elif self.auth_method == 'rsa-sig': + self.auth_data = self.priv_key.sign(authmsg, padding.PKCS1v15(), + hashes.SHA1()) + else: + raise TypeError('unknown auth method type!') def encrypt(self, data): data = self.ike_crypto_alg.pad(data) @@ -359,15 +376,21 @@ class IKEv2SA(object): def esp_crypto_attr(self): return self.crypto_attr(self.esp_crypto_key_len) + def compute_nat_sha1(self, ip, port): + data = self.ispi + b'\x00' * 8 + ip + (port).to_bytes(2, 'big') + digest = hashes.Hash(hashes.SHA1(), backend=default_backend()) + digest.update(data) + return digest.finalize() -class TestResponder(VppTestCase): + +class TemplateResponder(VppTestCase): """ responder test """ @classmethod def setUpClass(cls): import scapy.contrib.ikev2 as _ikev2 globals()['ikev2'] = _ikev2 - super(TestResponder, cls).setUpClass() + super(TemplateResponder, cls).setUpClass() cls.create_pg_interfaces(range(2)) for i in cls.pg_interfaces: i.admin_up() @@ -376,40 +399,24 @@ class TestResponder(VppTestCase): @classmethod def tearDownClass(cls): - super(TestResponder, cls).tearDownClass() + super(TemplateResponder, cls).tearDownClass() def setUp(self): - super(TestResponder, self).setUp() + super(TemplateResponder, self).setUp() self.config_tc() - - def config_tc(self): - self.p = Profile(self, 'pr1') - self.p.add_auth(method='shared-key', data=b'$3cr3tpa$$w0rd') - self.p.add_local_id(id_type='fqdn', data=b'vpp.home') - self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com') - self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff) - self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff) self.p.add_vpp_config() - - self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'], - r_id=self.p.local_id['data'], - is_initiator=True, auth_data=self.p.auth['data'], - id_type=self.p.local_id['id_type'], - local_ts=self.p.remote_ts, remote_ts=self.p.local_ts) - - self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32, - integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256', - dh='2048MODPgr') - self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32, - integ='HMAC-SHA1-96') self.sa.generate_dh_data() - def create_ike_msg(self, src_if, msg, sport=500, dport=500): - return (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - IP(src=src_if.remote_ip4, dst=src_if.local_ip4) / - UDP(sport=sport, dport=dport) / msg) + def create_ike_msg(self, src_if, msg, sport=500, dport=500, natt=False): + res = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / + IP(src=src_if.remote_ip4, dst=src_if.local_ip4) / + UDP(sport=sport, dport=dport)) + if natt: + # insert non ESP marker + res = res / Raw(b'\x00' * 4) + return res / msg - def send_sa_init(self): + def send_sa_init(self, behind_nat=False): tr_attr = self.sa.ike_crypto_attr() trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption', transform_id=self.sa.ike_crypto, length=tr_attr[1], @@ -424,6 +431,11 @@ class TestResponder(VppTestCase): props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2', trans_nb=4, trans=trans)) + if behind_nat: + next_payload = 'Notify' + else: + next_payload = None + self.sa.init_req_packet = ( ikev2.IKEv2(init_SPI=self.sa.ispi, flags='Initiator', exch_type='IKE_SA_INIT') / @@ -431,9 +443,20 @@ class TestResponder(VppTestCase): ikev2.IKEv2_payload_KE(next_payload='Nonce', group=self.sa.ike_dh, load=self.sa.dh_pub_key()) / - ikev2.IKEv2_payload_Nonce(load=self.sa.i_nonce)) - - ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet) + ikev2.IKEv2_payload_Nonce(next_payload=next_payload, + load=self.sa.i_nonce)) + + if behind_nat: + src_nat = self.sa.compute_nat_sha1(b'\x0a\x0a\x0a\x01', + self.sa.sport) + nat_detection = ikev2.IKEv2_payload_Notify( + type='NAT_DETECTION_SOURCE_IP', + load=src_nat) + self.sa.init_req_packet = self.sa.init_req_packet / nat_detection + + ike_msg = self.create_ike_msg(self.pg0, self.sa.init_req_packet, + self.sa.sport, self.sa.dport, + self.sa.natt) self.pg0.add_stream(ike_msg) self.pg0.enable_capture() self.pg_start() @@ -463,7 +486,8 @@ class TestResponder(VppTestCase): ikev2.IKEv2_payload_IDr(next_payload='AUTH', IDtype=self.sa.id_type, load=self.sa.r_id) / ikev2.IKEv2_payload_AUTH(next_payload='SA', - auth_type=2, load=self.sa.auth_data) / + auth_type=AuthMethod.value(self.sa.auth_method), + load=self.sa.auth_data) / ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) / ikev2.IKEv2_payload_TSi(next_payload='TSr', number_of_TSs=len(tsi), @@ -490,15 +514,27 @@ class TestResponder(VppTestCase): sa_auth = sa_auth / Raw(hmac_data[:trunc_len]) assert(len(sa_auth) == tlen) - packet = self.create_ike_msg(self.pg0, sa_auth) + packet = self.create_ike_msg(self.pg0, sa_auth, self.sa.sport, + self.sa.dport, self.sa.natt) self.pg0.add_stream(packet) self.pg0.enable_capture() self.pg_start() capture = self.pg0.get_capture(1) self.verify_sa_auth(capture[0]) + def get_ike_header(self, packet): + try: + ih = packet[ikev2.IKEv2] + except IndexError as e: + # this is a workaround for getting IKEv2 layer as both ikev2 and + # ipsec register for port 4500 + esp = packet[ESP] + ih = self.verify_and_remove_non_esp_marker(esp) + return ih + def verify_sa_init(self, packet): - ih = packet[ikev2.IKEv2] + ih = self.get_ike_header(packet) + self.assertEqual(ih.exch_type, 34) self.assertTrue('Response' in ih.flags) self.assertEqual(ih.init_SPI, self.sa.ispi) @@ -515,13 +551,24 @@ class TestResponder(VppTestCase): self.sa.calc_keys() self.sa.auth_init() + def verify_and_remove_non_esp_marker(self, packet): + if self.sa.natt: + # if we are in nat traversal mode check for non esp marker + # and remove it + data = raw(packet) + self.assertEqual(data[:4], b'\x00' * 4) + return ikev2.IKEv2(data[4:]) + else: + return packet + + def verify_udp(self, udp): + self.assertEqual(udp.sport, self.sa.sport) + self.assertEqual(udp.dport, self.sa.dport) + def verify_sa_auth(self, packet): - try: - ike = packet[ikev2.IKEv2] - ep = packet[ikev2.IKEv2_payload_Encrypted] - except KeyError as e: - self.logger.error("unexpected reply: no IKEv2/Encrypt payload!") - raise + ike = self.get_ike_header(packet) + udp = packet[UDP] + self.verify_udp(udp) plain = self.sa.hmac_and_decrypt(ike) self.sa.calc_child_keys() @@ -545,10 +592,79 @@ class TestResponder(VppTestCase): self.assertEqual(sa1.integrity_key.data[:len(c.sk_ai)], c.sk_ai) def test_responder(self): - self.send_sa_init() + self.send_sa_init(self.sa.natt) self.send_sa_auth() self.verify_child_sas() +class Ikev2Params(object): + def config_params(self, params={}): + is_natt = 'natt' in params and params['natt'] or False + self.p = Profile(self, 'pr1') + + if 'auth' in params and params['auth'] == 'rsa-sig': + auth_method = 'rsa-sig' + work_dir = os.getenv('BR') + '/../src/plugins/ikev2/test/certs/' + self.vapi.ikev2_set_local_key( + key_file=work_dir + params['server-key']) + + client_file = work_dir + params['client-cert'] + server_pem = open(work_dir + params['server-cert']).read() + client_priv = open(work_dir + params['client-key']).read() + client_priv = load_pem_private_key(str.encode(client_priv), None, + default_backend()) + self.peer_cert = x509.load_pem_x509_certificate( + str.encode(server_pem), + default_backend()) + self.p.add_auth(method='rsa-sig', data=str.encode(client_file)) + auth_data = None + else: + auth_data = b'$3cr3tpa$$w0rd' + self.p.add_auth(method='shared-key', data=auth_data) + auth_method = 'shared-key' + client_priv = None + + self.p.add_local_id(id_type='fqdn', data=b'vpp.home') + self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com') + self.p.add_local_ts(start_addr=0x0a0a0a0, end_addr=0x0a0a0aff) + self.p.add_remote_ts(start_addr=0xa000000, end_addr=0xa0000ff) + + self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'], + r_id=self.p.local_id['data'], + id_type=self.p.local_id['id_type'], natt=is_natt, + priv_key=client_priv, auth_method=auth_method, + auth_data=auth_data, + local_ts=self.p.remote_ts, remote_ts=self.p.local_ts) + + self.sa.set_ike_props(crypto='AES-CBC', crypto_key_len=32, + integ='HMAC-SHA1-96', prf='PRF_HMAC_SHA2_256', + dh='2048MODPgr') + self.sa.set_esp_props(crypto='AES-CBC', crypto_key_len=32, + integ='HMAC-SHA1-96') + + +class TestResponderNATT(TemplateResponder, Ikev2Params): + """ test ikev2 responder - nat traversal """ + def config_tc(self): + self.config_params( + {'natt': True}) + + +class TestResponderPsk(TemplateResponder, Ikev2Params): + """ test ikev2 responder - pre shared key auth """ + def config_tc(self): + self.config_params() + + +class TestResponderRsaSign(TemplateResponder, Ikev2Params): + """ test ikev2 responder - cert based auth """ + def config_tc(self): + self.config_params({ + 'auth': 'rsa-sig', + 'server-key': 'server-key.pem', + 'client-key': 'client-key.pem', + 'client-cert': 'client-cert.pem', + 'server-cert': 'server-cert.pem'}) + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) |