From 7dd3b5b5e37a4019ae335296ba9c4bd1e465fd17 Mon Sep 17 00:00:00 2001 From: Artem Glazychev Date: Thu, 3 Jun 2021 20:11:54 +0700 Subject: wireguard: add ipv6 support Type: improvement Signed-off-by: Artem Glazychev Change-Id: If1a7e82ce163c4c4acaa5acf45ad2b88371396f6 --- test/test_wireguard.py | 499 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 448 insertions(+), 51 deletions(-) (limited to 'test/test_wireguard.py') diff --git a/test/test_wireguard.py b/test/test_wireguard.py index 96c1bc0f5fd..65ebd8d2c08 100755 --- a/test/test_wireguard.py +++ b/test/test_wireguard.py @@ -9,6 +9,7 @@ from scapy.packet import Packet from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, UDP +from scapy.layers.inet6 import IPv6 from scapy.contrib.wireguard import Wireguard, WireguardResponse, \ WireguardInitiation, WireguardTransport from cryptography.hazmat.primitives.asymmetric.x25519 import \ @@ -98,8 +99,8 @@ class VppWgInterface(VppInterface): return "wireguard-%d" % self._sw_if_index -def find_route(test, prefix, table_id=0): - routes = test.vapi.ip_route_dump(table_id, False) +def find_route(test, prefix, is_ip6, table_id=0): + routes = test.vapi.ip_route_dump(table_id, is_ip6) for e in routes: if table_id == e.route.table_id \ @@ -134,7 +135,7 @@ class VppWgPeer(VppObject): self.noise = NoiseConnection.from_name(NOISE_HANDSHAKE_NAME) - def add_vpp_config(self): + def add_vpp_config(self, is_ip6=False): rv = self._test.vapi.wireguard_peer_add( peer={ 'public_key': self.public_key_bytes(), @@ -179,10 +180,15 @@ class VppWgPeer(VppObject): def set_responder(self): self.noise.set_as_responder() - def mk_tunnel_header(self, tx_itf): - return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) / - IP(src=self.endpoint, dst=self.itf.src) / - UDP(sport=self.port, dport=self.itf.port)) + def mk_tunnel_header(self, tx_itf, is_ip6=False): + if is_ip6 is False: + return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) / + IP(src=self.endpoint, dst=self.itf.src) / + UDP(sport=self.port, dport=self.itf.port)) + else: + return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) / + IPv6(src=self.endpoint, dst=self.itf.src) / + UDP(sport=self.port, dport=self.itf.port)) def noise_init(self, public_key=None): self.noise.set_prologue(NOISE_IDENTIFIER_NAME) @@ -202,7 +208,7 @@ class VppWgPeer(VppObject): self.noise.start_handshake() - def mk_handshake(self, tx_itf, public_key=None): + def mk_handshake(self, tx_itf, is_ip6=False, public_key=None): self.noise.set_as_initiator() self.noise_init(public_key) @@ -232,21 +238,25 @@ class VppWgPeer(VppObject): key=mac_key).digest() p[WireguardInitiation].mac2 = bytearray(16) - p = (self.mk_tunnel_header(tx_itf) / p) + p = (self.mk_tunnel_header(tx_itf, is_ip6) / p) return p - def verify_header(self, p): - self._test.assertEqual(p[IP].src, self.itf.src) - self._test.assertEqual(p[IP].dst, self.endpoint) + def verify_header(self, p, is_ip6=False): + if is_ip6 is False: + self._test.assertEqual(p[IP].src, self.itf.src) + self._test.assertEqual(p[IP].dst, self.endpoint) + else: + self._test.assertEqual(p[IPv6].src, self.itf.src) + self._test.assertEqual(p[IPv6].dst, self.endpoint) self._test.assertEqual(p[UDP].sport, self.itf.port) self._test.assertEqual(p[UDP].dport, self.port) self._test.assert_packet_checksums_valid(p) - def consume_init(self, p, tx_itf): + def consume_init(self, p, tx_itf, is_ip6=False): self.noise.set_as_responder() self.noise_init(self.itf.public_key) - self.verify_header(p) + self.verify_header(p, is_ip6) init = Wireguard(p[Raw]) @@ -281,13 +291,13 @@ class VppWgPeer(VppObject): key=mac_key).digest() resp[WireguardResponse].mac1 = mac1 - resp = (self.mk_tunnel_header(tx_itf) / resp) + resp = (self.mk_tunnel_header(tx_itf, is_ip6) / resp) self._test.assertTrue(self.noise.handshake_finished) return resp - def consume_response(self, p): - self.verify_header(p) + def consume_response(self, p, is_ip6=False): + self.verify_header(p, is_ip6) resp = Wireguard(p[Raw]) @@ -302,8 +312,8 @@ class VppWgPeer(VppObject): self._test.assertEqual(payload, b'') self._test.assertTrue(self.noise.handshake_finished) - def decrypt_transport(self, p): - self.verify_header(p) + def decrypt_transport(self, p, is_ip6=False): + self.verify_header(p, is_ip6) p = Wireguard(p[Raw]) self._test.assertEqual(p[Wireguard].message_type, 4) @@ -318,13 +328,20 @@ class VppWgPeer(VppObject): def encrypt_transport(self, p): return self.noise.encrypt(bytes(p)) - def validate_encapped(self, rxs, tx): + def validate_encapped(self, rxs, tx, is_ip6=False): for rx in rxs: - rx = IP(self.decrypt_transport(rx)) + if is_ip6 is False: + rx = IP(self.decrypt_transport(rx)) - # chech the oringial packet is present - self._test.assertEqual(rx[IP].dst, tx[IP].dst) - self._test.assertEqual(rx[IP].ttl, tx[IP].ttl-1) + # chech the oringial packet is present + self._test.assertEqual(rx[IP].dst, tx[IP].dst) + self._test.assertEqual(rx[IP].ttl, tx[IP].ttl-1) + else: + rx = IPv6(self.decrypt_transport(rx)) + + # chech the oringial packet is present + self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst) + self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl-1) class TestWg(VppTestCase): @@ -332,6 +349,17 @@ class TestWg(VppTestCase): error_str = compile(r"Error") + wg4_output_node_name = '/err/wg4-output-tun/' + wg4_input_node_name = '/err/wg4-input/' + wg6_output_node_name = '/err/wg6-output-tun/' + wg6_input_node_name = '/err/wg6-input/' + kp4_error = wg4_output_node_name + "Keypair error" + mac4_error = wg4_input_node_name + "Invalid MAC handshake" + peer4_error = wg4_input_node_name + "Peer error" + kp6_error = wg6_output_node_name + "Keypair error" + mac6_error = wg6_input_node_name + "Invalid MAC handshake" + peer6_error = wg6_input_node_name + "Peer error" + @classmethod def setUpClass(cls): super(TestWg, cls).setUpClass() @@ -340,7 +368,9 @@ class TestWg(VppTestCase): for i in cls.pg_interfaces: i.admin_up() i.config_ip4() + i.config_ip6() i.resolve_arp() + i.resolve_ndp() except Exception: super(TestWg, cls).tearDownClass() @@ -350,6 +380,15 @@ class TestWg(VppTestCase): def tearDownClass(cls): super(TestWg, cls).tearDownClass() + def setUp(self): + super(VppTestCase, self).setUp() + self.base_kp4_err = self.statistics.get_err_counter(self.kp4_error) + self.base_mac4_err = self.statistics.get_err_counter(self.mac4_error) + self.base_peer4_err = self.statistics.get_err_counter(self.peer4_error) + self.base_kp6_err = self.statistics.get_err_counter(self.kp6_error) + self.base_mac6_err = self.statistics.get_err_counter(self.mac6_error) + self.base_peer6_err = self.statistics.get_err_counter(self.peer6_error) + def test_wg_interface(self): """ Simple interface creation """ port = 12312 @@ -409,9 +448,6 @@ class TestWg(VppTestCase): def test_wg_peer_resp(self): """ Send handshake response """ - wg_output_node_name = '/err/wg-output-tun/' - wg_input_node_name = '/err/wg-input/' - port = 12323 # Create interfaces @@ -481,10 +517,8 @@ class TestWg(VppTestCase): peer_1.remove_vpp_config() wg0.remove_vpp_config() - def test_wg_peer_init(self): - """ Send handshake init """ - wg_output_node_name = '/err/wg-output-tun/' - wg_input_node_name = '/err/wg-input/' + def test_wg_peer_v4o4(self): + """ Test v4o4""" port = 12333 @@ -514,23 +548,23 @@ class TestWg(VppTestCase): UDP(sport=555, dport=556) / Raw()) self.send_and_assert_no_replies(self.pg0, [p]) - - kp_error = wg_output_node_name + "Keypair error" - self.assertEqual(1, self.statistics.get_err_counter(kp_error)) + self.assertEqual(self.base_kp4_err + 1, + self.statistics.get_err_counter(self.kp4_error)) # send a handsake from the peer with an invalid MAC p = peer_1.mk_handshake(self.pg1) p[WireguardInitiation].mac1 = b'foobar' self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(1, self.statistics.get_err_counter( - wg_input_node_name + "Invalid MAC handshake")) + self.assertEqual(self.base_mac4_err + 1, + self.statistics.get_err_counter(self.mac4_error)) # send a handsake from the peer but signed by the wrong key. p = peer_1.mk_handshake(self.pg1, + False, X25519PrivateKey.generate().public_key()) self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(1, self.statistics.get_err_counter( - wg_input_node_name + "Peer error")) + self.assertEqual(self.base_peer4_err + 1, + self.statistics.get_err_counter(self.peer4_error)) # send a valid handsake init for which we expect a response p = peer_1.mk_handshake(self.pg1) @@ -546,7 +580,8 @@ class TestWg(VppTestCase): UDP(sport=555, dport=556) / Raw()) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(2, self.statistics.get_err_counter(kp_error)) + self.assertEqual(self.base_kp4_err + 2, + self.statistics.get_err_counter(self.kp4_error)) # send a data packet from the peer through the tunnel # this completes the handshake @@ -602,9 +637,373 @@ class TestWg(VppTestCase): peer_1.remove_vpp_config() wg0.remove_vpp_config() + def test_wg_peer_v6o6(self): + """ Test v6o6""" + + port = 12343 + + # Create interfaces + wg0 = VppWgInterface(self, + self.pg1.local_ip6, + port).add_vpp_config() + wg0.admin_up() + wg0.config_ip6() + + peer_1 = VppWgPeer(self, + wg0, + self.pg1.remote_ip6, + port+1, + ["1::3:0/112"]).add_vpp_config(True) + self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) + + r1 = VppIpRoute(self, "1::3:0", 112, + [VppRoutePath("1::3:1", + wg0.sw_if_index)]).add_vpp_config() + + # route a packet into the wg interface + # use the allowed-ip prefix + # this is dropped because the peer is not initiated + + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / + UDP(sport=555, dport=556) / + Raw()) + self.send_and_assert_no_replies(self.pg0, [p]) + + self.assertEqual(self.base_kp6_err + 1, + self.statistics.get_err_counter(self.kp6_error)) + + # send a handsake from the peer with an invalid MAC + p = peer_1.mk_handshake(self.pg1, True) + p[WireguardInitiation].mac1 = b'foobar' + self.send_and_assert_no_replies(self.pg1, [p]) + + self.assertEqual(self.base_mac6_err + 1, + self.statistics.get_err_counter(self.mac6_error)) + + # send a handsake from the peer but signed by the wrong key. + p = peer_1.mk_handshake(self.pg1, + True, + X25519PrivateKey.generate().public_key()) + self.send_and_assert_no_replies(self.pg1, [p]) + self.assertEqual(self.base_peer6_err + 1, + self.statistics.get_err_counter(self.peer6_error)) + + # send a valid handsake init for which we expect a response + p = peer_1.mk_handshake(self.pg1, True) + + rx = self.send_and_expect(self.pg1, [p], self.pg1) + + peer_1.consume_response(rx[0], True) + + # route a packet into the wg interface + # this is dropped because the peer is still not initiated + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / + UDP(sport=555, dport=556) / + Raw()) + self.send_and_assert_no_replies(self.pg0, [p]) + self.assertEqual(self.base_kp6_err + 2, + self.statistics.get_err_counter(self.kp6_error)) + + # send a data packet from the peer through the tunnel + # this completes the handshake + p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) / + UDP(sport=222, dport=223) / + Raw()) + d = peer_1.encrypt_transport(p) + p = (peer_1.mk_tunnel_header(self.pg1, True) / + (Wireguard(message_type=4, reserved_zero=0) / + WireguardTransport(receiver_index=peer_1.sender, + counter=0, + encrypted_encapsulated_packet=d))) + rxs = self.send_and_expect(self.pg1, [p], self.pg0) + + for rx in rxs: + self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6) + self.assertEqual(rx[IPv6].hlim, 19) + + # send a packets that are routed into the tunnel + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / + UDP(sport=555, dport=556) / + Raw(b'\x00' * 80)) + + rxs = self.send_and_expect(self.pg0, p * 255, self.pg1) + + for rx in rxs: + rx = IPv6(peer_1.decrypt_transport(rx, True)) + + # chech the oringial packet is present + self.assertEqual(rx[IPv6].dst, p[IPv6].dst) + self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1) + + # send packets into the tunnel, expect to receive them on + # the other side + p = [(peer_1.mk_tunnel_header(self.pg1, True) / + Wireguard(message_type=4, reserved_zero=0) / + WireguardTransport( + receiver_index=peer_1.sender, + counter=ii+1, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) / + UDP(sport=222, dport=223) / + Raw())))) for ii in range(255)] + + rxs = self.send_and_expect(self.pg1, p, self.pg0) + + for rx in rxs: + self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6) + self.assertEqual(rx[IPv6].hlim, 19) + + r1.remove_vpp_config() + peer_1.remove_vpp_config() + wg0.remove_vpp_config() + + def test_wg_peer_v6o4(self): + """ Test v6o4""" + + port = 12353 + + # Create interfaces + wg0 = VppWgInterface(self, + self.pg1.local_ip4, + port).add_vpp_config() + wg0.admin_up() + wg0.config_ip6() + + peer_1 = VppWgPeer(self, + wg0, + self.pg1.remote_ip4, + port+1, + ["1::3:0/112"]).add_vpp_config(True) + self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) + + r1 = VppIpRoute(self, "1::3:0", 112, + [VppRoutePath("1::3:1", + wg0.sw_if_index)]).add_vpp_config() + + # route a packet into the wg interface + # use the allowed-ip prefix + # this is dropped because the peer is not initiated + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / + UDP(sport=555, dport=556) / + Raw()) + self.send_and_assert_no_replies(self.pg0, [p]) + self.assertEqual(self.base_kp6_err + 1, + self.statistics.get_err_counter(self.kp6_error)) + + # send a handsake from the peer with an invalid MAC + p = peer_1.mk_handshake(self.pg1) + p[WireguardInitiation].mac1 = b'foobar' + self.send_and_assert_no_replies(self.pg1, [p]) + + self.assertEqual(self.base_mac4_err + 1, + self.statistics.get_err_counter(self.mac4_error)) + + # send a handsake from the peer but signed by the wrong key. + p = peer_1.mk_handshake(self.pg1, + False, + X25519PrivateKey.generate().public_key()) + self.send_and_assert_no_replies(self.pg1, [p]) + self.assertEqual(self.base_peer4_err + 1, + self.statistics.get_err_counter(self.peer4_error)) + + # send a valid handsake init for which we expect a response + p = peer_1.mk_handshake(self.pg1) + + rx = self.send_and_expect(self.pg1, [p], self.pg1) + + peer_1.consume_response(rx[0]) + + # route a packet into the wg interface + # this is dropped because the peer is still not initiated + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / + UDP(sport=555, dport=556) / + Raw()) + self.send_and_assert_no_replies(self.pg0, [p]) + self.assertEqual(self.base_kp6_err + 2, + self.statistics.get_err_counter(self.kp6_error)) + + # send a data packet from the peer through the tunnel + # this completes the handshake + p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) / + UDP(sport=222, dport=223) / + Raw()) + d = peer_1.encrypt_transport(p) + p = (peer_1.mk_tunnel_header(self.pg1) / + (Wireguard(message_type=4, reserved_zero=0) / + WireguardTransport(receiver_index=peer_1.sender, + counter=0, + encrypted_encapsulated_packet=d))) + rxs = self.send_and_expect(self.pg1, [p], self.pg0) + + for rx in rxs: + self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6) + self.assertEqual(rx[IPv6].hlim, 19) + + # send a packets that are routed into the tunnel + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / + UDP(sport=555, dport=556) / + Raw(b'\x00' * 80)) + + rxs = self.send_and_expect(self.pg0, p * 255, self.pg1) + + for rx in rxs: + rx = IPv6(peer_1.decrypt_transport(rx)) + + # chech the oringial packet is present + self.assertEqual(rx[IPv6].dst, p[IPv6].dst) + self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1) + + # send packets into the tunnel, expect to receive them on + # the other side + p = [(peer_1.mk_tunnel_header(self.pg1) / + Wireguard(message_type=4, reserved_zero=0) / + WireguardTransport( + receiver_index=peer_1.sender, + counter=ii+1, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) / + UDP(sport=222, dport=223) / + Raw())))) for ii in range(255)] + + rxs = self.send_and_expect(self.pg1, p, self.pg0) + + for rx in rxs: + self.assertEqual(rx[IPv6].dst, self.pg0.remote_ip6) + self.assertEqual(rx[IPv6].hlim, 19) + + r1.remove_vpp_config() + peer_1.remove_vpp_config() + wg0.remove_vpp_config() + + def test_wg_peer_v4o6(self): + """ Test v4o6""" + + port = 12363 + + # Create interfaces + wg0 = VppWgInterface(self, + self.pg1.local_ip6, + port).add_vpp_config() + wg0.admin_up() + wg0.config_ip4() + + peer_1 = VppWgPeer(self, + wg0, + self.pg1.remote_ip6, + port+1, + ["10.11.3.0/24"]).add_vpp_config() + self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) + + r1 = VppIpRoute(self, "10.11.3.0", 24, + [VppRoutePath("10.11.3.1", + wg0.sw_if_index)]).add_vpp_config() + + # route a packet into the wg interface + # use the allowed-ip prefix + # this is dropped because the peer is not initiated + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / + UDP(sport=555, dport=556) / + Raw()) + self.send_and_assert_no_replies(self.pg0, [p]) + self.assertEqual(self.base_kp4_err + 1, + self.statistics.get_err_counter(self.kp4_error)) + + # send a handsake from the peer with an invalid MAC + p = peer_1.mk_handshake(self.pg1, True) + p[WireguardInitiation].mac1 = b'foobar' + self.send_and_assert_no_replies(self.pg1, [p]) + self.assertEqual(self.base_mac6_err + 1, + self.statistics.get_err_counter(self.mac6_error)) + + # send a handsake from the peer but signed by the wrong key. + p = peer_1.mk_handshake(self.pg1, + True, + X25519PrivateKey.generate().public_key()) + self.send_and_assert_no_replies(self.pg1, [p]) + self.assertEqual(self.base_peer6_err + 1, + self.statistics.get_err_counter(self.peer6_error)) + + # send a valid handsake init for which we expect a response + p = peer_1.mk_handshake(self.pg1, True) + + rx = self.send_and_expect(self.pg1, [p], self.pg1) + + peer_1.consume_response(rx[0], True) + + # route a packet into the wg interface + # this is dropped because the peer is still not initiated + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / + UDP(sport=555, dport=556) / + Raw()) + self.send_and_assert_no_replies(self.pg0, [p]) + self.assertEqual(self.base_kp4_err + 2, + self.statistics.get_err_counter(self.kp4_error)) + + # send a data packet from the peer through the tunnel + # this completes the handshake + p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / + UDP(sport=222, dport=223) / + Raw()) + d = peer_1.encrypt_transport(p) + p = (peer_1.mk_tunnel_header(self.pg1, True) / + (Wireguard(message_type=4, reserved_zero=0) / + WireguardTransport(receiver_index=peer_1.sender, + counter=0, + encrypted_encapsulated_packet=d))) + rxs = self.send_and_expect(self.pg1, [p], self.pg0) + + for rx in rxs: + self.assertEqual(rx[IP].dst, self.pg0.remote_ip4) + self.assertEqual(rx[IP].ttl, 19) + + # send a packets that are routed into the tunnel + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / + UDP(sport=555, dport=556) / + Raw(b'\x00' * 80)) + + rxs = self.send_and_expect(self.pg0, p * 255, self.pg1) + + for rx in rxs: + rx = IP(peer_1.decrypt_transport(rx, True)) + + # chech the oringial packet is present + self.assertEqual(rx[IP].dst, p[IP].dst) + self.assertEqual(rx[IP].ttl, p[IP].ttl-1) + + # send packets into the tunnel, expect to receive them on + # the other side + p = [(peer_1.mk_tunnel_header(self.pg1, True) / + Wireguard(message_type=4, reserved_zero=0) / + WireguardTransport( + receiver_index=peer_1.sender, + counter=ii+1, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / + UDP(sport=222, dport=223) / + Raw())))) for ii in range(255)] + + rxs = self.send_and_expect(self.pg1, p, self.pg0) + + for rx in rxs: + self.assertEqual(rx[IP].dst, self.pg0.remote_ip4) + self.assertEqual(rx[IP].ttl, 19) + + r1.remove_vpp_config() + peer_1.remove_vpp_config() + wg0.remove_vpp_config() + def test_wg_multi_peer(self): """ multiple peer setup """ - port = 12343 + port = 12373 # Create interfaces wg0 = VppWgInterface(self, @@ -784,10 +1183,8 @@ class WireguardHandoffTests(TestWg): def test_wg_peer_init(self): """ Handoff """ - wg_output_node_name = '/err/wg-output-tun/' - wg_input_node_name = '/err/wg-input/' - port = 12353 + port = 12383 # Create interfaces wg0 = VppWgInterface(self, @@ -844,14 +1241,14 @@ class WireguardHandoffTests(TestWg): # send packets into the tunnel, from the other worker p = [(peer_1.mk_tunnel_header(self.pg1) / - Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport( - receiver_index=peer_1.sender, - counter=ii+1, - encrypted_encapsulated_packet=peer_1.encrypt_transport( - (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw())))) for ii in range(255)] + Wireguard(message_type=4, reserved_zero=0) / + WireguardTransport( + receiver_index=peer_1.sender, + counter=ii+1, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / + UDP(sport=222, dport=223) / + Raw())))) for ii in range(255)] rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1) -- cgit 1.2.3-korg