summaryrefslogtreecommitdiffstats
path: root/test/test_wireguard.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_wireguard.py')
-rw-r--r--test/test_wireguard.py70
1 files changed, 48 insertions, 22 deletions
diff --git a/test/test_wireguard.py b/test/test_wireguard.py
index 6da112af0d4..afa6d70a8e7 100644
--- a/test/test_wireguard.py
+++ b/test/test_wireguard.py
@@ -38,6 +38,7 @@ from Crypto.Random import get_random_bytes
from vpp_ipip_tun_interface import VppIpIpTunInterface
from vpp_interface import VppInterface
+from vpp_pg_interface import is_ipv6_misc
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_object import VppObject
from vpp_papi import VppEnum
@@ -173,7 +174,7 @@ class VppWgPeer(VppObject):
self.endpoint = endpoint
self.port = port
- def add_vpp_config(self, is_ip6=False):
+ def add_vpp_config(self):
rv = self._test.vapi.wireguard_peer_add(
peer={
"public_key": self.public_key_bytes(),
@@ -495,6 +496,12 @@ class VppWgPeer(VppObject):
self._test.assertEqual(rv.peer_index, self.index)
+def is_handshake_init(p):
+ wg_p = Wireguard(p[Raw])
+
+ return wg_p[Wireguard].message_type == 1
+
+
@tag_fixme_ubuntu2204
@tag_fixme_debian11
class TestWg(VppTestCase):
@@ -566,6 +573,25 @@ class TestWg(VppTestCase):
self.ratelimited6_err
)
+ def send_and_assert_no_replies_ignoring_init(
+ self, intf, pkts, remark="", timeout=None
+ ):
+ self.pg_send(intf, pkts)
+
+ def _filter_out_fn(p):
+ return is_ipv6_misc(p) or is_handshake_init(p)
+
+ try:
+ if not timeout:
+ timeout = 1
+ for i in self.pg_interfaces:
+ i.assert_nothing_captured(
+ timeout=timeout, remark=remark, filter_out_fn=_filter_out_fn
+ )
+ timeout = 0.1
+ finally:
+ pass
+
def test_wg_interface(self):
"""Simple interface creation"""
port = 12312
@@ -1342,7 +1368,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
)
@@ -1356,7 +1382,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_peer4_out_err + 1,
self.statistics.get_err_counter(self.peer4_out_err),
@@ -1365,7 +1391,7 @@ class TestWg(VppTestCase):
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1)
p[WireguardInitiation].mac1 = b"foobar"
- self.send_and_assert_no_replies(self.pg1, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
self.assertEqual(
self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
)
@@ -1374,7 +1400,7 @@ class TestWg(VppTestCase):
p = peer_1.mk_handshake(
self.pg1, False, X25519PrivateKey.generate().public_key()
)
- self.send_and_assert_no_replies(self.pg1, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
self.assertEqual(
self.base_peer4_in_err + 1,
self.statistics.get_err_counter(self.peer4_in_err),
@@ -1395,7 +1421,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
)
@@ -1481,7 +1507,7 @@ class TestWg(VppTestCase):
peer_1 = VppWgPeer(
self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"]
- ).add_vpp_config(True)
+ ).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
r1 = VppIpRoute(
@@ -1501,7 +1527,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
@@ -1516,7 +1542,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_peer6_out_err + 1,
self.statistics.get_err_counter(self.peer6_out_err),
@@ -1525,7 +1551,7 @@ class TestWg(VppTestCase):
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1, True)
p[WireguardInitiation].mac1 = b"foobar"
- self.send_and_assert_no_replies(self.pg1, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
self.assertEqual(
self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
@@ -1535,7 +1561,7 @@ class TestWg(VppTestCase):
p = peer_1.mk_handshake(
self.pg1, True, X25519PrivateKey.generate().public_key()
)
- self.send_and_assert_no_replies(self.pg1, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
self.assertEqual(
self.base_peer6_in_err + 1,
self.statistics.get_err_counter(self.peer6_in_err),
@@ -1556,7 +1582,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
)
@@ -1642,7 +1668,7 @@ class TestWg(VppTestCase):
peer_1 = VppWgPeer(
self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"]
- ).add_vpp_config(True)
+ ).add_vpp_config()
self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1)
r1 = VppIpRoute(
@@ -1658,7 +1684,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error)
)
@@ -1666,7 +1692,7 @@ class TestWg(VppTestCase):
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1)
p[WireguardInitiation].mac1 = b"foobar"
- self.send_and_assert_no_replies(self.pg1, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
self.assertEqual(
self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error)
@@ -1676,7 +1702,7 @@ class TestWg(VppTestCase):
p = peer_1.mk_handshake(
self.pg1, False, X25519PrivateKey.generate().public_key()
)
- self.send_and_assert_no_replies(self.pg1, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
self.assertEqual(
self.base_peer4_in_err + 1,
self.statistics.get_err_counter(self.peer4_in_err),
@@ -1697,7 +1723,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error)
)
@@ -1798,7 +1824,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error)
)
@@ -1806,7 +1832,7 @@ class TestWg(VppTestCase):
# send a handsake from the peer with an invalid MAC
p = peer_1.mk_handshake(self.pg1, True)
p[WireguardInitiation].mac1 = b"foobar"
- self.send_and_assert_no_replies(self.pg1, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
self.assertEqual(
self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error)
)
@@ -1815,7 +1841,7 @@ class TestWg(VppTestCase):
p = peer_1.mk_handshake(
self.pg1, True, X25519PrivateKey.generate().public_key()
)
- self.send_and_assert_no_replies(self.pg1, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg1, [p])
self.assertEqual(
self.base_peer6_in_err + 1,
self.statistics.get_err_counter(self.peer6_in_err),
@@ -1836,7 +1862,7 @@ class TestWg(VppTestCase):
/ UDP(sport=555, dport=556)
/ Raw()
)
- self.send_and_assert_no_replies(self.pg0, [p])
+ self.send_and_assert_no_replies_ignoring_init(self.pg0, [p])
self.assertEqual(
self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error)
)
@@ -2326,7 +2352,7 @@ class WireguardHandoffTests(TestWg):
self.assertEqual(rx[IP].ttl, 19)
# send a packets that are routed into the tunnel
- # from owrker 0
+ # from worker 0
rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=0)
peer_1.validate_encapped(rxs, pe)