diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ipsec_ah.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ipsec_ah.py')
-rw-r--r-- | test/test_ipsec_ah.py | 453 |
1 files changed, 284 insertions, 169 deletions
diff --git a/test/test_ipsec_ah.py b/test/test_ipsec_ah.py index 8f8b2bf1550..190bde78f56 100644 --- a/test/test_ipsec_ah.py +++ b/test/test_ipsec_ah.py @@ -8,13 +8,23 @@ from scapy.layers.l2 import Ether from scapy.packet import Raw from framework import VppTestRunner -from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \ - config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \ - IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \ - IpsecTun6HandoffTests, IpsecTun4HandoffTests +from template_ipsec import ( + TemplateIpsec, + IpsecTra46Tests, + IpsecTun46Tests, + config_tun_params, + config_tra_params, + IPsecIPv4Params, + IPsecIPv6Params, + IpsecTra4, + IpsecTun4, + IpsecTra6, + IpsecTun6, + IpsecTun6HandoffTests, + IpsecTun4HandoffTests, +) from template_ipsec import IpsecTcpTests -from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\ - VppIpsecSpdItfBinding +from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_ip import DpoProto from vpp_papi import VppEnum @@ -41,6 +51,7 @@ class ConfigIpsecAH(TemplateIpsec): --- --- --- """ + encryption_type = AH net_objs = [] tra4_encrypt_node_name = "ah4-encrypt" @@ -79,13 +90,11 @@ class ConfigIpsecAH(TemplateIpsec): self.tun_spd.add_vpp_config() self.net_objs.append(self.tun_spd) - b = VppIpsecSpdItfBinding(self, self.tra_spd, - self.tra_if) + b = VppIpsecSpdItfBinding(self, self.tra_spd, self.tra_if) b.add_vpp_config() self.net_objs.append(b) - b = VppIpsecSpdItfBinding(self, self.tun_spd, - self.tun_if) + b = VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if) b.add_vpp_config() self.net_objs.append(b) @@ -97,10 +106,16 @@ class ConfigIpsecAH(TemplateIpsec): config_tun_params(p, self.encryption_type, self.tun_if) for p in params: d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4 - r = VppIpRoute(self, p.remote_tun_if_host, p.addr_len, - [VppRoutePath(self.tun_if.remote_addr[p.addr_type], - 0xffffffff, - proto=d)]) + r = VppIpRoute( + self, + p.remote_tun_if_host, + p.addr_len, + [ + VppRoutePath( + self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d + ) + ], + ) r.add_vpp_config() self.net_objs.append(r) self.logger.info(self.vapi.ppcli("show ipsec all")) @@ -130,74 +145,116 @@ class ConfigIpsecAH(TemplateIpsec): params.outer_hop_limit = 253 params.outer_flow_label = 0x12345 - params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - self.tun_if.local_addr[addr_type], - self.tun_if.remote_addr[addr_type], - tun_flags=tun_flags, - flags=flags, - dscp=params.dscp) - - params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - self.tun_if.remote_addr[addr_type], - self.tun_if.local_addr[addr_type], - tun_flags=tun_flags, - flags=flags, - dscp=params.dscp) + params.tun_sa_in = VppIpsecSA( + self, + scapy_tun_sa_id, + scapy_tun_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + self.tun_if.local_addr[addr_type], + self.tun_if.remote_addr[addr_type], + tun_flags=tun_flags, + flags=flags, + dscp=params.dscp, + ) + + params.tun_sa_out = VppIpsecSA( + self, + vpp_tun_sa_id, + vpp_tun_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + self.tun_if.remote_addr[addr_type], + self.tun_if.local_addr[addr_type], + tun_flags=tun_flags, + flags=flags, + dscp=params.dscp, + ) objs.append(params.tun_sa_in) objs.append(params.tun_sa_out) - params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd, - vpp_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH) - params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd, - vpp_tun_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH, - is_outbound=0) + params.spd_policy_in_any = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + ) + params.spd_policy_out_any = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + is_outbound=0, + ) objs.append(params.spd_policy_out_any) objs.append(params.spd_policy_in_any) - e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id, - remote_tun_if_host, - remote_tun_if_host, - self.pg1.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - 0, priority=10, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0) - e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - self.pg1.remote_addr[addr_type], - self.pg1.remote_addr[addr_type], - remote_tun_if_host, - remote_tun_if_host, - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=10) - e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id, - remote_tun_if_host, - remote_tun_if_host, - self.pg0.local_addr[addr_type], - self.pg0.local_addr[addr_type], - 0, priority=20, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0) - e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id, - self.pg0.local_addr[addr_type], - self.pg0.local_addr[addr_type], - remote_tun_if_host, - remote_tun_if_host, - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=20) + e1 = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + remote_tun_if_host, + remote_tun_if_host, + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + 0, + priority=10, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + e2 = VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + self.pg1.remote_addr[addr_type], + self.pg1.remote_addr[addr_type], + remote_tun_if_host, + remote_tun_if_host, + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=10, + ) + e3 = VppIpsecSpdEntry( + self, + self.tun_spd, + vpp_tun_sa_id, + remote_tun_if_host, + remote_tun_if_host, + self.pg0.local_addr[addr_type], + self.pg0.local_addr[addr_type], + 0, + priority=20, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + e4 = VppIpsecSpdEntry( + self, + self.tun_spd, + scapy_tun_sa_id, + self.pg0.local_addr[addr_type], + self.pg0.local_addr[addr_type], + remote_tun_if_host, + remote_tun_if_host, + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=20, + ) objs = objs + [e1, e2, e3, e4] @@ -218,49 +275,92 @@ class ConfigIpsecAH(TemplateIpsec): crypt_key = params.crypt_key addr_any = params.addr_any addr_bcast = params.addr_bcast - flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY) + flags = params.flags | ( + VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY + ) e = VppEnum.vl_api_ipsec_spd_action_t objs = [] - params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - flags=flags) - params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - self.vpp_ah_protocol, - flags=flags) + params.tra_sa_in = VppIpsecSA( + self, + scapy_tra_sa_id, + scapy_tra_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + flags=flags, + ) + params.tra_sa_out = VppIpsecSA( + self, + vpp_tra_sa_id, + vpp_tra_spi, + auth_algo_vpp_id, + auth_key, + crypt_algo_vpp_id, + crypt_key, + self.vpp_ah_protocol, + flags=flags, + ) objs.append(params.tra_sa_in) objs.append(params.tra_sa_out) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id, - addr_any, addr_bcast, - addr_any, addr_bcast, - socket.IPPROTO_AH, - is_outbound=0)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id, - self.tra_if.local_addr[addr_type], - self.tra_if.local_addr[addr_type], - self.tra_if.remote_addr[addr_type], - self.tra_if.remote_addr[addr_type], - 0, priority=10, - policy=e.IPSEC_API_SPD_ACTION_PROTECT, - is_outbound=0)) - objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id, - self.tra_if.local_addr[addr_type], - self.tra_if.local_addr[addr_type], - self.tra_if.remote_addr[addr_type], - self.tra_if.remote_addr[addr_type], - 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT, - priority=10)) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + vpp_tra_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + scapy_tra_sa_id, + addr_any, + addr_bcast, + addr_any, + addr_bcast, + socket.IPPROTO_AH, + is_outbound=0, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + vpp_tra_sa_id, + self.tra_if.local_addr[addr_type], + self.tra_if.local_addr[addr_type], + self.tra_if.remote_addr[addr_type], + self.tra_if.remote_addr[addr_type], + 0, + priority=10, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + is_outbound=0, + ) + ) + objs.append( + VppIpsecSpdEntry( + self, + self.tra_spd, + scapy_tra_sa_id, + self.tra_if.local_addr[addr_type], + self.tra_if.local_addr[addr_type], + self.tra_if.remote_addr[addr_type], + self.tra_if.remote_addr[addr_type], + 0, + policy=e.IPSEC_API_SPD_ACTION_PROTECT, + priority=10, + ) + ) for o in objs: o.add_vpp_config() @@ -288,6 +388,7 @@ class TemplateIpsecAh(ConfigIpsecAH): --- --- --- """ + @classmethod def setUpClass(cls): super(TemplateIpsecAh, cls).setUpClass() @@ -306,26 +407,30 @@ class TemplateIpsecAh(ConfigIpsecAH): class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests): - """ Ipsec AH - TCP tests """ + """Ipsec AH - TCP tests""" + pass class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests): - """ Ipsec AH w/ SHA1 """ + """Ipsec AH w/ SHA1""" + pass class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): - """ Ipsec AH - TUN encap tests """ + """Ipsec AH - TUN encap tests""" def setUp(self): self.ipv4_params = IPsecIPv4Params() self.ipv6_params = IPsecIPv6Params() - c = (VppEnum.vl_api_tunnel_encap_decap_flags_t. - TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP) - c1 = c | (VppEnum.vl_api_tunnel_encap_decap_flags_t. - TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN) + c = ( + VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP + ) + c1 = c | ( + VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN + ) self.ipv4_params.tun_flags = c self.ipv6_params.tun_flags = c1 @@ -334,19 +439,23 @@ class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy only DSCP - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IP(src=src, dst=dst, tos=5) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IP(src=src, dst=dst, tos=5) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy both - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IPv6(src=src, dst=dst, tc=5) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IPv6(src=src, dst=dst, tc=5) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def verify_encrypted(self, p, sa, rxs): # just check that only the DSCP is copied @@ -360,7 +469,7 @@ class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests): class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): - """ Ipsec AH - TUN encap tests """ + """Ipsec AH - TUN encap tests""" def setUp(self): self.ipv4_params = IPsecIPv4Params() @@ -373,24 +482,28 @@ class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy only DSCP - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IP(src=src, dst=dst, tos=0) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IP(src=src, dst=dst, tos=0) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54): # set the DSCP + ECN - flags are set to copy both - return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / - IPv6(src=src, dst=dst, tc=0) / - UDP(sport=4444, dport=4444) / - Raw(b'X' * payload_size) - for i in range(count)] + return [ + Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) + / IPv6(src=src, dst=dst, tc=0) + / UDP(sport=4444, dport=4444) + / Raw(b"X" * payload_size) + for i in range(count) + ] def verify_encrypted(self, p, sa, rxs): # just check that only the DSCP is copied for rx in rxs: - self.assertEqual(rx[IP].tos, 0xc) + self.assertEqual(rx[IP].tos, 0xC) def verify_encrypted6(self, p, sa, rxs): # just check that the DSCP & ECN are copied @@ -398,17 +511,14 @@ class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests): self.assertEqual(rx[IPv6].tc, 0x10) -class TestIpsecAhHandoff(TemplateIpsecAh, - IpsecTun6HandoffTests, - IpsecTun4HandoffTests): - """ Ipsec AH Handoff """ +class TestIpsecAhHandoff(TemplateIpsecAh, IpsecTun6HandoffTests, IpsecTun4HandoffTests): + """Ipsec AH Handoff""" + pass -class TestIpsecAhAll(ConfigIpsecAH, - IpsecTra4, IpsecTra6, - IpsecTun4, IpsecTun6): - """ Ipsec AH all Algos """ +class TestIpsecAhAll(ConfigIpsecAH, IpsecTra4, IpsecTra6, IpsecTun4, IpsecTun6): + """Ipsec AH all Algos""" def setUp(self): super(TestIpsecAhAll, self).setUp() @@ -421,21 +531,26 @@ class TestIpsecAhAll(ConfigIpsecAH, # foreach VPP crypto engine engines = ["ia32", "ipsecmb", "openssl"] - algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA1_96, - 'scapy': "HMAC-SHA1-96"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_256_128, - 'scapy': "SHA2-256-128"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_384_192, - 'scapy': "SHA2-384-192"}, - {'vpp': VppEnum.vl_api_ipsec_integ_alg_t. - IPSEC_API_INTEG_ALG_SHA_512_256, - 'scapy': "SHA2-512-256"}] - - flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t. - IPSEC_API_SAD_FLAG_USE_ESN)] + algos = [ + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96, + "scapy": "HMAC-SHA1-96", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128, + "scapy": "SHA2-256-128", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_384_192, + "scapy": "SHA2-384-192", + }, + { + "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256, + "scapy": "SHA2-512-256", + }, + ] + + flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)] # # loop through the VPP engines @@ -454,14 +569,14 @@ class TestIpsecAhAll(ConfigIpsecAH, self.ipv4_params = IPsecIPv4Params() self.ipv6_params = IPsecIPv6Params() - self.params = {self.ipv4_params.addr_type: - self.ipv4_params, - self.ipv6_params.addr_type: - self.ipv6_params} + self.params = { + self.ipv4_params.addr_type: self.ipv4_params, + self.ipv6_params.addr_type: self.ipv6_params, + } for _, p in self.params.items(): - p.auth_algo_vpp_id = algo['vpp'] - p.auth_algo = algo['scapy'] + p.auth_algo_vpp_id = algo["vpp"] + p.auth_algo = algo["scapy"] p.flags = p.flags | flag # @@ -484,5 +599,5 @@ class TestIpsecAhAll(ConfigIpsecAH, self.unconfig_network() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |