From d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 Mon Sep 17 00:00:00 2001 From: Klement Sekera Date: Tue, 26 Apr 2022 19:02:15 +0200 Subject: tests: replace pycodestyle with black Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera Signed-off-by: Dave Wallace --- test/test_ipsec_ah.py | 453 +++++++++++++++++++++++++++++++------------------- 1 file changed, 284 insertions(+), 169 deletions(-) (limited to 'test/test_ipsec_ah.py') diff --git a/test/test_ipsec_ah.py b/test/test_ipsec_ah.py index 8f8b2bf1550..190bde78f56 100644 --- a/test/test_ipsec_ah.py +++ b/test/test_ipsec_ah.py @@ -8,13 +8,23 @@ from scapy.layers.l2 import Ether from scapy.packet import Raw from framework import VppTestRunner -from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \ - config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \ - IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \ - IpsecTun6HandoffTests, IpsecTun4HandoffTests +from template_ipsec import ( + TemplateIpsec, + IpsecTra46Tests, + IpsecTun46Tests, + config_tun_params, + config_tra_params, + IPsecIPv4Params, + IPsecIPv6Params, + IpsecTra4, + IpsecTun4, + IpsecTra6, + IpsecTun6, + IpsecTun6HandoffTests, + IpsecTun4HandoffTests, +) from template_ipsec import IpsecTcpTests -from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\ - VppIpsecSpdItfBinding +from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_ip import DpoProto from vpp_papi import VppEnum @@ -41,6 +51,7 @@ class ConfigIpsecAH(TemplateIpsec): --- --- --- """ + encryption_type = AH net_objs = [] tra4_encrypt_node_name = "ah4-encrypt" @@ -79,13 +90,11 @@ class ConfigIpsecAH(TemplateIpsec): self.tun_spd.add_vpp_config() self.net_objs.append(self.tun_spd) - b = VppIpsecSpdItfBinding(self, self.tra_spd, - self.tra_if) + b = VppIpsecSpdItfBinding(self, self.tra_spd, self.tra_if) b.add_vpp_config() self.net_objs.append(b) - b = VppIpsecSpdItfBinding(self, self.tun_spd, - self.tun_if) + b = VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if) b.add_vpp_config() self.net_objs.append(b) @@ -97,10 +106,16 @@ class ConfigIpsecAH(TemplateIpsec): config_tun_params(p, self.encryption_type, self.tun_if) for p in params: d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4 - r = VppIpRoute(self, p.remote_tun_if_host, p.addr_len, - [VppRoutePath(self.tun_if.remote_addr[p.addr_type], - 0xffffffff, - proto=d)]) + r = VppIpRoute( + self, + p.remote_tun_if_host, + p.addr_len, + [ + VppRoutePath( + self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d + ) + ], + ) r.add_vpp_config() self.net_objs.append(r) self.logger.info(self.vapi.ppcli("show ipsec all")) @@ -130,74 +145,116 @@ class ConfigIpsecAH(TemplateIpsec): params.outer_hop_limit = 253 params.outer_flow_label = 0x12345 - params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - self.tun_if.local_addr[addr_type], - self.tun_if.remote_addr[addr_type], - tun_flags=tun_flags, - flags=flags, - dscp=params.dscp) - - params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - self.tun_if.remote_addr[addr_type], - self.tun_if.local_addr[addr_type], - tun_flags=tun_flags, - flags=flags, - dscp=params.dscp) + params.tun_sa_in = VppIpsecSA( + self, + scapy_tun_sa_id, + scapy_tun_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + self.tun_if.local_addr[addr_type], + self.tun_if.remote_addr[addr_type], + tun_flags=tun_flags, + flags=flags, + dscp=params.dscp, + ) + + params.tun_sa_out = VppIpsecSA( + self, + vpp_tun_sa_id, + vpp_tun_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + self.tun_if.remote_addr[addr_type], + self.tun_if.local_addr[addr_type], + tun_flags=tun_flags, + flags=flags, + dscp=params.dscp, + ) objs.append(params.tun_sa_in) objs.append(params.tun_sa_out) - params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd, - vpp_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH) - params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd, - vpp_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH, - is_outbound=0) + params.spd_policy_in_any = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + ) + params.spd_policy_out_any = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + is_outbound=0, + ) objs.append(params.spd_policy_out_any) objs.append(params.spd_policy_in_any) - e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id, - remote_tun_if_host, - remote_tun_if_host, - self.pg1.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - 0, priority=10, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0) - e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - self.pg1.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - remote_tun_if_host, - remote_tun_if_host, - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=10) - e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id, - remote_tun_if_host, - remote_tun_if_host, - self.pg0.local_addr[addr_type], - self.pg0.local_addr[addr_type], - 0, priority=20, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0) - e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - self.pg0.local_addr[addr_type], - self.pg0.local_addr[addr_type], - remote_tun_if_host, - remote_tun_if_host, - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=20) + e1 = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + remote_tun_if_host, + remote_tun_if_host, + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + 0, + priority=10, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + e2 = VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + remote_tun_if_host, + remote_tun_if_host, + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=10, + ) + e3 = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + remote_tun_if_host, + remote_tun_if_host, + self.pg0.local_addr[addr_type], + self.pg0.local_addr[addr_type], + 0, + priority=20, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + e4 = VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + self.pg0.local_addr[addr_type], + self.pg0.local_addr[addr_type], + remote_tun_if_host, + remote_tun_if_host, + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=20, + ) objs = objs + [e1, e2, e3, e4] @@ -218,49 +275,92 @@ class ConfigIpsecAH(TemplateIpsec): crypt_key = params.crypt_key addr_any = params.addr_any addr_bcast = params.addr_bcast - flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY) + flags = params.flags | ( + VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY + ) e = VppEnum.vl_api_ipsec_spd_action_t objs = [] - params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - flags=flags) - params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - flags=flags) + params.tra_sa_in = VppIpsecSA( + self, + scapy_tra_sa_id, + scapy_tra_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + flags=flags, + ) + params.tra_sa_out = VppIpsecSA( + self, + vpp_tra_sa_id, + vpp_tra_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + flags=flags, + ) objs.append(params.tra_sa_in) objs.append(params.tra_sa_out) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH, - is_outbound=0)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id, - self.tra_if.local_addr[addr_type], - self.tra_if.local_addr[addr_type], - self.tra_if.remote_addr[addr_type], - self.tra_if.remote_addr[addr_type], - 0, priority=10, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id, - self.tra_if.local_addr[addr_type], - self.tra_if.local_addr[addr_type], - self.tra_if.remote_addr[addr_type], - self.tra_if.remote_addr[addr_type], - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=10)) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + vpp_tra_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + scapy_tra_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + is_outbound=0, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + vpp_tra_sa_id, + self.tra_if.local_addr[addr_type], + self.tra_if.local_addr[addr_type], + self.tra_if.remote_addr[addr_type], + self.tra_if.remote_addr[addr_type], + 0, + priority=10, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + scapy_tra_sa_id, + self.tra_if.local_addr[addr_type], + self.tra_if.local_addr[addr_type], + self.tra_if.remote_addr[addr_type], + self.tra_if.remote_addr[addr_type], + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=10, + ) + ) for o in objs: o.add_vpp_config() @@ -288,6 +388,7 @@ class TemplateIpsecAh(ConfigIpsecAH): --- --- --- """ + @classmethod def setUpClass(cls): super(TemplateIpsecAh, cls).setUpClass() @@ -306,26 +407,30 @@ class TemplateIpsecAh(ConfigIpsecAH): class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests): - """ Ipsec AH - TCP tests """ + """Ipsec AH - TCP tests""" + pass class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests): - """ Ipsec AH w/ SHA1 """ + """Ipsec AH w/ SHA1""" + pass class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): - """ Ipsec AH - TUN encap tests """ + """Ipsec AH - TUN encap tests""" def setUp(self): self.ipv4_params = IPsecIPv4Params() self.ipv6_params = IPsecIPv6Params() - c = (VppEnum.vl_api_tunnel_encap_decap_flags_t. - TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP) - c1 = c | (VppEnum.vl_api_tunnel_encap_decap_flags_t. - TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN) + c = ( + VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP + ) + c1 = c | ( + VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN + ) self.ipv4_params.tun_flags = c self.ipv6_params.tun_flags = c1 @@ -334,19 +439,23 @@ class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy only DSCP - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IP(src=src, dst=dst, tos=5) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IP(src=src, dst=dst, tos=5) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy both - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IPv6(src=src, dst=dst, tc=5) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IPv6(src=src, dst=dst, tc=5) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def verify_encrypted(self, p, sa, rxs): # just check that only the DSCP is copied @@ -360,7 +469,7 @@ class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): - """ Ipsec AH - TUN encap tests """ + """Ipsec AH - TUN encap tests""" def setUp(self): self.ipv4_params = IPsecIPv4Params() @@ -373,24 +482,28 @@ class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy only DSCP - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IP(src=src, dst=dst, tos=0) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IP(src=src, dst=dst, tos=0) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy both - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IPv6(src=src, dst=dst, tc=0) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IPv6(src=src, dst=dst, tc=0) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def verify_encrypted(self, p, sa, rxs): # just check that only the DSCP is copied for rx in rxs: - self.assertEqual(rx[IP].tos, 0xc) + self.assertEqual(rx[IP].tos, 0xC) def verify_encrypted6(self, p, sa, rxs): # just check that the DSCP & ECN are copied @@ -398,17 +511,14 @@ class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): self.assertEqual(rx[IPv6].tc, 0x10) -class TestIpsecAhHandoff(TemplateIpsecAh, - IpsecTun6HandoffTests, - IpsecTun4HandoffTests): - """ Ipsec AH Handoff """ +class TestIpsecAhHandoff(TemplateIpsecAh, IpsecTun6HandoffTests, IpsecTun4HandoffTests): + """Ipsec AH Handoff""" + pass -class TestIpsecAhAll(ConfigIpsecAH, - IpsecTra4, IpsecTra6, - IpsecTun4, IpsecTun6): - """ Ipsec AH all Algos """ +class TestIpsecAhAll(ConfigIpsecAH, IpsecTra4, IpsecTra6, IpsecTun4, IpsecTun6): + """Ipsec AH all Algos""" def setUp(self): super(TestIpsecAhAll, self).setUp() @@ -421,21 +531,26 @@ class TestIpsecAhAll(ConfigIpsecAH, # foreach VPP crypto engine engines = ["ia32", "ipsecmb", "openssl"] - algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA1_96, - 'scapy': "HMAC-SHA1-96"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_256_128, - 'scapy': "SHA2-256-128"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_384_192, - 'scapy': "SHA2-384-192"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_512_256, - 'scapy': "SHA2-512-256"}] - - flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ESN)] + algos = [ + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96, + "scapy": "HMAC-SHA1-96", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128, + "scapy": "SHA2-256-128", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_384_192, + "scapy": "SHA2-384-192", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256, + "scapy": "SHA2-512-256", + }, + ] + + flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)] # # loop through the VPP engines @@ -454,14 +569,14 @@ class TestIpsecAhAll(ConfigIpsecAH, self.ipv4_params = IPsecIPv4Params() self.ipv6_params = IPsecIPv6Params() - self.params = {self.ipv4_params.addr_type: - self.ipv4_params, - self.ipv6_params.addr_type: - self.ipv6_params} + self.params = { + self.ipv4_params.addr_type: self.ipv4_params, + self.ipv6_params.addr_type: self.ipv6_params, + } for _, p in self.params.items(): - p.auth_algo_vpp_id = algo['vpp'] - p.auth_algo = algo['scapy'] + p.auth_algo_vpp_id = algo["vpp"] + p.auth_algo = algo["scapy"] p.flags = p.flags | flag # @@ -484,5 +599,5 @@ class TestIpsecAhAll(ConfigIpsecAH, self.unconfig_network() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg