summaryrefslogtreecommitdiffstats
path: root/src/plugins/ikev2/test
diff options
context:
space:
mode:
authorFilip Tehlar <ftehlar@cisco.com>2020-09-23 11:20:12 +0000
committerBeno�t Ganne <bganne@cisco.com>2020-10-13 09:56:55 +0000
commite7c8396982607634b4c747870499671ffa53868e (patch)
treeb010ad2786fa3145c6d5d02f73004da2393d1151 /src/plugins/ikev2/test
parentdc6378f71bc7c9835845a91dbbc1646ea46df51e (diff)
ikev2: fix initial contact cleanup
When looking for existing SA connection to clean up search all per thread data, not only current one. Type: fix Change-Id: I59312e08a07ca1f474b6389999e59320c5128e7d Signed-off-by: Filip Tehlar <ftehlar@cisco.com>
Diffstat (limited to 'src/plugins/ikev2/test')
-rw-r--r--src/plugins/ikev2/test/test_ikev2.py658
1 files changed, 440 insertions, 218 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py
index a47c59f578f..4f64b56d853 100644
--- a/src/plugins/ikev2/test/test_ikev2.py
+++ b/src/plugins/ikev2/test/test_ikev2.py
@@ -193,38 +193,52 @@ class IKEv2SA(object):
else:
self.rspi = spi
self.ispi = 8 * b'\x00'
- self.r_nonce = None
+ self.r_nonce = nonce
self.child_sas = [IKEv2ChildSA(local_ts, remote_ts)]
def new_msg_id(self):
self.msg_id += 1
return self.msg_id
- def dh_pub_key(self):
+ @property
+ def my_dh_pub_key(self):
+ if self.is_initiator:
+ return self.i_dh_data
+ return self.r_dh_data
+
+ @property
+ def peer_dh_pub_key(self):
+ if self.is_initiator:
+ return self.r_dh_data
return self.i_dh_data
def compute_secret(self):
priv = self.dh_private_key
- peer = self.r_dh_data
+ peer = self.peer_dh_pub_key
p, g, l = self.ike_group
return pow(int.from_bytes(peer, 'big'),
int.from_bytes(priv, 'big'), p).to_bytes(l, 'big')
def generate_dh_data(self):
# generate DH keys
+ if self.ike_dh not in DH:
+ raise NotImplementedError('%s not in DH group' % self.ike_dh)
+
+ if self.dh_params is None:
+ dhg = DH[self.ike_dh]
+ pn = dh.DHParameterNumbers(dhg[0], dhg[1])
+ self.dh_params = pn.parameters(default_backend())
+
+ priv = self.dh_params.generate_private_key()
+ pub = priv.public_key()
+ x = priv.private_numbers().x
+ self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
+ y = pub.public_numbers().y
+
if self.is_initiator:
- if self.ike_dh not in DH:
- raise NotImplementedError('%s not in DH group' % self.ike_dh)
- if self.dh_params is None:
- dhg = DH[self.ike_dh]
- pn = dh.DHParameterNumbers(dhg[0], dhg[1])
- self.dh_params = pn.parameters(default_backend())
- priv = self.dh_params.generate_private_key()
- pub = priv.public_key()
- x = priv.private_numbers().x
- self.dh_private_key = x.to_bytes(priv.key_size // 8, 'big')
- y = pub.public_numbers().y
self.i_dh_data = y.to_bytes(pub.key_size // 8, 'big')
+ else:
+ self.r_dh_data = y.to_bytes(pub.key_size // 8, 'big')
def complete_dh_data(self):
self.dh_shared_secret = self.compute_secret()
@@ -335,13 +349,21 @@ class IKEv2SA(object):
id = self.i_id
nonce = self.r_nonce
key = self.sk_pi
+ else:
+ id = self.r_id
+ nonce = self.i_nonce
+ key = self.sk_pr
data = bytes([self.id_type, 0, 0, 0]) + id
id_hash = self.calc_prf(prf, key, data)
return packet + nonce + id_hash
def auth_init(self):
prf = self.ike_prf_alg.mod()
- authmsg = self.generate_authmsg(prf, raw(self.init_req_packet))
+ if self.is_initiator:
+ packet = self.init_req_packet
+ else:
+ packet = self.init_resp_packet
+ authmsg = self.generate_authmsg(prf, raw(packet))
if self.auth_method == 'shared-key':
psk = self.calc_prf(prf, self.auth_data, KEY_PAD)
self.auth_data = self.calc_prf(prf, psk, authmsg)
@@ -440,7 +462,10 @@ class IKEv2SA(object):
ts1 = ikev2.IPv6TrafficSelector(**ts_data)
ts_data.update(self.build_ts_addr(c.remote_ts, '6'))
ts2 = ikev2.IPv6TrafficSelector(**ts_data)
- return ([ts1], [ts2])
+
+ if self.is_initiator:
+ return ([ts1], [ts2])
+ return ([ts2], [ts1])
def set_ike_props(self, crypto, crypto_key_len, integ, prf, dh):
if crypto not in CRYPTO_ALGOS:
@@ -492,14 +517,14 @@ class IKEv2SA(object):
return digest.finalize()
-class TemplateResponder(VppTestCase):
- """ responder test template """
+class IkePeer(VppTestCase):
+ """ common class for initiator and responder """
@classmethod
def setUpClass(cls):
import scapy.contrib.ikev2 as _ikev2
globals()['ikev2'] = _ikev2
- super(TemplateResponder, cls).setUpClass()
+ super(IkePeer, cls).setUpClass()
cls.create_pg_interfaces(range(2))
for i in cls.pg_interfaces:
i.admin_up()
@@ -510,10 +535,10 @@ class TemplateResponder(VppTestCase):
@classmethod
def tearDownClass(cls):
- super(TemplateResponder, cls).tearDownClass()
+ super(IkePeer, cls).tearDownClass()
def setUp(self):
- super(TemplateResponder, self).setUp()
+ super(IkePeer, self).setUp()
self.config_tc()
self.p.add_vpp_config()
self.assertIsNotNone(self.p.query_vpp_config())
@@ -521,36 +546,6 @@ class TemplateResponder(VppTestCase):
self.vapi.cli('ikev2 set logging level 4')
self.vapi.cli('event-lo clear')
- def tearDown(self):
- super(TemplateResponder, self).tearDown()
- if self.sa.is_initiator:
- self.initiate_del_sa()
- r = self.vapi.ikev2_sa_dump()
- self.assertEqual(len(r), 0)
-
- self.p.remove_vpp_config()
- self.assertIsNone(self.p.query_vpp_config())
-
- def verify_del_sa(self, packet):
- ih = self.get_ike_header(packet)
- self.assertEqual(ih.id, self.sa.msg_id)
- self.assertEqual(ih.exch_type, 37) # exchange informational
-
- def initiate_del_sa(self):
- header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
- flags='Initiator', exch_type='INFORMATIONAL',
- id=self.sa.new_msg_id())
- del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
- ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
- packet = self.create_packet(self.pg0, ike_msg,
- self.sa.sport, self.sa.dport,
- self.sa.natt, self.ip6)
- self.pg0.add_stream(packet)
- self.pg0.enable_capture()
- self.pg_start()
- capture = self.pg0.get_capture(1)
- self.verify_del_sa(capture[0])
-
def create_packet(self, src_if, msg, sport=500, dport=500, natt=False,
use_ip6=False):
if use_ip6:
@@ -569,60 +564,30 @@ class TemplateResponder(VppTestCase):
res = res / Raw(b'\x00' * 4)
return res / msg
- def send_sa_init(self, behind_nat=False):
- tr_attr = self.sa.ike_crypto_attr()
- trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
- transform_id=self.sa.ike_crypto, length=tr_attr[1],
- key_length=tr_attr[0]) /
- ikev2.IKEv2_payload_Transform(transform_type='Integrity',
- transform_id=self.sa.ike_integ) /
- ikev2.IKEv2_payload_Transform(transform_type='PRF',
- transform_id=self.sa.ike_prf_alg.name) /
- ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
- transform_id=self.sa.ike_dh))
-
- props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
- trans_nb=4, trans=trans))
-
- if behind_nat:
- next_payload = 'Notify'
- else:
- next_payload = None
+ def verify_udp(self, udp):
+ self.assertEqual(udp.sport, self.sa.sport)
+ self.assertEqual(udp.dport, self.sa.dport)
- self.sa.init_req_packet = (
- ikev2.IKEv2(init_SPI=self.sa.ispi,
- flags='Initiator', exch_type='IKE_SA_INIT') /
- ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
- ikev2.IKEv2_payload_KE(next_payload='Nonce',
- group=self.sa.ike_dh,
- load=self.sa.dh_pub_key()) /
- ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
- load=self.sa.i_nonce))
+ def get_ike_header(self, packet):
+ try:
+ ih = packet[ikev2.IKEv2]
+ except IndexError as e:
+ # this is a workaround for getting IKEv2 layer as both ikev2 and
+ # ipsec register for port 4500
+ esp = packet[ESP]
+ ih = self.verify_and_remove_non_esp_marker(esp)
+ self.assertEqual(ih.version, 0x20)
+ return ih
- if behind_nat:
- src_address = b'\x0a\x0a\x0a\x01'
+ def verify_and_remove_non_esp_marker(self, packet):
+ if self.sa.natt:
+ # if we are in nat traversal mode check for non esp marker
+ # and remove it
+ data = raw(packet)
+ self.assertEqual(data[:4], b'\x00' * 4)
+ return ikev2.IKEv2(data[4:])
else:
- src_address = bytes(self.pg0.local_ip4, 'ascii')
-
- src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
- dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
- self.sa.sport)
- nat_src_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_SOURCE_IP', load=src_nat)
- nat_dst_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
- self.sa.init_req_packet = (self.sa.init_req_packet /
- nat_src_detection /
- nat_dst_detection)
-
- ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
- self.sa.sport, self.sa.dport,
- self.sa.natt, self.ip6)
- self.pg0.add_stream(ike_msg)
- self.pg0.enable_capture()
- self.pg_start()
- capture = self.pg0.get_capture(1)
- self.verify_sa_init(capture[0])
+ return packet
def encrypt_ike_msg(self, header, plain, first_payload):
if self.sa.ike_crypto == 'AES-GCM-16ICV':
@@ -658,114 +623,17 @@ class TemplateResponder(VppTestCase):
assert(len(res) == tlen)
return res
- def send_sa_auth(self):
- tr_attr = self.sa.esp_crypto_attr()
- trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
- transform_id=self.sa.esp_crypto, length=tr_attr[1],
- key_length=tr_attr[0]) /
- ikev2.IKEv2_payload_Transform(transform_type='Integrity',
- transform_id=self.sa.esp_integ) /
- ikev2.IKEv2_payload_Transform(
- transform_type='Extended Sequence Number',
- transform_id='No ESN') /
- ikev2.IKEv2_payload_Transform(
- transform_type='Extended Sequence Number',
- transform_id='ESN'))
-
- props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
- SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
-
- tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
- plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
- IDtype=self.sa.id_type, load=self.sa.i_id) /
- ikev2.IKEv2_payload_IDr(next_payload='AUTH',
- IDtype=self.sa.id_type, load=self.sa.r_id) /
- ikev2.IKEv2_payload_AUTH(next_payload='SA',
- auth_type=AuthMethod.value(self.sa.auth_method),
- load=self.sa.auth_data) /
- ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
- ikev2.IKEv2_payload_TSi(next_payload='TSr',
- number_of_TSs=len(tsi),
- traffic_selector=tsi) /
- ikev2.IKEv2_payload_TSr(next_payload='Notify',
- number_of_TSs=len(tsr),
- traffic_selector=tsr) /
- ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
-
- header = ikev2.IKEv2(
- init_SPI=self.sa.ispi,
- resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
- flags='Initiator', exch_type='IKE_AUTH')
-
- ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
- packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
- self.sa.dport, self.sa.natt, self.ip6)
- self.pg0.add_stream(packet)
- self.pg0.enable_capture()
- self.pg_start()
- capture = self.pg0.get_capture(1)
- self.verify_sa_auth(capture[0])
-
- def get_ike_header(self, packet):
- try:
- ih = packet[ikev2.IKEv2]
- except IndexError as e:
- # this is a workaround for getting IKEv2 layer as both ikev2 and
- # ipsec register for port 4500
- esp = packet[ESP]
- ih = self.verify_and_remove_non_esp_marker(esp)
-
- self.assertEqual(ih.version, 0x20)
- return ih
-
- def verify_sa_init(self, packet):
- ih = self.get_ike_header(packet)
-
- self.assertEqual(ih.id, self.sa.msg_id)
- self.assertEqual(ih.exch_type, 34)
- self.assertTrue('Response' in ih.flags)
- self.assertEqual(ih.init_SPI, self.sa.ispi)
- self.assertNotEqual(ih.resp_SPI, 0)
- self.sa.rspi = ih.resp_SPI
- try:
- sa = ih[ikev2.IKEv2_payload_SA]
- self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
- self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
- except IndexError as e:
- self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
- self.logger.error(ih.show())
- raise
- self.sa.complete_dh_data()
- self.sa.calc_keys()
- self.sa.auth_init()
-
- def verify_and_remove_non_esp_marker(self, packet):
- if self.sa.natt:
- # if we are in nat traversal mode check for non esp marker
- # and remove it
- data = raw(packet)
- self.assertEqual(data[:4], b'\x00' * 4)
- return ikev2.IKEv2(data[4:])
- else:
- return packet
-
- def verify_udp(self, udp):
- self.assertEqual(udp.sport, self.sa.sport)
- self.assertEqual(udp.dport, self.sa.dport)
-
- def verify_sa_auth(self, packet):
- ike = self.get_ike_header(packet)
- udp = packet[UDP]
- self.verify_udp(udp)
- self.assertEqual(ike.id, self.sa.msg_id)
- plain = self.sa.hmac_and_decrypt(ike)
- self.sa.calc_child_keys()
-
def verify_ipsec_sas(self):
sas = self.vapi.ipsec_sa_dump()
self.assertEqual(len(sas), 2)
- sa0 = sas[0].entry
- sa1 = sas[1].entry
+ e = VppEnum.vl_api_ipsec_sad_flags_t
+ if self.sa.is_initiator:
+ sa0 = sas[0].entry
+ sa1 = sas[1].entry
+ else:
+ sa1 = sas[0].entry
+ sa0 = sas[1].entry
+
c = self.sa.child_sas[0]
vpp_crypto_alg = self.vpp_enums[self.sa.vpp_esp_cypto_alg]
@@ -814,11 +682,19 @@ class TemplateResponder(VppTestCase):
self.assertEqual(self.sa.ispi, (sa.ispi).to_bytes(8, 'big'))
self.assertEqual(self.sa.rspi, (sa.rspi).to_bytes(8, 'big'))
if self.ip6:
- self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
- self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
+ if self.sa.is_initiator:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.remote_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.local_ip6))
+ else:
+ self.assertEqual(sa.iaddr, IPv6Address(self.pg0.local_ip6))
+ self.assertEqual(sa.raddr, IPv6Address(self.pg0.remote_ip6))
else:
- self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
- self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+ if self.sa.is_initiator:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.remote_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.local_ip4))
+ else:
+ self.assertEqual(sa.iaddr, IPv4Address(self.pg0.local_ip4))
+ self.assertEqual(sa.raddr, IPv4Address(self.pg0.remote_ip4))
self.verify_keymat(sa.keys, self.sa, 'sk_d')
self.verify_keymat(sa.keys, self.sa, 'sk_ai')
self.verify_keymat(sa.keys, self.sa, 'sk_ar')
@@ -892,8 +768,314 @@ class TemplateResponder(VppTestCase):
self.assertEqual(api_ts.end_port, ts.end_port)
self.assertEqual(api_ts.protocol_id, ts.IP_protocol_ID)
+
+class TemplateInitiator(IkePeer):
+ """ initiator test template """
+
+ def tearDown(self):
+ super(TemplateInitiator, self).tearDown()
+
+ def verify_sa_init_request(self, packet):
+ ih = packet[ikev2.IKEv2]
+ self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
+ self.assertEqual(ih.exch_type, 34) # SA_INIT
+ self.sa.ispi = ih.init_SPI
+ self.assertEqual(ih.resp_SPI, 8 * b'\x00')
+ self.assertIn('Initiator', ih.flags)
+ self.assertNotIn('Response', ih.flags)
+ self.sa.i_nonce = ih[ikev2.IKEv2_payload_Nonce].load
+ self.sa.i_dh_data = ih[ikev2.IKEv2_payload_KE].load
+
+ prop = packet[ikev2.IKEv2_payload_Proposal]
+ self.assertEqual(prop.proto, 1) # proto = ikev2
+ self.assertEqual(prop.proposal, 1)
+ self.assertEqual(prop.trans[0].transform_type, 1) # encryption
+ self.assertEqual(prop.trans[0].transform_id,
+ self.p.ike_transforms['crypto_alg'])
+ self.assertEqual(prop.trans[1].transform_type, 2) # prf
+ self.assertEqual(prop.trans[1].transform_id, 5) # "hmac-sha2-256"
+ self.assertEqual(prop.trans[2].transform_type, 4) # dh
+ self.assertEqual(prop.trans[2].transform_id,
+ self.p.ike_transforms['dh_group'])
+
+ self.sa.complete_dh_data()
+ self.sa.calc_keys()
+
+ def verify_sa_auth_req(self, packet):
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.resp_SPI, self.sa.rspi)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertEqual(ih.exch_type, 35) # IKE_AUTH
+ self.assertIn('Initiator', ih.flags)
+ self.assertNotIn('Response', ih.flags)
+
+ udp = packet[UDP]
+ self.verify_udp(udp)
+ self.assertEqual(ih.id, self.sa.msg_id + 1)
+ self.sa.msg_id += 1
+ plain = self.sa.hmac_and_decrypt(ih)
+ idi = ikev2.IKEv2_payload_IDi(plain)
+ idr = ikev2.IKEv2_payload_IDr(idi.payload)
+ self.assertEqual(idi.load, self.sa.i_id)
+ self.assertEqual(idr.load, self.sa.r_id)
+
+ def send_init_response(self):
+ tr_attr = self.sa.ike_crypto_attr()
+ trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
+ transform_id=self.sa.ike_crypto, length=tr_attr[1],
+ key_length=tr_attr[0]) /
+ ikev2.IKEv2_payload_Transform(transform_type='Integrity',
+ transform_id=self.sa.ike_integ) /
+ ikev2.IKEv2_payload_Transform(transform_type='PRF',
+ transform_id=self.sa.ike_prf_alg.name) /
+ ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
+ transform_id=self.sa.ike_dh))
+ props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
+ trans_nb=4, trans=trans))
+ self.sa.init_resp_packet = (
+ ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ exch_type='IKE_SA_INIT', flags='Response') /
+ ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
+ ikev2.IKEv2_payload_KE(next_payload='Nonce',
+ group=self.sa.ike_dh,
+ load=self.sa.my_dh_pub_key) /
+ ikev2.IKEv2_payload_Nonce(load=self.sa.r_nonce))
+
+ ike_msg = self.create_packet(self.pg0, self.sa.init_resp_packet,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg_send(self.pg0, ike_msg)
+ capture = self.pg0.get_capture(1)
+ self.verify_sa_auth_req(capture[0])
+
+ def initiate_sa_init(self):
+ self.pg0.enable_capture()
+ self.pg_start()
+ self.vapi.ikev2_initiate_sa_init(name=self.p.profile_name)
+
+ capture = self.pg0.get_capture(1)
+ self.verify_sa_init_request(capture[0])
+ self.send_init_response()
+
+ def send_auth_response(self):
+ tr_attr = self.sa.esp_crypto_attr()
+ trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
+ transform_id=self.sa.esp_crypto, length=tr_attr[1],
+ key_length=tr_attr[0]) /
+ ikev2.IKEv2_payload_Transform(transform_type='Integrity',
+ transform_id=self.sa.esp_integ) /
+ ikev2.IKEv2_payload_Transform(
+ transform_type='Extended Sequence Number',
+ transform_id='No ESN') /
+ ikev2.IKEv2_payload_Transform(
+ transform_type='Extended Sequence Number',
+ transform_id='ESN'))
+
+ props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
+ SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
+
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
+ plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
+ IDtype=self.sa.id_type, load=self.sa.i_id) /
+ ikev2.IKEv2_payload_IDr(next_payload='AUTH',
+ IDtype=self.sa.id_type, load=self.sa.r_id) /
+ ikev2.IKEv2_payload_AUTH(next_payload='SA',
+ auth_type=AuthMethod.value(self.sa.auth_method),
+ load=self.sa.auth_data) /
+ ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
+ ikev2.IKEv2_payload_TSi(next_payload='TSr',
+ number_of_TSs=len(tsi),
+ traffic_selector=tsi) /
+ ikev2.IKEv2_payload_TSr(next_payload='Notify',
+ number_of_TSs=len(tsr),
+ traffic_selector=tsr) /
+ ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
+
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Response', exch_type='IKE_AUTH')
+
+ ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
+ packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.pg_send(self.pg0, packet)
+
+ def test_initiator(self):
+ self.initiate_sa_init()
+ self.sa.auth_init()
+ self.sa.calc_child_keys()
+ self.send_auth_response()
+ self.verify_ike_sas()
+
+
+class TemplateResponder(IkePeer):
+ """ responder test template """
+
+ def tearDown(self):
+ super(TemplateResponder, self).tearDown()
+ if self.sa.is_initiator:
+ self.initiate_del_sa()
+ r = self.vapi.ikev2_sa_dump()
+ self.assertEqual(len(r), 0)
+
+ self.p.remove_vpp_config()
+ self.assertIsNone(self.p.query_vpp_config())
+
+ def verify_del_sa(self, packet):
+ ih = self.get_ike_header(packet)
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 37) # exchange informational
+
+ def initiate_del_sa(self):
+ header = ikev2.IKEv2(init_SPI=self.sa.ispi, resp_SPI=self.sa.rspi,
+ flags='Initiator', exch_type='INFORMATIONAL',
+ id=self.sa.new_msg_id())
+ del_sa = ikev2.IKEv2_payload_Delete(proto='IKEv2')
+ ike_msg = self.encrypt_ike_msg(header, del_sa, 'Delete')
+ packet = self.create_packet(self.pg0, ike_msg,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_del_sa(capture[0])
+
+ def send_sa_init_req(self, behind_nat=False):
+ tr_attr = self.sa.ike_crypto_attr()
+ trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
+ transform_id=self.sa.ike_crypto, length=tr_attr[1],
+ key_length=tr_attr[0]) /
+ ikev2.IKEv2_payload_Transform(transform_type='Integrity',
+ transform_id=self.sa.ike_integ) /
+ ikev2.IKEv2_payload_Transform(transform_type='PRF',
+ transform_id=self.sa.ike_prf_alg.name) /
+ ikev2.IKEv2_payload_Transform(transform_type='GroupDesc',
+ transform_id=self.sa.ike_dh))
+
+ props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
+ trans_nb=4, trans=trans))
+
+ if behind_nat:
+ next_payload = 'Notify'
+ else:
+ next_payload = None
+
+ self.sa.init_req_packet = (
+ ikev2.IKEv2(init_SPI=self.sa.ispi,
+ flags='Initiator', exch_type='IKE_SA_INIT') /
+ ikev2.IKEv2_payload_SA(next_payload='KE', prop=props) /
+ ikev2.IKEv2_payload_KE(next_payload='Nonce',
+ group=self.sa.ike_dh,
+ load=self.sa.my_dh_pub_key) /
+ ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
+ load=self.sa.i_nonce))
+
+ if behind_nat:
+ src_address = b'\x0a\x0a\x0a\x01'
+ else:
+ src_address = bytes(self.pg0.local_ip4, 'ascii')
+
+ src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
+ self.sa.sport)
+ nat_src_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat)
+ nat_dst_detection = ikev2.IKEv2_payload_Notify(
+ type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
+ self.sa.init_req_packet = (self.sa.init_req_packet /
+ nat_src_detection /
+ nat_dst_detection)
+
+ ike_msg = self.create_packet(self.pg0, self.sa.init_req_packet,
+ self.sa.sport, self.sa.dport,
+ self.sa.natt, self.ip6)
+ self.pg0.add_stream(ike_msg)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_sa_init(capture[0])
+
+ def send_sa_auth(self):
+ tr_attr = self.sa.esp_crypto_attr()
+ trans = (ikev2.IKEv2_payload_Transform(transform_type='Encryption',
+ transform_id=self.sa.esp_crypto, length=tr_attr[1],
+ key_length=tr_attr[0]) /
+ ikev2.IKEv2_payload_Transform(transform_type='Integrity',
+ transform_id=self.sa.esp_integ) /
+ ikev2.IKEv2_payload_Transform(
+ transform_type='Extended Sequence Number',
+ transform_id='No ESN') /
+ ikev2.IKEv2_payload_Transform(
+ transform_type='Extended Sequence Number',
+ transform_id='ESN'))
+
+ props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='ESP',
+ SPIsize=4, SPI=os.urandom(4), trans_nb=4, trans=trans))
+
+ tsi, tsr = self.sa.generate_ts(self.p.ts_is_ip4)
+ plain = (ikev2.IKEv2_payload_IDi(next_payload='IDr',
+ IDtype=self.sa.id_type, load=self.sa.i_id) /
+ ikev2.IKEv2_payload_IDr(next_payload='AUTH',
+ IDtype=self.sa.id_type, load=self.sa.r_id) /
+ ikev2.IKEv2_payload_AUTH(next_payload='SA',
+ auth_type=AuthMethod.value(self.sa.auth_method),
+ load=self.sa.auth_data) /
+ ikev2.IKEv2_payload_SA(next_payload='TSi', prop=props) /
+ ikev2.IKEv2_payload_TSi(next_payload='TSr',
+ number_of_TSs=len(tsi),
+ traffic_selector=tsi) /
+ ikev2.IKEv2_payload_TSr(next_payload='Notify',
+ number_of_TSs=len(tsr),
+ traffic_selector=tsr) /
+ ikev2.IKEv2_payload_Notify(type='INITIAL_CONTACT'))
+
+ header = ikev2.IKEv2(
+ init_SPI=self.sa.ispi,
+ resp_SPI=self.sa.rspi, id=self.sa.new_msg_id(),
+ flags='Initiator', exch_type='IKE_AUTH')
+
+ ike_msg = self.encrypt_ike_msg(header, plain, 'IDi')
+ packet = self.create_packet(self.pg0, ike_msg, self.sa.sport,
+ self.sa.dport, self.sa.natt, self.ip6)
+ self.pg0.add_stream(packet)
+ self.pg0.enable_capture()
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ self.verify_sa_auth_resp(capture[0])
+
+ def verify_sa_init(self, packet):
+ ih = self.get_ike_header(packet)
+
+ self.assertEqual(ih.id, self.sa.msg_id)
+ self.assertEqual(ih.exch_type, 34)
+ self.assertTrue('Response' in ih.flags)
+ self.assertEqual(ih.init_SPI, self.sa.ispi)
+ self.assertNotEqual(ih.resp_SPI, 0)
+ self.sa.rspi = ih.resp_SPI
+ try:
+ sa = ih[ikev2.IKEv2_payload_SA]
+ self.sa.r_nonce = ih[ikev2.IKEv2_payload_Nonce].load
+ self.sa.r_dh_data = ih[ikev2.IKEv2_payload_KE].load
+ except IndexError as e:
+ self.logger.error("unexpected reply: SA/Nonce/KE payload found!")
+ self.logger.error(ih.show())
+ raise
+ self.sa.complete_dh_data()
+ self.sa.calc_keys()
+ self.sa.auth_init()
+
+ def verify_sa_auth_resp(self, packet):
+ ike = self.get_ike_header(packet)
+ udp = packet[UDP]
+ self.verify_udp(udp)
+ self.assertEqual(ike.id, self.sa.msg_id)
+ plain = self.sa.hmac_and_decrypt(ike)
+ self.sa.calc_child_keys()
+
def test_responder(self):
- self.send_sa_init(self.sa.natt)
+ self.send_sa_init_req(self.sa.natt)
self.send_sa_auth()
self.verify_ipsec_sas()
self.verify_ike_sas()
@@ -942,17 +1124,33 @@ class Ikev2Params(object):
auth_method = 'shared-key'
client_priv = None
- self.p.add_local_id(id_type='fqdn', data=b'vpp.home')
- self.p.add_remote_id(id_type='fqdn', data=b'roadwarrior.example.com')
+ is_init = True if 'is_initiator' not in params else\
+ params['is_initiator']
+
+ idr = {'id_type': 'fqdn', 'data': b'vpp.home'}
+ idi = {'id_type': 'fqdn', 'data': b'roadwarrior.example.com'}
+ if is_init:
+ self.p.add_local_id(**idr)
+ self.p.add_remote_id(**idi)
+ else:
+ self.p.add_local_id(**idi)
+ self.p.add_remote_id(**idr)
+
loc_ts = {'start_addr': '10.10.10.0', 'end_addr': '10.10.10.255'} if\
'loc_ts' not in params else params['loc_ts']
rem_ts = {'start_addr': '10.0.0.0', 'end_addr': '10.0.0.255'} if\
'rem_ts' not in params else params['rem_ts']
self.p.add_local_ts(**loc_ts)
self.p.add_remote_ts(**rem_ts)
-
- self.sa = IKEv2SA(self, i_id=self.p.remote_id['data'],
- r_id=self.p.local_id['data'],
+ if 'responder' in params:
+ self.p.add_responder(params['responder'])
+ if 'ike_transforms' in params:
+ self.p.add_ike_transforms(params['ike_transforms'])
+ if 'esp_transforms' in params:
+ self.p.add_esp_transforms(params['esp_transforms'])
+
+ self.sa = IKEv2SA(self, i_id=idi['data'], r_id=idr['data'],
+ is_initiator=is_init,
id_type=self.p.local_id['id_type'], natt=is_natt,
priv_key=client_priv, auth_method=auth_method,
auth_data=auth_data,
@@ -962,7 +1160,8 @@ class Ikev2Params(object):
params['ike-crypto']
ike_integ = 'HMAC-SHA1-96' if 'ike-integ' not in params else\
params['ike-integ']
- ike_dh = '2048MODPgr' if 'ike-dh' not in params else params['ike-dh']
+ ike_dh = '2048MODPgr' if 'ike-dh' not in params else\
+ params['ike-dh']
esp_crypto = ('AES-CBC', 32) if 'esp-crypto' not in params else\
params['esp-crypto']
@@ -1160,6 +1359,29 @@ class TestApi(VppTestCase):
self.assertEqual(ap.tun_itf, 0xffffffff)
+class TestInitiatorPsk(TemplateInitiator, Ikev2Params):
+ """ test ikev2 initiator - pre shared key auth """
+ def config_tc(self):
+ self.config_params({
+ 'is_initiator': False, # seen from test case perspective
+ # thus vpp is initiator
+ 'responder': {'sw_if_index': self.pg0.sw_if_index,
+ 'addr': self.pg0.remote_ip4},
+ 'ike-crypto': ('AES-GCM-16ICV', 32),
+ 'ike-integ': 'NULL',
+ 'ike-dh': '3072MODPgr',
+ 'ike_transforms': {
+ 'crypto_alg': 20, # "aes-gcm-16"
+ 'crypto_key_size': 256,
+ 'dh_group': 15, # "modp-3072"
+ },
+ 'esp_transforms': {
+ 'crypto_alg': 12, # "aes-cbc"
+ 'crypto_key_size': 256,
+ # "hmac-sha2-256-128"
+ 'integ_alg': 12}})
+
+
class TestResponderNATT(TemplateResponder, Ikev2Params):
""" test ikev2 responder - nat traversal """
def config_tc(self):