diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ipsec_nat.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ipsec_nat.py')
-rw-r--r-- | test/test_ipsec_nat.py | 334 |
1 files changed, 204 insertions, 130 deletions
diff --git a/test/test_ipsec_nat.py b/test/test_ipsec_nat.py index b7ccb2befdc..64a2725dda6 100644 --- a/test/test_ipsec_nat.py +++ b/test/test_ipsec_nat.py @@ -9,15 +9,14 @@ from scapy.layers.ipsec import SecurityAssociation, ESP from util import ppp, ppc from template_ipsec import TemplateIpsec -from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\ - VppIpsecSpdItfBinding +from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_ip import DpoProto from vpp_papi import VppEnum class IPSecNATTestCase(TemplateIpsec): - """ IPSec/NAT + """IPSec/NAT TUNNEL MODE:: @@ -53,18 +52,19 @@ class IPSecNATTestCase(TemplateIpsec): self.tun_spd = VppIpsecSpd(self, self.tun_spd_id) self.tun_spd.add_vpp_config() - VppIpsecSpdItfBinding(self, self.tun_spd, - self.tun_if).add_vpp_config() + VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if).add_vpp_config() p = self.ipv4_params self.config_esp_tun(p) self.logger.info(self.vapi.ppcli("show ipsec all")) d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4 - VppIpRoute(self, p.remote_tun_if_host, p.addr_len, - [VppRoutePath(self.tun_if.remote_addr[p.addr_type], - 0xffffffff, - proto=d)]).add_vpp_config() + VppIpRoute( + self, + p.remote_tun_if_host, + p.addr_len, + [VppRoutePath(self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d)], + ).add_vpp_config() def tearDown(self): super(IPSecNATTestCase, self).tearDown() @@ -72,65 +72,84 @@ class IPSecNATTestCase(TemplateIpsec): def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip): return [ # TCP - Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - TCP(sport=self.tcp_port_in, dport=20), + Ether(src=src_mac, dst=dst_mac) + / IP(src=src_ip, dst=dst_ip) + / TCP(sport=self.tcp_port_in, dport=20), # UDP - Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - UDP(sport=self.udp_port_in, dport=20), + Ether(src=src_mac, dst=dst_mac) + / IP(src=src_ip, dst=dst_ip) + / UDP(sport=self.udp_port_in, dport=20), # ICMP - Ether(src=src_mac, dst=dst_mac) / - IP(src=src_ip, dst=dst_ip) / - ICMP(id=self.icmp_id_in, type='echo-request') + Ether(src=src_mac, dst=dst_mac) + / IP(src=src_ip, dst=dst_ip) + / ICMP(id=self.icmp_id_in, type="echo-request"), ] def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa): return [ # TCP - Ether(src=src_mac, dst=dst_mac) / - sa.encrypt(IP(src=src_ip, dst=dst_ip) / - TCP(dport=self.tcp_port_out, sport=20)), + Ether(src=src_mac, dst=dst_mac) + / sa.encrypt( + IP(src=src_ip, dst=dst_ip) / TCP(dport=self.tcp_port_out, sport=20) + ), # UDP - Ether(src=src_mac, dst=dst_mac) / - sa.encrypt(IP(src=src_ip, dst=dst_ip) / - UDP(dport=self.udp_port_out, sport=20)), + Ether(src=src_mac, dst=dst_mac) + / sa.encrypt( + IP(src=src_ip, dst=dst_ip) / UDP(dport=self.udp_port_out, sport=20) + ), # ICMP - Ether(src=src_mac, dst=dst_mac) / - sa.encrypt(IP(src=src_ip, dst=dst_ip) / - ICMP(id=self.icmp_id_out, type='echo-request')) + Ether(src=src_mac, dst=dst_mac) + / sa.encrypt( + IP(src=src_ip, dst=dst_ip) + / ICMP(id=self.icmp_id_out, type="echo-request") + ), ] def verify_capture_plain(self, capture): for packet in capture: try: self.assert_packet_checksums_valid(packet) - self.assert_equal(packet[IP].src, self.tun_if.remote_ip4, - "decrypted packet source address") - self.assert_equal(packet[IP].dst, self.pg1.remote_ip4, - "decrypted packet destination address") + self.assert_equal( + packet[IP].src, + self.tun_if.remote_ip4, + "decrypted packet source address", + ) + self.assert_equal( + packet[IP].dst, + self.pg1.remote_ip4, + "decrypted packet destination address", + ) if packet.haslayer(TCP): self.assertFalse( packet.haslayer(UDP), - "unexpected UDP header in decrypted packet") - self.assert_equal(packet[TCP].dport, self.tcp_port_in, - "decrypted packet TCP destination port") + "unexpected UDP header in decrypted packet", + ) + self.assert_equal( + packet[TCP].dport, + self.tcp_port_in, + "decrypted packet TCP destination port", + ) elif packet.haslayer(UDP): if packet[UDP].payload: self.assertFalse( packet[UDP][1].haslayer(UDP), - "unexpected UDP header in decrypted packet") - self.assert_equal(packet[UDP].dport, self.udp_port_in, - "decrypted packet UDP destination port") + "unexpected UDP header in decrypted packet", + ) + self.assert_equal( + packet[UDP].dport, + self.udp_port_in, + "decrypted packet UDP destination port", + ) else: self.assertFalse( packet.haslayer(UDP), - "unexpected UDP header in decrypted packet") - self.assert_equal(packet[ICMP].id, self.icmp_id_in, - "decrypted packet ICMP ID") + "unexpected UDP header in decrypted packet", + ) + self.assert_equal( + packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID" + ) except Exception: - self.logger.error( - ppp("Unexpected or invalid plain packet:", packet)) + self.logger.error(ppp("Unexpected or invalid plain packet:", packet)) raise def verify_capture_encrypted(self, capture, sa): @@ -139,19 +158,25 @@ class IPSecNATTestCase(TemplateIpsec): copy = packet.__class__(scapy.compat.raw(packet)) del copy[UDP].len copy = packet.__class__(scapy.compat.raw(copy)) - self.assert_equal(packet[UDP].len, copy[UDP].len, - "UDP header length") + self.assert_equal(packet[UDP].len, copy[UDP].len, "UDP header length") self.assert_packet_checksums_valid(packet) self.assertIn(ESP, packet[IP]) decrypt_pkt = sa.decrypt(packet[IP]) self.assert_packet_checksums_valid(decrypt_pkt) - self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4, - "encrypted packet source address") - self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4, - "encrypted packet destination address") + self.assert_equal( + decrypt_pkt[IP].src, + self.pg1.remote_ip4, + "encrypted packet source address", + ) + self.assert_equal( + decrypt_pkt[IP].dst, + self.tun_if.remote_ip4, + "encrypted packet destination address", + ) except Exception: self.logger.error( - ppp("Unexpected or invalid encrypted packet:", packet)) + ppp("Unexpected or invalid encrypted packet:", packet) + ) raise def config_esp_tun(self, params): @@ -166,104 +191,153 @@ class IPSecNATTestCase(TemplateIpsec): crypt_key = params.crypt_key addr_any = params.addr_any addr_bcast = params.addr_bcast - flags = (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_UDP_ENCAP) + flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP e = VppEnum.vl_api_ipsec_spd_action_t - VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_esp_protocol, - self.pg1.remote_addr[addr_type], - self.tun_if.remote_addr[addr_type], - flags=flags).add_vpp_config() - VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_esp_protocol, - self.tun_if.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - flags=flags).add_vpp_config() + VppIpsecSA( + self, + scapy_tun_sa_id, + scapy_tun_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_esp_protocol, + self.pg1.remote_addr[addr_type], + self.tun_if.remote_addr[addr_type], + flags=flags, + ).add_vpp_config() + VppIpsecSA( + self, + vpp_tun_sa_id, + vpp_tun_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_esp_protocol, + self.tun_if.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + flags=flags, + ).add_vpp_config() - VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_ESP).add_vpp_config() - VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_ESP, - is_outbound=0).add_vpp_config() - VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_UDP, - remote_port_start=4500, - remote_port_stop=4500).add_vpp_config() - VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_UDP, - remote_port_start=4500, - remote_port_stop=4500, - is_outbound=0).add_vpp_config() - VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id, - self.tun_if.remote_addr[addr_type], - self.tun_if.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - 0, priority=10, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0).add_vpp_config() - VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - self.pg1.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - self.tun_if.remote_addr[addr_type], - self.tun_if.remote_addr[addr_type], - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=10).add_vpp_config() + VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_ESP, + ).add_vpp_config() + VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_ESP, + is_outbound=0, + ).add_vpp_config() + VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_UDP, + remote_port_start=4500, + remote_port_stop=4500, + ).add_vpp_config() + VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_UDP, + remote_port_start=4500, + remote_port_stop=4500, + is_outbound=0, + ).add_vpp_config() + VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + self.tun_if.remote_addr[addr_type], + self.tun_if.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + 0, + priority=10, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ).add_vpp_config() + VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + self.tun_if.remote_addr[addr_type], + self.tun_if.remote_addr[addr_type], + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=10, + ).add_vpp_config() def test_ipsec_nat_tun(self): - """ IPSec/NAT tunnel test case """ + """IPSec/NAT tunnel test case""" p = self.ipv4_params - scapy_tun_sa = SecurityAssociation(ESP, spi=p.scapy_tun_spi, - crypt_algo=p.crypt_algo, - crypt_key=p.crypt_key, - auth_algo=p.auth_algo, - auth_key=p.auth_key, - tunnel_header=IP( - src=self.pg1.remote_ip4, - dst=self.tun_if.remote_ip4), - nat_t_header=UDP( - sport=4500, - dport=4500)) + scapy_tun_sa = SecurityAssociation( + ESP, + spi=p.scapy_tun_spi, + crypt_algo=p.crypt_algo, + crypt_key=p.crypt_key, + auth_algo=p.auth_algo, + auth_key=p.auth_key, + tunnel_header=IP(src=self.pg1.remote_ip4, dst=self.tun_if.remote_ip4), + nat_t_header=UDP(sport=4500, dport=4500), + ) # in2out - from private network to public pkts = self.create_stream_plain( - self.pg1.remote_mac, self.pg1.local_mac, - self.pg1.remote_ip4, self.tun_if.remote_ip4) + self.pg1.remote_mac, + self.pg1.local_mac, + self.pg1.remote_ip4, + self.tun_if.remote_ip4, + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.tun_if.get_capture(len(pkts)) self.verify_capture_encrypted(capture, scapy_tun_sa) - vpp_tun_sa = SecurityAssociation(ESP, - spi=p.vpp_tun_spi, - crypt_algo=p.crypt_algo, - crypt_key=p.crypt_key, - auth_algo=p.auth_algo, - auth_key=p.auth_key, - tunnel_header=IP( - src=self.tun_if.remote_ip4, - dst=self.pg1.remote_ip4), - nat_t_header=UDP( - sport=4500, - dport=4500)) + vpp_tun_sa = SecurityAssociation( + ESP, + spi=p.vpp_tun_spi, + crypt_algo=p.crypt_algo, + crypt_key=p.crypt_key, + auth_algo=p.auth_algo, + auth_key=p.auth_key, + tunnel_header=IP(src=self.tun_if.remote_ip4, dst=self.pg1.remote_ip4), + nat_t_header=UDP(sport=4500, dport=4500), + ) # out2in - from public network to private pkts = self.create_stream_encrypted( - self.tun_if.remote_mac, self.tun_if.local_mac, - self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa) + self.tun_if.remote_mac, + self.tun_if.local_mac, + self.tun_if.remote_ip4, + self.pg1.remote_ip4, + vpp_tun_sa, + ) self.logger.info(ppc("Sending packets:", pkts)) self.tun_if.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) |