aboutsummaryrefslogtreecommitdiffstats
path: root/src/plugins/ikev2/test/test_ikev2.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugins/ikev2/test/test_ikev2.py')
-rw-r--r--src/plugins/ikev2/test/test_ikev2.py55
1 files changed, 43 insertions, 12 deletions
diff --git a/src/plugins/ikev2/test/test_ikev2.py b/src/plugins/ikev2/test/test_ikev2.py
index 4f64b56d853..f75a517f824 100644
--- a/src/plugins/ikev2/test/test_ikev2.py
+++ b/src/plugins/ikev2/test/test_ikev2.py
@@ -1,4 +1,5 @@
import os
+from socket import inet_pton
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
@@ -510,8 +511,10 @@ class IKEv2SA(object):
def esp_crypto_attr(self):
return self.crypto_attr(self.esp_crypto_key_len)
- def compute_nat_sha1(self, ip, port):
- data = self.ispi + self.rspi + ip + (port).to_bytes(2, 'big')
+ def compute_nat_sha1(self, ip, port, rspi=None):
+ if rspi is None:
+ rspi = self.rspi
+ data = self.ispi + rspi + ip + (port).to_bytes(2, 'big')
digest = hashes.Hash(hashes.SHA1(), backend=default_backend())
digest.update(data)
return digest.finalize()
@@ -775,6 +778,36 @@ class TemplateInitiator(IkePeer):
def tearDown(self):
super(TemplateInitiator, self).tearDown()
+ @staticmethod
+ def find_notify_payload(packet, notify_type):
+ n = packet[ikev2.IKEv2_payload_Notify]
+ while n is not None:
+ if n.type == notify_type:
+ return n
+ n = n.payload
+ return None
+
+ def verify_nat_detection(self, packet):
+ if self.ip6:
+ iph = packet[IPv6]
+ else:
+ iph = packet[IP]
+ udp = packet[UDP]
+
+ # NAT_DETECTION_SOURCE_IP
+ s = self.find_notify_payload(packet, 16388)
+ self.assertIsNotNone(s)
+ src_sha = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, iph.src), udp.sport, b'\x00' * 8)
+ self.assertEqual(s.load, src_sha)
+
+ # NAT_DETECTION_DESTINATION_IP
+ s = self.find_notify_payload(packet, 16389)
+ self.assertIsNotNone(s)
+ dst_sha = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, iph.dst), udp.dport, b'\x00' * 8)
+ self.assertEqual(s.load, dst_sha)
+
def verify_sa_init_request(self, packet):
ih = packet[ikev2.IKEv2]
self.assertNotEqual(ih.init_SPI, 8 * b'\x00')
@@ -798,6 +831,7 @@ class TemplateInitiator(IkePeer):
self.assertEqual(prop.trans[2].transform_id,
self.p.ike_transforms['dh_group'])
+ self.verify_nat_detection(packet)
self.sa.complete_dh_data()
self.sa.calc_keys()
@@ -957,11 +991,6 @@ class TemplateResponder(IkePeer):
props = (ikev2.IKEv2_payload_Proposal(proposal=1, proto='IKEv2',
trans_nb=4, trans=trans))
- if behind_nat:
- next_payload = 'Notify'
- else:
- next_payload = None
-
self.sa.init_req_packet = (
ikev2.IKEv2(init_SPI=self.sa.ispi,
flags='Initiator', exch_type='IKE_SA_INIT') /
@@ -969,19 +998,21 @@ class TemplateResponder(IkePeer):
ikev2.IKEv2_payload_KE(next_payload='Nonce',
group=self.sa.ike_dh,
load=self.sa.my_dh_pub_key) /
- ikev2.IKEv2_payload_Nonce(next_payload=next_payload,
+ ikev2.IKEv2_payload_Nonce(next_payload='Notify',
load=self.sa.i_nonce))
if behind_nat:
src_address = b'\x0a\x0a\x0a\x01'
else:
- src_address = bytes(self.pg0.local_ip4, 'ascii')
+ src_address = inet_pton(socket.AF_INET, self.pg0.remote_ip4)
src_nat = self.sa.compute_nat_sha1(src_address, self.sa.sport)
- dst_nat = self.sa.compute_nat_sha1(bytes(self.pg0.remote_ip4, 'ascii'),
- self.sa.sport)
+ dst_nat = self.sa.compute_nat_sha1(
+ inet_pton(socket.AF_INET, self.pg0.local_ip4),
+ self.sa.dport)
nat_src_detection = ikev2.IKEv2_payload_Notify(
- type='NAT_DETECTION_SOURCE_IP', load=src_nat)
+ type='NAT_DETECTION_SOURCE_IP', load=src_nat,
+ next_payload='Notify')
nat_dst_detection = ikev2.IKEv2_payload_Notify(
type='NAT_DETECTION_DESTINATION_IP', load=dst_nat)
self.sa.init_req_packet = (self.sa.init_req_packet /