aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_wireguard.py
diff options
context:
space:
mode:
authorArtem Glazychev <artem.glazychev@xored.com>2021-06-03 20:11:54 +0700
committerEd Warnicke <hagbard@gmail.com>2021-10-06 17:57:46 +0000
commit7dd3b5b5e37a4019ae335296ba9c4bd1e465fd17 (patch)
tree0cd0a76ebce52b2907514e4e4394af32094d2ab7 /test/test_wireguard.py
parent0c4931cb351929a1ccdb6b29431def3705f101d7 (diff)
wireguard: add ipv6 support
Type: improvement Signed-off-by: Artem Glazychev <artem.glazychev@xored.com> Change-Id: If1a7e82ce163c4c4acaa5acf45ad2b88371396f6
Diffstat (limited to 'test/test_wireguard.py')
-rwxr-xr-xtest/test_wireguard.py499
1 files changed, 448 insertions, 51 deletions
diff --git a/test/test_wireguard.py b/test/test_wireguard.py
index 96c1bc0f5fd..65ebd8d2c08 100755
--- a/test/test_wireguard.py
+++ b/test/test_wireguard.py
@@ -9,6 +9,7 @@ from scapy.packet import Packet
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP
from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
from scapy.contrib.wireguard import Wireguard, WireguardResponse, \
WireguardInitiation, WireguardTransport
from cryptography.hazmat.primitives.asymmetric.x25519 import \
@@ -98,8 +99,8 @@ class VppWgInterface(VppInterface):
return "wireguard-%d" % self._sw_if_index
-def find_route(test, prefix, table_id=0):
- routes = test.vapi.ip_route_dump(table_id, False)
+def find_route(test, prefix, is_ip6, table_id=0):
+ routes = test.vapi.ip_route_dump(table_id, is_ip6)
for e in routes:
if table_id == e.route.table_id \
@@ -134,7 +135,7 @@ class VppWgPeer(VppObject):
self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME)
- def add_vpp_config(self):
+ def add_vpp_config(self, is_ip6=False):
rv = self._test.vapi.wireguard_peer_add(
peer={
'public_key': self.public_key_bytes(),
@@ -179,10 +180,15 @@ class VppWgPeer(VppObject):
def set_responder(self):
self.noise.set_as_responder()
- def mk_tunnel_header(self, tx_itf):
- return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) /
- IP(src=self.endpoint, dst=self.itf.src) /
- UDP(sport=self.port, dport=self.itf.port))
+ def mk_tunnel_header(self, tx_itf, is_ip6=False):
+ if is_ip6 is False:
+ return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) /
+ IP(src=self.endpoint, dst=self.itf.src) /
+ UDP(sport=self.port, dport=self.itf.port))
+ else:
+ return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) /
+ IPv6(src=self.endpoint, dst=self.itf.src) /
+ UDP(sport=self.port, dport=self.itf.port))
def noise_init(self, public_key=None):
self.noise.set_prologue(NOISE_IDENTIFIER_NAME)
@@ -202,7 +208,7 @@ class VppWgPeer(VppObject):
self.noise.start_handshake()
- def mk_handshake(self, tx_itf, public_key=None):
+ def mk_handshake(self, tx_itf, is_ip6=False, public_key=None):
self.noise.set_as_initiator()
self.noise_init(public_key)
@@ -232,21 +238,25 @@ class VppWgPeer(VppObject):
key=mac_key).digest()
p[WireguardInitiation].mac2 = bytearray(16)
- p = (self.mk_tunnel_header(tx_itf) / p)
+ p = (self.mk_tunnel_header(tx_itf, is_ip6) / p)
return p
- def verify_header(self, p):
- self._test.assertEqual(p[IP].src, self.itf.src)
- self._test.assertEqual(p[IP].dst, self.endpoint)
+ def verify_header(self, p, is_ip6=False):
+ if is_ip6 is False:
+ self._test.assertEqual(p[IP].src, self.itf.src)
+ self._test.assertEqual(p[IP].dst, self.endpoint)
+ else:
+ self._test.assertEqual(p[IPv6].src, self.itf.src)
+ self._test.assertEqual(p[IPv6].dst, self.endpoint)
self._test.assertEqual(p[UDP].sport, self.itf.port)
self._test.assertEqual(p[UDP].dport, self.port)
self._test.assert_packet_checksums_valid(p)
- def consume_init(self, p, tx_itf):
+ def consume_init(self, p, tx_itf, is_ip6=False):
self.noise.set_as_responder()
self.noise_init(self.itf.public_key)
- self.verify_header(p)
+ self.verify_header(p, is_ip6)
init = Wireguard(p[Raw])
@@ -281,13 +291,13 @@ class VppWgPeer(VppObject):
key=mac_key).digest()
resp[WireguardResponse].mac1 = mac1
- resp = (self.mk_tunnel_header(tx_itf) / resp)
+ resp = (self.mk_tunnel_header(tx_itf, is_ip6) / resp)
self._test.assertTrue(self.noise.handshake_finished)
return resp
- def consume_response(self, p):
- self.verify_header(p)
+ def consume_response(self, p, is_ip6=False):
+ self.verify_header(p, is_ip6)
resp = Wireguard(p[Raw])
@@ -302,8 +312,8 @@ class VppWgPeer(VppObject):
self._test.assertEqual(payload, b'')
self._test.assertTrue(self.noise.handshake_finished)
- def decrypt_transport(self, p):
- self.verify_header(p)
+ def decrypt_transport(self, p, is_ip6=False):
+ self.verify_header(p, is_ip6)
p = Wireguard(p[Raw])
self._test.assertEqual(p[Wireguard].message_type, 4)
@@ -318,13 +328,20 @@ class VppWgPeer(VppObject):
def encrypt_transport(self, p):
return self.noise.encrypt(bytes(p))
- def validate_encapped(self, rxs, tx):
+ def validate_encapped(self, rxs, tx, is_ip6=False):
for rx in rxs:
- rx = IP(self.decrypt_transport(rx))
+ if is_ip6 is False:
+ rx = IP(self.decrypt_transport(rx))
- # chech the oringial packet is present
- self._test.assertEqual(rx[IP].dst, tx[IP].dst)
- self._test.assertEqual(rx[IP].ttl, tx[IP].ttl-1)
+ # chech the oringial packet is present
+ self._test.assertEqual(rx[IP].dst, tx[IP].dst)
+ self._test.assertEqual(rx[IP].ttl, tx[IP].ttl-1)
+ else:
+ rx = IPv6(self.decrypt_transport(rx))
+
+ # chech the oringial packet is present
+ self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst)
+ self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl-1)
class TestWg(VppTestCase):
@@ -332,6 +349,17 @@ class TestWg(VppTestCase):
error_str = compile(r"Error")
+ wg4_output_node_name = '/err/wg4-output-tun/'
+ wg4_input_node_name = '/err/wg4-input/'
+ wg6_output_node_name = '/err/wg6-output-tun/'
+ wg6_input_node_name = '/err/wg6-input/'
+ kp4_error = wg4_output_node_name + "Keypair error"
+ mac4_error = wg4_input_node_name + "Invalid MAC handshake"
+ peer4_error = wg4_input_node_name + "Peer error"
+ kp6_error = wg6_output_node_name + "Keypair error"
+ mac6_error = wg6_input_node_name + "Invalid MAC handshake"
+ peer6_error = wg6_input_node_name + "Peer error"
+
@classmethod
def setUpClass(cls):
super(TestWg, cls).setUpClass()
@@ -340,7 +368,9 @@ class TestWg(VppTestCase):
for i in cls.pg_interfaces:
i.admin_up()
i.config_ip4()
+ i.config_ip6()
i.resolve_arp()
+ i.resolve_ndp()
except Exception:
super(TestWg, cls).tearDownClass()
@@ -350,6 +380,15 @@ class TestWg(VppTestCase):
def tearDownClass(cls):
super(TestWg, cls).tearDownClass()
+ def setUp(self):
+ super(VppTestCase, self).setUp()
+ self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error)
+ self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error)
+ self.base_peer4_err = self.statistics.get_err_counter(self.peer4_error)
+ self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error)
+ self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error)
+ self.base_peer6_err = self.statistics.get_err_counter(self.peer6_error)
+
def test_wg_interface(self):
""" Simple interface creation """
port = 12312
@@ -409,9 +448,6 @@ class TestWg(VppTestCase):
def test_wg_peer_resp(self):
""" Send handshake response """
- wg_output_node_name = '/err/wg-output-tun/'
- wg_input_node_name = '/err/wg-input/'
-
port = 12323
# Create interfaces
@@ -481,10 +517,8 @@ class TestWg(VppTestCase):
peer_1.remove_vpp_config()
wg0.remove_vpp_config()
- def test_wg_peer_init(self):
- """ Send handshake init """
- wg_output_node_name = '/err/wg-output-tun/'
- wg_input_node_name = '/err/wg-input/'
+ def test_wg_peer_v4o4(self):
+ """ Test v4o4"""
port = 12333
@@ -514,23 +548,23 @@ class TestWg(VppTestCase):
UDP(sport=555, dport=556) /
Raw())
self.send_and_assert_no_replies(self.pg0, [p])
-
- kp_error = wg_output_node_name + "Keypair error"
- self.assertEqual(1, self.statistics.get_err_counter(kp_error))
+ self.assertEqual(self.base_kp4_err + 1,
+ self.statistics.get_err_counter(self.kp4_error))
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1)
p[WireguardInitiation].mac1 = b'foobar'
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(1, self.statistics.get_err_counter(
- wg_input_node_name + "Invalid MAC handshake"))
+ self.assertEqual(self.base_mac4_err + 1,
+ self.statistics.get_err_counter(self.mac4_error))
# send a handsake from the peer but signed by the wrong key.
p = peer_1.mk_handshake(self.pg1,
+ False,
X25519PrivateKey.generate().public_key())
self.send_and_assert_no_replies(self.pg1, [p])
- self.assertEqual(1, self.statistics.get_err_counter(
- wg_input_node_name + "Peer error"))
+ self.assertEqual(self.base_peer4_err + 1,
+ self.statistics.get_err_counter(self.peer4_error))
# send a valid handsake init for which we expect a response
p = peer_1.mk_handshake(self.pg1)
@@ -546,7 +580,8 @@ class TestWg(VppTestCase):
UDP(sport=555, dport=556) /
Raw())
self.send_and_assert_no_replies(self.pg0, [p])
- self.assertEqual(2, self.statistics.get_err_counter(kp_error))
+ self.assertEqual(self.base_kp4_err + 2,
+ self.statistics.get_err_counter(self.kp4_error))
# send a data packet from the peer through the tunnel
# this completes the handshake
@@ -602,9 +637,373 @@ class TestWg(VppTestCase):
peer_1.remove_vpp_config()
wg0.remove_vpp_config()
+ def test_wg_peer_v6o6(self):
+ """ Test v6o6"""
+
+ port = 12343
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip6,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip6()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip6,
+ port+1,
+ ["1::3:0/112"]).add_vpp_config(True)
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "1::3:0", 112,
+ [VppRoutePath("1::3:1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+
+ self.assertEqual(self.base_kp6_err + 1,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1, True)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+
+ self.assertEqual(self.base_mac6_err + 1,
+ self.statistics.get_err_counter(self.mac6_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ True,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer6_err + 1,
+ self.statistics.get_err_counter(self.peer6_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1, True)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0], True)
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 2,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1, True) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IPv6(peer_1.decrypt_transport(rx, True))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
+ self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1, True) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
+ def test_wg_peer_v6o4(self):
+ """ Test v6o4"""
+
+ port = 12353
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip4,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip6()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip4,
+ port+1,
+ ["1::3:0/112"]).add_vpp_config(True)
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "1::3:0", 112,
+ [VppRoutePath("1::3:1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 1,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+
+ self.assertEqual(self.base_mac4_err + 1,
+ self.statistics.get_err_counter(self.mac4_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ False,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer4_err + 1,
+ self.statistics.get_err_counter(self.peer4_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0])
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp6_err + 2,
+ self.statistics.get_err_counter(self.kp6_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst="1::3:2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IPv6(peer_1.decrypt_transport(rx))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IPv6].dst, p[IPv6].dst)
+ self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6)
+ self.assertEqual(rx[IPv6].hlim, 19)
+
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
+ def test_wg_peer_v4o6(self):
+ """ Test v4o6"""
+
+ port = 12363
+
+ # Create interfaces
+ wg0 = VppWgInterface(self,
+ self.pg1.local_ip6,
+ port).add_vpp_config()
+ wg0.admin_up()
+ wg0.config_ip4()
+
+ peer_1 = VppWgPeer(self,
+ wg0,
+ self.pg1.remote_ip6,
+ port+1,
+ ["10.11.3.0/24"]).add_vpp_config()
+ self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
+
+ r1 = VppIpRoute(self, "10.11.3.0", 24,
+ [VppRoutePath("10.11.3.1",
+ wg0.sw_if_index)]).add_vpp_config()
+
+ # route a packet into the wg interface
+ # use the allowed-ip prefix
+ # this is dropped because the peer is not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp4_err + 1,
+ self.statistics.get_err_counter(self.kp4_error))
+
+ # send a handsake from the peer with an invalid MAC
+ p = peer_1.mk_handshake(self.pg1, True)
+ p[WireguardInitiation].mac1 = b'foobar'
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_mac6_err + 1,
+ self.statistics.get_err_counter(self.mac6_error))
+
+ # send a handsake from the peer but signed by the wrong key.
+ p = peer_1.mk_handshake(self.pg1,
+ True,
+ X25519PrivateKey.generate().public_key())
+ self.send_and_assert_no_replies(self.pg1, [p])
+ self.assertEqual(self.base_peer6_err + 1,
+ self.statistics.get_err_counter(self.peer6_error))
+
+ # send a valid handsake init for which we expect a response
+ p = peer_1.mk_handshake(self.pg1, True)
+
+ rx = self.send_and_expect(self.pg1, [p], self.pg1)
+
+ peer_1.consume_response(rx[0], True)
+
+ # route a packet into the wg interface
+ # this is dropped because the peer is still not initiated
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw())
+ self.send_and_assert_no_replies(self.pg0, [p])
+ self.assertEqual(self.base_kp4_err + 2,
+ self.statistics.get_err_counter(self.kp4_error))
+
+ # send a data packet from the peer through the tunnel
+ # this completes the handshake
+ p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())
+ d = peer_1.encrypt_transport(p)
+ p = (peer_1.mk_tunnel_header(self.pg1, True) /
+ (Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(receiver_index=peer_1.sender,
+ counter=0,
+ encrypted_encapsulated_packet=d)))
+ rxs = self.send_and_expect(self.pg1, [p], self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
+ self.assertEqual(rx[IP].ttl, 19)
+
+ # send a packets that are routed into the tunnel
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst="10.11.3.2") /
+ UDP(sport=555, dport=556) /
+ Raw(b'\x00' * 80))
+
+ rxs = self.send_and_expect(self.pg0, p * 255, self.pg1)
+
+ for rx in rxs:
+ rx = IP(peer_1.decrypt_transport(rx, True))
+
+ # chech the oringial packet is present
+ self.assertEqual(rx[IP].dst, p[IP].dst)
+ self.assertEqual(rx[IP].ttl, p[IP].ttl-1)
+
+ # send packets into the tunnel, expect to receive them on
+ # the other side
+ p = [(peer_1.mk_tunnel_header(self.pg1, True) /
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
+
+ rxs = self.send_and_expect(self.pg1, p, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
+ self.assertEqual(rx[IP].ttl, 19)
+
+ r1.remove_vpp_config()
+ peer_1.remove_vpp_config()
+ wg0.remove_vpp_config()
+
def test_wg_multi_peer(self):
""" multiple peer setup """
- port = 12343
+ port = 12373
# Create interfaces
wg0 = VppWgInterface(self,
@@ -784,10 +1183,8 @@ class WireguardHandoffTests(TestWg):
def test_wg_peer_init(self):
""" Handoff """
- wg_output_node_name = '/err/wg-output-tun/'
- wg_input_node_name = '/err/wg-input/'
- port = 12353
+ port = 12383
# Create interfaces
wg0 = VppWgInterface(self,
@@ -844,14 +1241,14 @@ class WireguardHandoffTests(TestWg):
# send packets into the tunnel, from the other worker
p = [(peer_1.mk_tunnel_header(self.pg1) /
- Wireguard(message_type=4, reserved_zero=0) /
- WireguardTransport(
- receiver_index=peer_1.sender,
- counter=ii+1,
- encrypted_encapsulated_packet=peer_1.encrypt_transport(
- (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
- UDP(sport=222, dport=223) /
- Raw())))) for ii in range(255)]
+ Wireguard(message_type=4, reserved_zero=0) /
+ WireguardTransport(
+ receiver_index=peer_1.sender,
+ counter=ii+1,
+ encrypted_encapsulated_packet=peer_1.encrypt_transport(
+ (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) /
+ UDP(sport=222, dport=223) /
+ Raw())))) for ii in range(255)]
rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1)