diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_wireguard.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_wireguard.py')
-rw-r--r-- | test/test_wireguard.py | 1155 |
1 files changed, 650 insertions, 505 deletions
diff --git a/test/test_wireguard.py b/test/test_wireguard.py index e844b1d5c0a..1a955b162f1 100644 --- a/test/test_wireguard.py +++ b/test/test_wireguard.py @@ -11,12 +11,22 @@ from scapy.packet import Raw from scapy.layers.l2 import Ether, ARP from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 -from scapy.contrib.wireguard import Wireguard, WireguardResponse, \ - WireguardInitiation, WireguardTransport -from cryptography.hazmat.primitives.asymmetric.x25519 import \ - X25519PrivateKey, X25519PublicKey -from cryptography.hazmat.primitives.serialization import Encoding, \ - PrivateFormat, PublicFormat, NoEncryption +from scapy.contrib.wireguard import ( + Wireguard, + WireguardResponse, + WireguardInitiation, + WireguardTransport, +) +from cryptography.hazmat.primitives.asymmetric.x25519 import ( + X25519PrivateKey, + X25519PublicKey, +) +from cryptography.hazmat.primitives.serialization import ( + Encoding, + PrivateFormat, + PublicFormat, + NoEncryption, +) from cryptography.hazmat.primitives.hashes import BLAKE2s, Hash from cryptography.hazmat.primitives.hmac import HMAC from cryptography.hazmat.backends import default_backend @@ -39,14 +49,11 @@ Wg test. def private_key_bytes(k): - return k.private_bytes(Encoding.Raw, - PrivateFormat.Raw, - NoEncryption()) + return k.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) def public_key_bytes(k): - return k.public_bytes(Encoding.Raw, - PublicFormat.Raw) + return k.public_bytes(Encoding.Raw, PublicFormat.Raw) class VppWgInterface(VppInterface): @@ -69,37 +76,41 @@ class VppWgInterface(VppInterface): return private_key_bytes(self.private_key) def add_vpp_config(self): - r = self.test.vapi.wireguard_interface_create(interface={ - 'user_instance': 0xffffffff, - 'port': self.port, - 'src_ip': self.src, - 'private_key': private_key_bytes(self.private_key), - 'generate_key': False - }) + r = self.test.vapi.wireguard_interface_create( + interface={ + "user_instance": 0xFFFFFFFF, + "port": self.port, + "src_ip": self.src, + "private_key": private_key_bytes(self.private_key), + "generate_key": False, + } + ) self.set_sw_if_index(r.sw_if_index) self.test.registry.register(self, self.test.logger) return self def remove_vpp_config(self): - self.test.vapi.wireguard_interface_delete( - sw_if_index=self._sw_if_index) + self.test.vapi.wireguard_interface_delete(sw_if_index=self._sw_if_index) def query_vpp_config(self): - ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xffffffff) + ts = self.test.vapi.wireguard_interface_dump(sw_if_index=0xFFFFFFFF) for t in ts: - if t.interface.sw_if_index == self._sw_if_index and \ - str(t.interface.src_ip) == self.src and \ - t.interface.port == self.port and \ - t.interface.private_key == private_key_bytes(self.private_key): + if ( + t.interface.sw_if_index == self._sw_if_index + and str(t.interface.src_ip) == self.src + and t.interface.port == self.port + and t.interface.private_key == private_key_bytes(self.private_key) + ): return True return False - def want_events(self, peer_index=0xffffffff): + def want_events(self, peer_index=0xFFFFFFFF): self.test.vapi.want_wireguard_peer_events( enable_disable=1, pid=os.getpid(), sw_if_index=self._sw_if_index, - peer_index=peer_index) + peer_index=peer_index, + ) def wait_events(self, expect, peers, timeout=5): for i in range(len(peers)): @@ -118,8 +129,7 @@ def find_route(test, prefix, is_ip6, table_id=0): routes = test.vapi.ip_route_dump(table_id, is_ip6) for e in routes: - if table_id == e.route.table_id \ - and str(e.route.prefix) == str(prefix): + if table_id == e.route.table_id and str(e.route.prefix) == str(prefix): return True return False @@ -129,14 +139,7 @@ NOISE_IDENTIFIER_NAME = b"WireGuard v1 zx2c4 Jason@zx2c4.com" class VppWgPeer(VppObject): - - def __init__(self, - test, - itf, - endpoint, - port, - allowed_ips, - persistent_keepalive=15): + def __init__(self, test, itf, endpoint, port, allowed_ips, persistent_keepalive=15): self._test = test self.itf = itf self.endpoint = endpoint @@ -153,13 +156,15 @@ class VppWgPeer(VppObject): def add_vpp_config(self, is_ip6=False): rv = self._test.vapi.wireguard_peer_add( peer={ - 'public_key': self.public_key_bytes(), - 'port': self.port, - 'endpoint': self.endpoint, - 'n_allowed_ips': len(self.allowed_ips), - 'allowed_ips': self.allowed_ips, - 'sw_if_index': self.itf.sw_if_index, - 'persistent_keepalive': self.persistent_keepalive}) + "public_key": self.public_key_bytes(), + "port": self.port, + "endpoint": self.endpoint, + "n_allowed_ips": len(self.allowed_ips), + "allowed_ips": self.allowed_ips, + "sw_if_index": self.itf.sw_if_index, + "persistent_keepalive": self.persistent_keepalive, + } + ) self.index = rv.peer_index self.receiver_index = self.index + 1 self._test.registry.register(self, self._test.logger) @@ -169,7 +174,7 @@ class VppWgPeer(VppObject): self._test.vapi.wireguard_peer_remove(peer_index=self.index) def object_id(self): - return ("wireguard-peer-%s" % self.index) + return "wireguard-peer-%s" % self.index def public_key_bytes(self): return public_key_bytes(self.public_key) @@ -178,11 +183,13 @@ class VppWgPeer(VppObject): peers = self._test.vapi.wireguard_peers_dump() for p in peers: - if p.peer.public_key == self.public_key_bytes() and \ - p.peer.port == self.port and \ - str(p.peer.endpoint) == self.endpoint and \ - p.peer.sw_if_index == self.itf.sw_if_index and \ - len(self.allowed_ips) == p.peer.n_allowed_ips: + if ( + p.peer.public_key == self.public_key_bytes() + and p.peer.port == self.port + and str(p.peer.endpoint) == self.endpoint + and p.peer.sw_if_index == self.itf.sw_if_index + and len(self.allowed_ips) == p.peer.n_allowed_ips + ): self.allowed_ips.sort() p.peer.allowed_ips.sort() @@ -197,13 +204,17 @@ class VppWgPeer(VppObject): def mk_tunnel_header(self, tx_itf, is_ip6=False): if is_ip6 is False: - return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) / - IP(src=self.endpoint, dst=self.itf.src) / - UDP(sport=self.port, dport=self.itf.port)) + return ( + Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) + / IP(src=self.endpoint, dst=self.itf.src) + / UDP(sport=self.port, dport=self.itf.port) + ) else: - return (Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) / - IPv6(src=self.endpoint, dst=self.itf.src) / - UDP(sport=self.port, dport=self.itf.port)) + return ( + Ether(dst=tx_itf.local_mac, src=tx_itf.remote_mac) + / IPv6(src=self.endpoint, dst=self.itf.src) + / UDP(sport=self.port, dport=self.itf.port) + ) def noise_init(self, public_key=None): self.noise.set_prologue(NOISE_IDENTIFIER_NAME) @@ -214,12 +225,12 @@ class VppWgPeer(VppObject): # local/this private self.noise.set_keypair_from_private_bytes( - Keypair.STATIC, - private_key_bytes(self.private_key)) + Keypair.STATIC, private_key_bytes(self.private_key) + ) # remote's public self.noise.set_keypair_from_public_bytes( - Keypair.REMOTE_STATIC, - public_key_bytes(public_key)) + Keypair.REMOTE_STATIC, public_key_bytes(public_key) + ) self.noise.start_handshake() @@ -227,7 +238,7 @@ class VppWgPeer(VppObject): self.noise.set_as_initiator() self.noise_init(public_key) - p = (Wireguard() / WireguardInitiation()) + p = Wireguard() / WireguardInitiation() p[Wireguard].message_type = 1 p[Wireguard].reserved_zero = 0 @@ -236,8 +247,11 @@ class VppWgPeer(VppObject): # some random data for the message # lifted from the noise protocol's wireguard example now = datetime.datetime.now() - tai = struct.pack('!qi', 4611686018427387914 + int(now.timestamp()), - int(now.microsecond * 1e3)) + tai = struct.pack( + "!qi", + 4611686018427387914 + int(now.timestamp()), + int(now.microsecond * 1e3), + ) b = self.noise.write_message(payload=tai) # load noise into init message @@ -246,14 +260,13 @@ class VppWgPeer(VppObject): p[WireguardInitiation].encrypted_timestamp = b[80:108] # generate the mac1 hash - mac_key = blake2s(b'mac1----' + - self.itf.public_key_bytes()).digest() - p[WireguardInitiation].mac1 = blake2s(bytes(p)[0:116], - digest_size=16, - key=mac_key).digest() + mac_key = blake2s(b"mac1----" + self.itf.public_key_bytes()).digest() + p[WireguardInitiation].mac1 = blake2s( + bytes(p)[0:116], digest_size=16, key=mac_key + ).digest() p[WireguardInitiation].mac2 = bytearray(16) - p = (self.mk_tunnel_header(tx_itf, is_ip6) / p) + p = self.mk_tunnel_header(tx_itf, is_ip6) / p return p @@ -281,11 +294,8 @@ class VppWgPeer(VppObject): self.sender = init[WireguardInitiation].sender_index # validate the hash - mac_key = blake2s(b'mac1----' + - public_key_bytes(self.public_key)).digest() - mac1 = blake2s(bytes(init)[0:-32], - digest_size=16, - key=mac_key).digest() + mac_key = blake2s(b"mac1----" + public_key_bytes(self.public_key)).digest() + mac1 = blake2s(bytes(init)[0:-32], digest_size=16, key=mac_key).digest() self._test.assertEqual(init[WireguardInitiation].mac1, mac1) # this passes only unencrypted_ephemeral, encrypted_static, @@ -294,19 +304,17 @@ class VppWgPeer(VppObject): # build the response b = self.noise.write_message() - mac_key = blake2s(b'mac1----' + - public_key_bytes(self.itf.public_key)).digest() - resp = (Wireguard(message_type=2, reserved_zero=0) / - WireguardResponse(sender_index=self.receiver_index, - receiver_index=self.sender, - unencrypted_ephemeral=b[0:32], - encrypted_nothing=b[32:])) - mac1 = blake2s(bytes(resp)[:-32], - digest_size=16, - key=mac_key).digest() + mac_key = blake2s(b"mac1----" + public_key_bytes(self.itf.public_key)).digest() + resp = Wireguard(message_type=2, reserved_zero=0) / WireguardResponse( + sender_index=self.receiver_index, + receiver_index=self.sender, + unencrypted_ephemeral=b[0:32], + encrypted_nothing=b[32:], + ) + mac1 = blake2s(bytes(resp)[:-32], digest_size=16, key=mac_key).digest() resp[WireguardResponse].mac1 = mac1 - resp = (self.mk_tunnel_header(tx_itf, is_ip6) / resp) + resp = self.mk_tunnel_header(tx_itf, is_ip6) / resp self._test.assertTrue(self.noise.handshake_finished) return resp @@ -318,13 +326,14 @@ class VppWgPeer(VppObject): self._test.assertEqual(resp[Wireguard].message_type, 2) self._test.assertEqual(resp[Wireguard].reserved_zero, 0) - self._test.assertEqual(resp[WireguardResponse].receiver_index, - self.receiver_index) + self._test.assertEqual( + resp[WireguardResponse].receiver_index, self.receiver_index + ) self.sender = resp[Wireguard].sender_index payload = self.noise.read_message(bytes(resp)[12:60]) - self._test.assertEqual(payload, b'') + self._test.assertEqual(payload, b"") self._test.assertTrue(self.noise.handshake_finished) def decrypt_transport(self, p, is_ip6=False): @@ -333,11 +342,11 @@ class VppWgPeer(VppObject): p = Wireguard(p[Raw]) self._test.assertEqual(p[Wireguard].message_type, 4) self._test.assertEqual(p[Wireguard].reserved_zero, 0) - self._test.assertEqual(p[WireguardTransport].receiver_index, - self.receiver_index) + self._test.assertEqual( + p[WireguardTransport].receiver_index, self.receiver_index + ) - d = self.noise.decrypt( - p[WireguardTransport].encrypted_encapsulated_packet) + d = self.noise.decrypt(p[WireguardTransport].encrypted_encapsulated_packet) return d def encrypt_transport(self, p): @@ -350,20 +359,21 @@ class VppWgPeer(VppObject): # chech the oringial packet is present self._test.assertEqual(rx[IP].dst, tx[IP].dst) - self._test.assertEqual(rx[IP].ttl, tx[IP].ttl-1) + self._test.assertEqual(rx[IP].ttl, tx[IP].ttl - 1) else: rx = IPv6(self.decrypt_transport(rx)) # chech the oringial packet is present self._test.assertEqual(rx[IPv6].dst, tx[IPv6].dst) - self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl-1) + self._test.assertEqual(rx[IPv6].ttl, tx[IPv6].ttl - 1) def want_events(self): self._test.vapi.want_wireguard_peer_events( enable_disable=1, pid=os.getpid(), peer_index=self.index, - sw_if_index=self.itf.sw_if_index) + sw_if_index=self.itf.sw_if_index, + ) def wait_event(self, expect, timeout=5): rv = self._test.vapi.wait_for_event(timeout, "wireguard_peer_event") @@ -372,14 +382,14 @@ class VppWgPeer(VppObject): class TestWg(VppTestCase): - """ Wireguard Test Case """ + """Wireguard Test Case""" error_str = compile(r"Error") - wg4_output_node_name = '/err/wg4-output-tun/' - wg4_input_node_name = '/err/wg4-input/' - wg6_output_node_name = '/err/wg6-output-tun/' - wg6_input_node_name = '/err/wg6-input/' + wg4_output_node_name = "/err/wg4-output-tun/" + wg4_input_node_name = "/err/wg4-input/" + wg6_output_node_name = "/err/wg6-output-tun/" + wg6_input_node_name = "/err/wg6-input/" kp4_error = wg4_output_node_name + "Keypair error" mac4_error = wg4_input_node_name + "Invalid MAC handshake" peer4_error = wg4_input_node_name + "Peer error" @@ -417,13 +427,11 @@ class TestWg(VppTestCase): self.base_peer6_err = self.statistics.get_err_counter(self.peer6_error) def test_wg_interface(self): - """ Simple interface creation """ + """Simple interface creation""" port = 12312 # Create interface - wg0 = VppWgInterface(self, - self.pg1.local_ip4, - port).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config() self.logger.info(self.vapi.cli("sh int")) @@ -431,27 +439,29 @@ class TestWg(VppTestCase): wg0.remove_vpp_config() def test_handshake_hash(self): - """ test hashing an init message """ + """test hashing an init message""" # a init packet generated by linux given the key below - h = "0100000098b9032b" \ - "55cc4b39e73c3d24" \ - "a2a1ab884b524a81" \ - "1808bb86640fb70d" \ - "e93154fec1879125" \ - "ab012624a27f0b75" \ - "c0a2582f438ddb5f" \ - "8e768af40b4ab444" \ - "02f9ff473e1b797e" \ - "80d39d93c5480c82" \ - "a3d4510f70396976" \ - "586fb67300a5167b" \ - "ae6ca3ff3dfd00eb" \ - "59be198810f5aa03" \ - "6abc243d2155ee4f" \ - "2336483900aef801" \ - "08752cd700000000" \ - "0000000000000000" \ + h = ( + "0100000098b9032b" + "55cc4b39e73c3d24" + "a2a1ab884b524a81" + "1808bb86640fb70d" + "e93154fec1879125" + "ab012624a27f0b75" + "c0a2582f438ddb5f" + "8e768af40b4ab444" + "02f9ff473e1b797e" + "80d39d93c5480c82" + "a3d4510f70396976" + "586fb67300a5167b" + "ae6ca3ff3dfd00eb" + "59be198810f5aa03" + "6abc243d2155ee4f" + "2336483900aef801" + "08752cd700000000" + "0000000000000000" "00000000" + ) b = bytearray.fromhex(h) tgt = Wireguard(b) @@ -463,40 +473,34 @@ class TestWg(VppTestCase): # strip the macs and build a new packet init = b[0:-32] - mac_key = blake2s(b'mac1----' + public_key_bytes(pub)).digest() - init += blake2s(init, - digest_size=16, - key=mac_key).digest() - init += b'\x00' * 16 + mac_key = blake2s(b"mac1----" + public_key_bytes(pub)).digest() + init += blake2s(init, digest_size=16, key=mac_key).digest() + init += b"\x00" * 16 act = Wireguard(init) self.assertEqual(tgt, act) def test_wg_peer_resp(self): - """ Send handshake response """ + """Send handshake response""" port = 12323 # Create interfaces - wg0 = VppWgInterface(self, - self.pg1.local_ip4, - port).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config() wg0.admin_up() wg0.config_ip4() self.pg_enable_capture(self.pg_interfaces) self.pg_start() - peer_1 = VppWgPeer(self, - wg0, - self.pg1.remote_ip4, - port+1, - ["10.11.3.0/24"]).add_vpp_config() + peer_1 = VppWgPeer( + self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"] + ).add_vpp_config() self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) - r1 = VppIpRoute(self, "10.11.3.0", 24, - [VppRoutePath("10.11.3.1", - wg0.sw_if_index)]).add_vpp_config() + r1 = VppIpRoute( + self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)] + ).add_vpp_config() # wait for the peer to send a handshake rx = self.pg1.get_capture(1, timeout=2) @@ -513,10 +517,12 @@ class TestWg(VppTestCase): self.assertEqual(0, len(b)) # send a packets that are routed into the tunnel - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / - UDP(sport=555, dport=556) / - Raw(b'\x00' * 80)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst="10.11.3.2") + / UDP(sport=555, dport=556) + / Raw(b"\x00" * 80) + ) rxs = self.send_and_expect(self.pg0, p * 255, self.pg1) @@ -524,15 +530,24 @@ class TestWg(VppTestCase): # send packets into the tunnel, expect to receive them on # the other side - p = [(peer_1.mk_tunnel_header(self.pg1) / - Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport( - receiver_index=peer_1.sender, - counter=ii, - encrypted_encapsulated_packet=peer_1.encrypt_transport( - (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw())))) for ii in range(255)] + p = [ + ( + peer_1.mk_tunnel_header(self.pg1) + / Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, + counter=ii, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + ( + IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) + / UDP(sport=222, dport=223) + / Raw() + ) + ), + ) + ) + for ii in range(255) + ] rxs = self.send_and_expect(self.pg1, p, self.pg0) @@ -545,53 +560,54 @@ class TestWg(VppTestCase): wg0.remove_vpp_config() def test_wg_peer_v4o4(self): - """ Test v4o4""" + """Test v4o4""" port = 12333 # Create interfaces - wg0 = VppWgInterface(self, - self.pg1.local_ip4, - port).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config() wg0.admin_up() wg0.config_ip4() - peer_1 = VppWgPeer(self, - wg0, - self.pg1.remote_ip4, - port+1, - ["10.11.3.0/24"]).add_vpp_config() + peer_1 = VppWgPeer( + self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.3.0/24"] + ).add_vpp_config() self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) - r1 = VppIpRoute(self, "10.11.3.0", 24, - [VppRoutePath("10.11.3.1", - wg0.sw_if_index)]).add_vpp_config() + r1 = VppIpRoute( + self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)] + ).add_vpp_config() # route a packet into the wg interface # use the allowed-ip prefix # this is dropped because the peer is not initiated - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / - UDP(sport=555, dport=556) / - Raw()) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst="10.11.3.2") + / UDP(sport=555, dport=556) + / Raw() + ) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(self.base_kp4_err + 1, - self.statistics.get_err_counter(self.kp4_error)) + self.assertEqual( + self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error) + ) # send a handsake from the peer with an invalid MAC p = peer_1.mk_handshake(self.pg1) - p[WireguardInitiation].mac1 = b'foobar' + p[WireguardInitiation].mac1 = b"foobar" self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(self.base_mac4_err + 1, - self.statistics.get_err_counter(self.mac4_error)) + self.assertEqual( + self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error) + ) # send a handsake from the peer but signed by the wrong key. - p = peer_1.mk_handshake(self.pg1, - False, - X25519PrivateKey.generate().public_key()) + p = peer_1.mk_handshake( + self.pg1, False, X25519PrivateKey.generate().public_key() + ) self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(self.base_peer4_err + 1, - self.statistics.get_err_counter(self.peer4_error)) + self.assertEqual( + self.base_peer4_err + 1, self.statistics.get_err_counter(self.peer4_error) + ) # send a valid handsake init for which we expect a response p = peer_1.mk_handshake(self.pg1) @@ -602,25 +618,31 @@ class TestWg(VppTestCase): # route a packet into the wg interface # this is dropped because the peer is still not initiated - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / - UDP(sport=555, dport=556) / - Raw()) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst="10.11.3.2") + / UDP(sport=555, dport=556) + / Raw() + ) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(self.base_kp4_err + 2, - self.statistics.get_err_counter(self.kp4_error)) + self.assertEqual( + self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error) + ) # send a data packet from the peer through the tunnel # this completes the handshake - p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw()) + p = ( + IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) + / UDP(sport=222, dport=223) + / Raw() + ) d = peer_1.encrypt_transport(p) - p = (peer_1.mk_tunnel_header(self.pg1) / - (Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport(receiver_index=peer_1.sender, - counter=0, - encrypted_encapsulated_packet=d))) + p = peer_1.mk_tunnel_header(self.pg1) / ( + Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d + ) + ) rxs = self.send_and_expect(self.pg1, [p], self.pg0) for rx in rxs: @@ -628,10 +650,12 @@ class TestWg(VppTestCase): self.assertEqual(rx[IP].ttl, 19) # send a packets that are routed into the tunnel - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / - UDP(sport=555, dport=556) / - Raw(b'\x00' * 80)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst="10.11.3.2") + / UDP(sport=555, dport=556) + / Raw(b"\x00" * 80) + ) rxs = self.send_and_expect(self.pg0, p * 255, self.pg1) @@ -640,19 +664,28 @@ class TestWg(VppTestCase): # chech the oringial packet is present self.assertEqual(rx[IP].dst, p[IP].dst) - self.assertEqual(rx[IP].ttl, p[IP].ttl-1) + self.assertEqual(rx[IP].ttl, p[IP].ttl - 1) # send packets into the tunnel, expect to receive them on # the other side - p = [(peer_1.mk_tunnel_header(self.pg1) / - Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport( - receiver_index=peer_1.sender, - counter=ii+1, - encrypted_encapsulated_packet=peer_1.encrypt_transport( - (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw())))) for ii in range(255)] + p = [ + ( + peer_1.mk_tunnel_header(self.pg1) + / Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, + counter=ii + 1, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + ( + IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) + / UDP(sport=222, dport=223) + / Raw() + ) + ), + ) + ) + for ii in range(255) + ] rxs = self.send_and_expect(self.pg1, p, self.pg0) @@ -665,56 +698,57 @@ class TestWg(VppTestCase): wg0.remove_vpp_config() def test_wg_peer_v6o6(self): - """ Test v6o6""" + """Test v6o6""" port = 12343 # Create interfaces - wg0 = VppWgInterface(self, - self.pg1.local_ip6, - port).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config() wg0.admin_up() wg0.config_ip6() - peer_1 = VppWgPeer(self, - wg0, - self.pg1.remote_ip6, - port+1, - ["1::3:0/112"]).add_vpp_config(True) + peer_1 = VppWgPeer( + self, wg0, self.pg1.remote_ip6, port + 1, ["1::3:0/112"] + ).add_vpp_config(True) self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) - r1 = VppIpRoute(self, "1::3:0", 112, - [VppRoutePath("1::3:1", - wg0.sw_if_index)]).add_vpp_config() + r1 = VppIpRoute( + self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)] + ).add_vpp_config() # route a packet into the wg interface # use the allowed-ip prefix # this is dropped because the peer is not initiated - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / - UDP(sport=555, dport=556) / - Raw()) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst="1::3:2") + / UDP(sport=555, dport=556) + / Raw() + ) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(self.base_kp6_err + 1, - self.statistics.get_err_counter(self.kp6_error)) + self.assertEqual( + self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error) + ) # send a handsake from the peer with an invalid MAC p = peer_1.mk_handshake(self.pg1, True) - p[WireguardInitiation].mac1 = b'foobar' + p[WireguardInitiation].mac1 = b"foobar" self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(self.base_mac6_err + 1, - self.statistics.get_err_counter(self.mac6_error)) + self.assertEqual( + self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error) + ) # send a handsake from the peer but signed by the wrong key. - p = peer_1.mk_handshake(self.pg1, - True, - X25519PrivateKey.generate().public_key()) + p = peer_1.mk_handshake( + self.pg1, True, X25519PrivateKey.generate().public_key() + ) self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(self.base_peer6_err + 1, - self.statistics.get_err_counter(self.peer6_error)) + self.assertEqual( + self.base_peer6_err + 1, self.statistics.get_err_counter(self.peer6_error) + ) # send a valid handsake init for which we expect a response p = peer_1.mk_handshake(self.pg1, True) @@ -725,25 +759,31 @@ class TestWg(VppTestCase): # route a packet into the wg interface # this is dropped because the peer is still not initiated - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / - UDP(sport=555, dport=556) / - Raw()) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst="1::3:2") + / UDP(sport=555, dport=556) + / Raw() + ) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(self.base_kp6_err + 2, - self.statistics.get_err_counter(self.kp6_error)) + self.assertEqual( + self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error) + ) # send a data packet from the peer through the tunnel # this completes the handshake - p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) / - UDP(sport=222, dport=223) / - Raw()) + p = ( + IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) + / UDP(sport=222, dport=223) + / Raw() + ) d = peer_1.encrypt_transport(p) - p = (peer_1.mk_tunnel_header(self.pg1, True) / - (Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport(receiver_index=peer_1.sender, - counter=0, - encrypted_encapsulated_packet=d))) + p = peer_1.mk_tunnel_header(self.pg1, True) / ( + Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d + ) + ) rxs = self.send_and_expect(self.pg1, [p], self.pg0) for rx in rxs: @@ -751,10 +791,12 @@ class TestWg(VppTestCase): self.assertEqual(rx[IPv6].hlim, 19) # send a packets that are routed into the tunnel - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / - UDP(sport=555, dport=556) / - Raw(b'\x00' * 80)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst="1::3:2") + / UDP(sport=555, dport=556) + / Raw(b"\x00" * 80) + ) rxs = self.send_and_expect(self.pg0, p * 255, self.pg1) @@ -763,19 +805,28 @@ class TestWg(VppTestCase): # chech the oringial packet is present self.assertEqual(rx[IPv6].dst, p[IPv6].dst) - self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1) + self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1) # send packets into the tunnel, expect to receive them on # the other side - p = [(peer_1.mk_tunnel_header(self.pg1, True) / - Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport( - receiver_index=peer_1.sender, - counter=ii+1, - encrypted_encapsulated_packet=peer_1.encrypt_transport( - (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) / - UDP(sport=222, dport=223) / - Raw())))) for ii in range(255)] + p = [ + ( + peer_1.mk_tunnel_header(self.pg1, True) + / Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, + counter=ii + 1, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + ( + IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) + / UDP(sport=222, dport=223) + / Raw() + ) + ), + ) + ) + for ii in range(255) + ] rxs = self.send_and_expect(self.pg1, p, self.pg0) @@ -788,54 +839,55 @@ class TestWg(VppTestCase): wg0.remove_vpp_config() def test_wg_peer_v6o4(self): - """ Test v6o4""" + """Test v6o4""" port = 12353 # Create interfaces - wg0 = VppWgInterface(self, - self.pg1.local_ip4, - port).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config() wg0.admin_up() wg0.config_ip6() - peer_1 = VppWgPeer(self, - wg0, - self.pg1.remote_ip4, - port+1, - ["1::3:0/112"]).add_vpp_config(True) + peer_1 = VppWgPeer( + self, wg0, self.pg1.remote_ip4, port + 1, ["1::3:0/112"] + ).add_vpp_config(True) self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) - r1 = VppIpRoute(self, "1::3:0", 112, - [VppRoutePath("1::3:1", - wg0.sw_if_index)]).add_vpp_config() + r1 = VppIpRoute( + self, "1::3:0", 112, [VppRoutePath("1::3:1", wg0.sw_if_index)] + ).add_vpp_config() # route a packet into the wg interface # use the allowed-ip prefix # this is dropped because the peer is not initiated - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / - UDP(sport=555, dport=556) / - Raw()) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst="1::3:2") + / UDP(sport=555, dport=556) + / Raw() + ) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(self.base_kp6_err + 1, - self.statistics.get_err_counter(self.kp6_error)) + self.assertEqual( + self.base_kp6_err + 1, self.statistics.get_err_counter(self.kp6_error) + ) # send a handsake from the peer with an invalid MAC p = peer_1.mk_handshake(self.pg1) - p[WireguardInitiation].mac1 = b'foobar' + p[WireguardInitiation].mac1 = b"foobar" self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(self.base_mac4_err + 1, - self.statistics.get_err_counter(self.mac4_error)) + self.assertEqual( + self.base_mac4_err + 1, self.statistics.get_err_counter(self.mac4_error) + ) # send a handsake from the peer but signed by the wrong key. - p = peer_1.mk_handshake(self.pg1, - False, - X25519PrivateKey.generate().public_key()) + p = peer_1.mk_handshake( + self.pg1, False, X25519PrivateKey.generate().public_key() + ) self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(self.base_peer4_err + 1, - self.statistics.get_err_counter(self.peer4_error)) + self.assertEqual( + self.base_peer4_err + 1, self.statistics.get_err_counter(self.peer4_error) + ) # send a valid handsake init for which we expect a response p = peer_1.mk_handshake(self.pg1) @@ -846,25 +898,31 @@ class TestWg(VppTestCase): # route a packet into the wg interface # this is dropped because the peer is still not initiated - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / - UDP(sport=555, dport=556) / - Raw()) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst="1::3:2") + / UDP(sport=555, dport=556) + / Raw() + ) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(self.base_kp6_err + 2, - self.statistics.get_err_counter(self.kp6_error)) + self.assertEqual( + self.base_kp6_err + 2, self.statistics.get_err_counter(self.kp6_error) + ) # send a data packet from the peer through the tunnel # this completes the handshake - p = (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) / - UDP(sport=222, dport=223) / - Raw()) + p = ( + IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) + / UDP(sport=222, dport=223) + / Raw() + ) d = peer_1.encrypt_transport(p) - p = (peer_1.mk_tunnel_header(self.pg1) / - (Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport(receiver_index=peer_1.sender, - counter=0, - encrypted_encapsulated_packet=d))) + p = peer_1.mk_tunnel_header(self.pg1) / ( + Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d + ) + ) rxs = self.send_and_expect(self.pg1, [p], self.pg0) for rx in rxs: @@ -872,10 +930,12 @@ class TestWg(VppTestCase): self.assertEqual(rx[IPv6].hlim, 19) # send a packets that are routed into the tunnel - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IPv6(src=self.pg0.remote_ip6, dst="1::3:2") / - UDP(sport=555, dport=556) / - Raw(b'\x00' * 80)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IPv6(src=self.pg0.remote_ip6, dst="1::3:2") + / UDP(sport=555, dport=556) + / Raw(b"\x00" * 80) + ) rxs = self.send_and_expect(self.pg0, p * 255, self.pg1) @@ -884,19 +944,28 @@ class TestWg(VppTestCase): # chech the oringial packet is present self.assertEqual(rx[IPv6].dst, p[IPv6].dst) - self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim-1) + self.assertEqual(rx[IPv6].hlim, p[IPv6].hlim - 1) # send packets into the tunnel, expect to receive them on # the other side - p = [(peer_1.mk_tunnel_header(self.pg1) / - Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport( - receiver_index=peer_1.sender, - counter=ii+1, - encrypted_encapsulated_packet=peer_1.encrypt_transport( - (IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) / - UDP(sport=222, dport=223) / - Raw())))) for ii in range(255)] + p = [ + ( + peer_1.mk_tunnel_header(self.pg1) + / Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, + counter=ii + 1, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + ( + IPv6(src="1::3:1", dst=self.pg0.remote_ip6, hlim=20) + / UDP(sport=222, dport=223) + / Raw() + ) + ), + ) + ) + for ii in range(255) + ] rxs = self.send_and_expect(self.pg1, p, self.pg0) @@ -909,53 +978,54 @@ class TestWg(VppTestCase): wg0.remove_vpp_config() def test_wg_peer_v4o6(self): - """ Test v4o6""" + """Test v4o6""" port = 12363 # Create interfaces - wg0 = VppWgInterface(self, - self.pg1.local_ip6, - port).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip6, port).add_vpp_config() wg0.admin_up() wg0.config_ip4() - peer_1 = VppWgPeer(self, - wg0, - self.pg1.remote_ip6, - port+1, - ["10.11.3.0/24"]).add_vpp_config() + peer_1 = VppWgPeer( + self, wg0, self.pg1.remote_ip6, port + 1, ["10.11.3.0/24"] + ).add_vpp_config() self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) - r1 = VppIpRoute(self, "10.11.3.0", 24, - [VppRoutePath("10.11.3.1", - wg0.sw_if_index)]).add_vpp_config() + r1 = VppIpRoute( + self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)] + ).add_vpp_config() # route a packet into the wg interface # use the allowed-ip prefix # this is dropped because the peer is not initiated - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / - UDP(sport=555, dport=556) / - Raw()) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst="10.11.3.2") + / UDP(sport=555, dport=556) + / Raw() + ) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(self.base_kp4_err + 1, - self.statistics.get_err_counter(self.kp4_error)) + self.assertEqual( + self.base_kp4_err + 1, self.statistics.get_err_counter(self.kp4_error) + ) # send a handsake from the peer with an invalid MAC p = peer_1.mk_handshake(self.pg1, True) - p[WireguardInitiation].mac1 = b'foobar' + p[WireguardInitiation].mac1 = b"foobar" self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(self.base_mac6_err + 1, - self.statistics.get_err_counter(self.mac6_error)) + self.assertEqual( + self.base_mac6_err + 1, self.statistics.get_err_counter(self.mac6_error) + ) # send a handsake from the peer but signed by the wrong key. - p = peer_1.mk_handshake(self.pg1, - True, - X25519PrivateKey.generate().public_key()) + p = peer_1.mk_handshake( + self.pg1, True, X25519PrivateKey.generate().public_key() + ) self.send_and_assert_no_replies(self.pg1, [p]) - self.assertEqual(self.base_peer6_err + 1, - self.statistics.get_err_counter(self.peer6_error)) + self.assertEqual( + self.base_peer6_err + 1, self.statistics.get_err_counter(self.peer6_error) + ) # send a valid handsake init for which we expect a response p = peer_1.mk_handshake(self.pg1, True) @@ -966,25 +1036,31 @@ class TestWg(VppTestCase): # route a packet into the wg interface # this is dropped because the peer is still not initiated - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / - UDP(sport=555, dport=556) / - Raw()) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst="10.11.3.2") + / UDP(sport=555, dport=556) + / Raw() + ) self.send_and_assert_no_replies(self.pg0, [p]) - self.assertEqual(self.base_kp4_err + 2, - self.statistics.get_err_counter(self.kp4_error)) + self.assertEqual( + self.base_kp4_err + 2, self.statistics.get_err_counter(self.kp4_error) + ) # send a data packet from the peer through the tunnel # this completes the handshake - p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw()) + p = ( + IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) + / UDP(sport=222, dport=223) + / Raw() + ) d = peer_1.encrypt_transport(p) - p = (peer_1.mk_tunnel_header(self.pg1, True) / - (Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport(receiver_index=peer_1.sender, - counter=0, - encrypted_encapsulated_packet=d))) + p = peer_1.mk_tunnel_header(self.pg1, True) / ( + Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d + ) + ) rxs = self.send_and_expect(self.pg1, [p], self.pg0) for rx in rxs: @@ -992,10 +1068,12 @@ class TestWg(VppTestCase): self.assertEqual(rx[IP].ttl, 19) # send a packets that are routed into the tunnel - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / - UDP(sport=555, dport=556) / - Raw(b'\x00' * 80)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst="10.11.3.2") + / UDP(sport=555, dport=556) + / Raw(b"\x00" * 80) + ) rxs = self.send_and_expect(self.pg0, p * 255, self.pg1) @@ -1004,19 +1082,28 @@ class TestWg(VppTestCase): # chech the oringial packet is present self.assertEqual(rx[IP].dst, p[IP].dst) - self.assertEqual(rx[IP].ttl, p[IP].ttl-1) + self.assertEqual(rx[IP].ttl, p[IP].ttl - 1) # send packets into the tunnel, expect to receive them on # the other side - p = [(peer_1.mk_tunnel_header(self.pg1, True) / - Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport( - receiver_index=peer_1.sender, - counter=ii+1, - encrypted_encapsulated_packet=peer_1.encrypt_transport( - (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw())))) for ii in range(255)] + p = [ + ( + peer_1.mk_tunnel_header(self.pg1, True) + / Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, + counter=ii + 1, + encrypted_encapsulated_packet=peer_1.encrypt_transport( + ( + IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) + / UDP(sport=222, dport=223) + / Raw() + ) + ), + ) + ) + for ii in range(255) + ] rxs = self.send_and_expect(self.pg1, p, self.pg0) @@ -1029,16 +1116,12 @@ class TestWg(VppTestCase): wg0.remove_vpp_config() def test_wg_multi_peer(self): - """ multiple peer setup """ + """multiple peer setup""" port = 12373 # Create interfaces - wg0 = VppWgInterface(self, - self.pg1.local_ip4, - port).add_vpp_config() - wg1 = VppWgInterface(self, - self.pg2.local_ip4, - port+1).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config() + wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config() wg0.admin_up() wg1.admin_up() @@ -1060,25 +1143,43 @@ class TestWg(VppTestCase): routes_1 = [] routes_2 = [] for i in range(NUM_PEERS): - peers_1.append(VppWgPeer(self, - wg0, - self.pg1.remote_hosts[i].ip4, - port+1+i, - ["10.0.%d.4/32" % i]).add_vpp_config()) - routes_1.append(VppIpRoute(self, "10.0.%d.4" % i, 32, - [VppRoutePath(self.pg1.remote_hosts[i].ip4, - wg0.sw_if_index)]).add_vpp_config()) - - peers_2.append(VppWgPeer(self, - wg1, - self.pg2.remote_hosts[i].ip4, - port+100+i, - ["10.100.%d.4/32" % i]).add_vpp_config()) - routes_2.append(VppIpRoute(self, "10.100.%d.4" % i, 32, - [VppRoutePath(self.pg2.remote_hosts[i].ip4, - wg1.sw_if_index)]).add_vpp_config()) - - self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS*2) + peers_1.append( + VppWgPeer( + self, + wg0, + self.pg1.remote_hosts[i].ip4, + port + 1 + i, + ["10.0.%d.4/32" % i], + ).add_vpp_config() + ) + routes_1.append( + VppIpRoute( + self, + "10.0.%d.4" % i, + 32, + [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)], + ).add_vpp_config() + ) + + peers_2.append( + VppWgPeer( + self, + wg1, + self.pg2.remote_hosts[i].ip4, + port + 100 + i, + ["10.100.%d.4/32" % i], + ).add_vpp_config() + ) + routes_2.append( + VppIpRoute( + self, + "10.100.%d.4" % i, + 32, + [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)], + ).add_vpp_config() + ) + + self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2) self.logger.info(self.vapi.cli("show wireguard peer")) self.logger.info(self.vapi.cli("show wireguard interface")) @@ -1104,7 +1205,7 @@ class TestWg(VppTestCase): wg1.remove_vpp_config() def test_wg_multi_interface(self): - """ Multi-tunnel on the same port """ + """Multi-tunnel on the same port""" port = 12500 # Create many wireguard interfaces @@ -1120,21 +1221,28 @@ class TestWg(VppTestCase): wg_ifs = [] for i in range(NUM_IFS): # Use the same port for each interface - wg0 = VppWgInterface(self, - self.pg1.local_ip4, - port).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config() wg0.admin_up() wg0.config_ip4() wg_ifs.append(wg0) - peers.append(VppWgPeer(self, - wg0, - self.pg1.remote_hosts[i].ip4, - port+1+i, - ["10.0.%d.0/24" % i]).add_vpp_config()) - - routes.append(VppIpRoute(self, "10.0.%d.0" % i, 24, - [VppRoutePath("10.0.%d.4" % i, - wg0.sw_if_index)]).add_vpp_config()) + peers.append( + VppWgPeer( + self, + wg0, + self.pg1.remote_hosts[i].ip4, + port + 1 + i, + ["10.0.%d.0/24" % i], + ).add_vpp_config() + ) + + routes.append( + VppIpRoute( + self, + "10.0.%d.0" % i, + 24, + [VppRoutePath("10.0.%d.4" % i, wg0.sw_if_index)], + ).add_vpp_config() + ) self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_IFS) @@ -1146,16 +1254,20 @@ class TestWg(VppTestCase): # send a data packet from the peer through the tunnel # this completes the handshake - p = (IP(src="10.0.%d.4" % i, - dst=self.pg0.remote_hosts[i].ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw()) + p = ( + IP(src="10.0.%d.4" % i, dst=self.pg0.remote_hosts[i].ip4, ttl=20) + / UDP(sport=222, dport=223) + / Raw() + ) d = peers[i].encrypt_transport(p) - p = (peers[i].mk_tunnel_header(self.pg1) / - (Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport(receiver_index=peers[i].sender, - counter=0, - encrypted_encapsulated_packet=d))) + p = peers[i].mk_tunnel_header(self.pg1) / ( + Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peers[i].sender, + counter=0, + encrypted_encapsulated_packet=d, + ) + ) rxs = self.send_and_expect(self.pg1, [p], self.pg0) for rx in rxs: self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[i].ip4) @@ -1163,10 +1275,12 @@ class TestWg(VppTestCase): # send a packets that are routed into the tunnel for i in range(NUM_IFS): - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i) / - UDP(sport=555, dport=556) / - Raw(b'\x00' * 80)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_hosts[i].ip4, dst="10.0.%d.4" % i) + / UDP(sport=555, dport=556) + / Raw(b"\x00" * 80) + ) rxs = self.send_and_expect(self.pg0, p * 64, self.pg1) @@ -1175,20 +1289,32 @@ class TestWg(VppTestCase): # check the oringial packet is present self.assertEqual(rx[IP].dst, p[IP].dst) - self.assertEqual(rx[IP].ttl, p[IP].ttl-1) + self.assertEqual(rx[IP].ttl, p[IP].ttl - 1) # send packets into the tunnel for i in range(NUM_IFS): - p = [(peers[i].mk_tunnel_header(self.pg1) / - Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport( - receiver_index=peers[i].sender, - counter=ii+1, - encrypted_encapsulated_packet=peers[i].encrypt_transport( - (IP(src="10.0.%d.4" % i, - dst=self.pg0.remote_hosts[i].ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw())))) for ii in range(64)] + p = [ + ( + peers[i].mk_tunnel_header(self.pg1) + / Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peers[i].sender, + counter=ii + 1, + encrypted_encapsulated_packet=peers[i].encrypt_transport( + ( + IP( + src="10.0.%d.4" % i, + dst=self.pg0.remote_hosts[i].ip4, + ttl=20, + ) + / UDP(sport=222, dport=223) + / Raw() + ) + ), + ) + ) + for ii in range(64) + ] rxs = self.send_and_expect(self.pg1, p, self.pg0) @@ -1204,22 +1330,16 @@ class TestWg(VppTestCase): i.remove_vpp_config() def test_wg_event(self): - """ Test events """ + """Test events""" port = 12600 - ESTABLISHED_FLAG = VppEnum.\ - vl_api_wireguard_peer_flags_t.\ - WIREGUARD_PEER_ESTABLISHED - DEAD_FLAG = VppEnum.\ - vl_api_wireguard_peer_flags_t.\ - WIREGUARD_PEER_STATUS_DEAD + ESTABLISHED_FLAG = ( + VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_ESTABLISHED + ) + DEAD_FLAG = VppEnum.vl_api_wireguard_peer_flags_t.WIREGUARD_PEER_STATUS_DEAD # Create interfaces - wg0 = VppWgInterface(self, - self.pg1.local_ip4, - port).add_vpp_config() - wg1 = VppWgInterface(self, - self.pg2.local_ip4, - port+1).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config() + wg1 = VppWgInterface(self, self.pg2.local_ip4, port + 1).add_vpp_config() wg0.admin_up() wg1.admin_up() @@ -1241,25 +1361,43 @@ class TestWg(VppTestCase): routes_0 = [] routes_1 = [] for i in range(NUM_PEERS): - peers_0.append(VppWgPeer(self, - wg0, - self.pg1.remote_hosts[i].ip4, - port+1+i, - ["10.0.%d.4/32" % i]).add_vpp_config()) - routes_0.append(VppIpRoute(self, "10.0.%d.4" % i, 32, - [VppRoutePath(self.pg1.remote_hosts[i].ip4, - wg0.sw_if_index)]).add_vpp_config()) - - peers_1.append(VppWgPeer(self, - wg1, - self.pg2.remote_hosts[i].ip4, - port+100+i, - ["10.100.%d.4/32" % i]).add_vpp_config()) - routes_1.append(VppIpRoute(self, "10.100.%d.4" % i, 32, - [VppRoutePath(self.pg2.remote_hosts[i].ip4, - wg1.sw_if_index)]).add_vpp_config()) - - self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS*2) + peers_0.append( + VppWgPeer( + self, + wg0, + self.pg1.remote_hosts[i].ip4, + port + 1 + i, + ["10.0.%d.4/32" % i], + ).add_vpp_config() + ) + routes_0.append( + VppIpRoute( + self, + "10.0.%d.4" % i, + 32, + [VppRoutePath(self.pg1.remote_hosts[i].ip4, wg0.sw_if_index)], + ).add_vpp_config() + ) + + peers_1.append( + VppWgPeer( + self, + wg1, + self.pg2.remote_hosts[i].ip4, + port + 100 + i, + ["10.100.%d.4/32" % i], + ).add_vpp_config() + ) + routes_1.append( + VppIpRoute( + self, + "10.100.%d.4" % i, + 32, + [VppRoutePath(self.pg2.remote_hosts[i].ip4, wg1.sw_if_index)], + ).add_vpp_config() + ) + + self.assertEqual(len(self.vapi.wireguard_peers_dump()), NUM_PEERS * 2) # Want events from the first perr of wg0 # and from all wg1 peers @@ -1271,16 +1409,14 @@ class TestWg(VppTestCase): p = peers_0[i].mk_handshake(self.pg1) rx = self.send_and_expect(self.pg1, [p], self.pg1) peers_0[i].consume_response(rx[0]) - if (i == 0): + if i == 0: peers_0[0].wait_event(ESTABLISHED_FLAG) p = peers_1[i].mk_handshake(self.pg2) rx = self.send_and_expect(self.pg2, [p], self.pg2) peers_1[i].consume_response(rx[0]) - wg1.wait_events( - ESTABLISHED_FLAG, - [peers_1[0].index, peers_1[1].index]) + wg1.wait_events(ESTABLISHED_FLAG, [peers_1[0].index, peers_1[1].index]) # remove routes for r in routes_0: @@ -1292,7 +1428,7 @@ class TestWg(VppTestCase): for i in range(NUM_PEERS): self.assertTrue(peers_0[i].query_vpp_config()) peers_0[i].remove_vpp_config() - if (i == 0): + if i == 0: peers_0[i].wait_event(0) peers_0[i].wait_event(DEAD_FLAG) for p in peers_1: @@ -1306,32 +1442,28 @@ class TestWg(VppTestCase): class WireguardHandoffTests(TestWg): - """ Wireguard Tests in multi worker setup """ + """Wireguard Tests in multi worker setup""" + vpp_worker_count = 2 def test_wg_peer_init(self): - """ Handoff """ + """Handoff""" port = 12383 # Create interfaces - wg0 = VppWgInterface(self, - self.pg1.local_ip4, - port).add_vpp_config() + wg0 = VppWgInterface(self, self.pg1.local_ip4, port).add_vpp_config() wg0.admin_up() wg0.config_ip4() - peer_1 = VppWgPeer(self, - wg0, - self.pg1.remote_ip4, - port+1, - ["10.11.2.0/24", - "10.11.3.0/24"]).add_vpp_config() + peer_1 = VppWgPeer( + self, wg0, self.pg1.remote_ip4, port + 1, ["10.11.2.0/24", "10.11.3.0/24"] + ).add_vpp_config() self.assertEqual(len(self.vapi.wireguard_peers_dump()), 1) - r1 = VppIpRoute(self, "10.11.3.0", 24, - [VppRoutePath("10.11.3.1", - wg0.sw_if_index)]).add_vpp_config() + r1 = VppIpRoute( + self, "10.11.3.0", 24, [VppRoutePath("10.11.3.1", wg0.sw_if_index)] + ).add_vpp_config() # send a valid handsake init for which we expect a response p = peer_1.mk_handshake(self.pg1) @@ -1342,17 +1474,19 @@ class WireguardHandoffTests(TestWg): # send a data packet from the peer through the tunnel # this completes the handshake and pins the peer to worker 0 - p = (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw()) + p = ( + IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) + / UDP(sport=222, dport=223) + / Raw() + ) d = peer_1.encrypt_transport(p) - p = (peer_1.mk_tunnel_header(self.pg1) / - (Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport(receiver_index=peer_1.sender, - counter=0, - encrypted_encapsulated_packet=d))) - rxs = self.send_and_expect(self.pg1, [p], self.pg0, - worker=0) + p = peer_1.mk_tunnel_header(self.pg1) / ( + Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( + receiver_index=peer_1.sender, counter=0, encrypted_encapsulated_packet=d + ) + ) + rxs = self.send_and_expect(self.pg1, [p], self.pg0, worker=0) for rx in rxs: self.assertEqual(rx[IP].dst, self.pg0.remote_ip4) @@ -1360,23 +1494,34 @@ class WireguardHandoffTests(TestWg): # send a packets that are routed into the tunnel # and pins the peer tp worker 1 - pe = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst="10.11.3.2") / - UDP(sport=555, dport=556) / - Raw(b'\x00' * 80)) + pe = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst="10.11.3.2") + / UDP(sport=555, dport=556) + / Raw(b"\x00" * 80) + ) rxs = self.send_and_expect(self.pg0, pe * 255, self.pg1, worker=1) peer_1.validate_encapped(rxs, pe) # send packets into the tunnel, from the other worker - p = [(peer_1.mk_tunnel_header(self.pg1) / - Wireguard(message_type=4, reserved_zero=0) / - WireguardTransport( + p = [ + ( + peer_1.mk_tunnel_header(self.pg1) + / Wireguard(message_type=4, reserved_zero=0) + / WireguardTransport( receiver_index=peer_1.sender, - counter=ii+1, + counter=ii + 1, encrypted_encapsulated_packet=peer_1.encrypt_transport( - (IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) / - UDP(sport=222, dport=223) / - Raw())))) for ii in range(255)] + ( + IP(src="10.11.3.1", dst=self.pg0.remote_ip4, ttl=20) + / UDP(sport=222, dport=223) + / Raw() + ) + ), + ) + ) + for ii in range(255) + ] rxs = self.send_and_expect(self.pg1, p, self.pg0, worker=1) @@ -1396,4 +1541,4 @@ class WireguardHandoffTests(TestWg): @unittest.skip("test disabled") def test_wg_multi_interface(self): - """ Multi-tunnel on the same port """ + """Multi-tunnel on the same port""" |