Diffstat (limited to 'test/test_ipsec_ah.py')
-rw-r--r--test/test_ipsec_ah.py453
1 files changed, 284 insertions, 169 deletions
diff --git a/test/test_ipsec_ah.py b/test/test_ipsec_ah.py
index 8f8b2bf1550..190bde78f56 100644
--- a/test/test_ipsec_ah.py
+++ b/test/test_ipsec_ah.py
@@ -8,13 +8,23 @@ from scapy.layers.l2 import Ether
from scapy.packet import Raw
from framework import VppTestRunner
-from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
- config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \
- IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \
- IpsecTun6HandoffTests, IpsecTun4HandoffTests
+from template_ipsec import (
+ TemplateIpsec,
+ IpsecTra46Tests,
+ IpsecTun46Tests,
+ config_tun_params,
+ config_tra_params,
+ IPsecIPv4Params,
+ IPsecIPv6Params,
+ IpsecTra4,
+ IpsecTun4,
+ IpsecTra6,
+ IpsecTun6,
+ IpsecTun6HandoffTests,
+ IpsecTun4HandoffTests,
+)
from template_ipsec import IpsecTcpTests
-from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
- VppIpsecSpdItfBinding
+from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_ip import DpoProto
from vpp_papi import VppEnum
@@ -41,6 +51,7 @@ class ConfigIpsecAH(TemplateIpsec):
--- --- ---
"""
+
encryption_type = AH
net_objs = []
tra4_encrypt_node_name = "ah4-encrypt"
@@ -79,13 +90,11 @@ class ConfigIpsecAH(TemplateIpsec):
self.tun_spd.add_vpp_config()
self.net_objs.append(self.tun_spd)
- b = VppIpsecSpdItfBinding(self, self.tra_spd,
- self.tra_if)
+ b = VppIpsecSpdItfBinding(self, self.tra_spd, self.tra_if)
b.add_vpp_config()
self.net_objs.append(b)
- b = VppIpsecSpdItfBinding(self, self.tun_spd,
- self.tun_if)
+ b = VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if)
b.add_vpp_config()
self.net_objs.append(b)
@@ -97,10 +106,16 @@ class ConfigIpsecAH(TemplateIpsec):
config_tun_params(p, self.encryption_type, self.tun_if)
for p in params:
d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
- r = VppIpRoute(self, p.remote_tun_if_host, p.addr_len,
- [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
- 0xffffffff,
- proto=d)])
+ r = VppIpRoute(
+ self,
+ p.remote_tun_if_host,
+ p.addr_len,
+ [
+ VppRoutePath(
+ self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d
+ )
+ ],
+ )
r.add_vpp_config()
self.net_objs.append(r)
self.logger.info(self.vapi.ppcli("show ipsec all"))
@@ -130,74 +145,116 @@ class ConfigIpsecAH(TemplateIpsec):
params.outer_hop_limit = 253
params.outer_flow_label = 0x12345
- params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
- auth_algo_vpp_id, auth_key,
- crypt_algo_vpp_id, crypt_key,
- self.vpp_ah_protocol,
- self.tun_if.local_addr[addr_type],
- self.tun_if.remote_addr[addr_type],
- tun_flags=tun_flags,
- flags=flags,
- dscp=params.dscp)
-
- params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
- auth_algo_vpp_id, auth_key,
- crypt_algo_vpp_id, crypt_key,
- self.vpp_ah_protocol,
- self.tun_if.remote_addr[addr_type],
- self.tun_if.local_addr[addr_type],
- tun_flags=tun_flags,
- flags=flags,
- dscp=params.dscp)
+ params.tun_sa_in = VppIpsecSA(
+ self,
+ scapy_tun_sa_id,
+ scapy_tun_spi,
+ auth_algo_vpp_id,
+ auth_key,
+ crypt_algo_vpp_id,
+ crypt_key,
+ self.vpp_ah_protocol,
+ self.tun_if.local_addr[addr_type],
+ self.tun_if.remote_addr[addr_type],
+ tun_flags=tun_flags,
+ flags=flags,
+ dscp=params.dscp,
+ )
+
+ params.tun_sa_out = VppIpsecSA(
+ self,
+ vpp_tun_sa_id,
+ vpp_tun_spi,
+ auth_algo_vpp_id,
+ auth_key,
+ crypt_algo_vpp_id,
+ crypt_key,
+ self.vpp_ah_protocol,
+ self.tun_if.remote_addr[addr_type],
+ self.tun_if.local_addr[addr_type],
+ tun_flags=tun_flags,
+ flags=flags,
+ dscp=params.dscp,
+ )
objs.append(params.tun_sa_in)
objs.append(params.tun_sa_out)
- params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
- vpp_tun_sa_id,
- addr_any, addr_bcast,
- addr_any, addr_bcast,
- socket.IPPROTO_AH)
- params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
- vpp_tun_sa_id,
- addr_any, addr_bcast,
- addr_any, addr_bcast,
- socket.IPPROTO_AH,
- is_outbound=0)
+ params.spd_policy_in_any = VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ vpp_tun_sa_id,
+ addr_any,
+ addr_bcast,
+ addr_any,
+ addr_bcast,
+ socket.IPPROTO_AH,
+ )
+ params.spd_policy_out_any = VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ vpp_tun_sa_id,
+ addr_any,
+ addr_bcast,
+ addr_any,
+ addr_bcast,
+ socket.IPPROTO_AH,
+ is_outbound=0,
+ )
objs.append(params.spd_policy_out_any)
objs.append(params.spd_policy_in_any)
- e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
- remote_tun_if_host,
- remote_tun_if_host,
- self.pg1.remote_addr[addr_type],
- self.pg1.remote_addr[addr_type],
- 0, priority=10,
- policy=e.IPSEC_API_SPD_ACTION_PROTECT,
- is_outbound=0)
- e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
- self.pg1.remote_addr[addr_type],
- self.pg1.remote_addr[addr_type],
- remote_tun_if_host,
- remote_tun_if_host,
- 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
- priority=10)
- e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
- remote_tun_if_host,
- remote_tun_if_host,
- self.pg0.local_addr[addr_type],
- self.pg0.local_addr[addr_type],
- 0, priority=20,
- policy=e.IPSEC_API_SPD_ACTION_PROTECT,
- is_outbound=0)
- e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
- self.pg0.local_addr[addr_type],
- self.pg0.local_addr[addr_type],
- remote_tun_if_host,
- remote_tun_if_host,
- 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
- priority=20)
+ e1 = VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ vpp_tun_sa_id,
+ remote_tun_if_host,
+ remote_tun_if_host,
+ self.pg1.remote_addr[addr_type],
+ self.pg1.remote_addr[addr_type],
+ 0,
+ priority=10,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ is_outbound=0,
+ )
+ e2 = VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ scapy_tun_sa_id,
+ self.pg1.remote_addr[addr_type],
+ self.pg1.remote_addr[addr_type],
+ remote_tun_if_host,
+ remote_tun_if_host,
+ 0,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ priority=10,
+ )
+ e3 = VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ vpp_tun_sa_id,
+ remote_tun_if_host,
+ remote_tun_if_host,
+ self.pg0.local_addr[addr_type],
+ self.pg0.local_addr[addr_type],
+ 0,
+ priority=20,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ is_outbound=0,
+ )
+ e4 = VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ scapy_tun_sa_id,
+ self.pg0.local_addr[addr_type],
+ self.pg0.local_addr[addr_type],
+ remote_tun_if_host,
+ remote_tun_if_host,
+ 0,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ priority=20,
+ )
objs = objs + [e1, e2, e3, e4]
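Review bot: The two catch-all AH policies look named against their direction: `params.spd_policy_out_any` is the entry created with `is_outbound=0`, while `params.spd_policy_in_any` passes no `is_outbound` and so takes the constructor default. That default is not visible here, but if it is outbound the attribute names say the opposite of what the entries do, which is an easy way to misread an SPD when debugging a bypass or drop. Either swap the names or add a comment above the pair such as `# NOTE: *_in_any is the outbound entry, *_out_any is the inbound one (is_outbound=0)`. Both entries also reference `vpp_tun_sa_id`, where the protect entries below alternate between `vpp_tun_sa_id` and `scapy_tun_sa_id`; worth a one-line comment confirming that is intended. The enclosing method starts and ends outside this hunk.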
@@ -218,49 +275,92 @@ class ConfigIpsecAH(TemplateIpsec):
crypt_key = params.crypt_key
addr_any = params.addr_any
addr_bcast = params.addr_bcast
- flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
+ flags = params.flags | (
+ VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
+ )
e = VppEnum.vl_api_ipsec_spd_action_t
objs = []
- params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
- auth_algo_vpp_id, auth_key,
- crypt_algo_vpp_id, crypt_key,
- self.vpp_ah_protocol,
- flags=flags)
- params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
- auth_algo_vpp_id, auth_key,
- crypt_algo_vpp_id, crypt_key,
- self.vpp_ah_protocol,
- flags=flags)
+ params.tra_sa_in = VppIpsecSA(
+ self,
+ scapy_tra_sa_id,
+ scapy_tra_spi,
+ auth_algo_vpp_id,
+ auth_key,
+ crypt_algo_vpp_id,
+ crypt_key,
+ self.vpp_ah_protocol,
+ flags=flags,
+ )
+ params.tra_sa_out = VppIpsecSA(
+ self,
+ vpp_tra_sa_id,
+ vpp_tra_spi,
+ auth_algo_vpp_id,
+ auth_key,
+ crypt_algo_vpp_id,
+ crypt_key,
+ self.vpp_ah_protocol,
+ flags=flags,
+ )
objs.append(params.tra_sa_in)
objs.append(params.tra_sa_out)
- objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
- addr_any, addr_bcast,
- addr_any, addr_bcast,
- socket.IPPROTO_AH))
- objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
- addr_any, addr_bcast,
- addr_any, addr_bcast,
- socket.IPPROTO_AH,
- is_outbound=0))
- objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
- self.tra_if.local_addr[addr_type],
- self.tra_if.local_addr[addr_type],
- self.tra_if.remote_addr[addr_type],
- self.tra_if.remote_addr[addr_type],
- 0, priority=10,
- policy=e.IPSEC_API_SPD_ACTION_PROTECT,
- is_outbound=0))
- objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
- self.tra_if.local_addr[addr_type],
- self.tra_if.local_addr[addr_type],
- self.tra_if.remote_addr[addr_type],
- self.tra_if.remote_addr[addr_type],
- 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
- priority=10))
+ objs.append(
+ VppIpsecSpdEntry(
+ self,
+ self.tra_spd,
+ vpp_tra_sa_id,
+ addr_any,
+ addr_bcast,
+ addr_any,
+ addr_bcast,
+ socket.IPPROTO_AH,
+ )
+ )
+ objs.append(
+ VppIpsecSpdEntry(
+ self,
+ self.tra_spd,
+ scapy_tra_sa_id,
+ addr_any,
+ addr_bcast,
+ addr_any,
+ addr_bcast,
+ socket.IPPROTO_AH,
+ is_outbound=0,
+ )
+ )
+ objs.append(
+ VppIpsecSpdEntry(
+ self,
+ self.tra_spd,
+ vpp_tra_sa_id,
+ self.tra_if.local_addr[addr_type],
+ self.tra_if.local_addr[addr_type],
+ self.tra_if.remote_addr[addr_type],
+ self.tra_if.remote_addr[addr_type],
+ 0,
+ priority=10,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ is_outbound=0,
+ )
+ )
+ objs.append(
+ VppIpsecSpdEntry(
+ self,
+ self.tra_spd,
+ scapy_tra_sa_id,
+ self.tra_if.local_addr[addr_type],
+ self.tra_if.local_addr[addr_type],
+ self.tra_if.remote_addr[addr_type],
+ self.tra_if.remote_addr[addr_type],
+ 0,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ priority=10,
+ )
+ )
for o in objs:
o.add_vpp_config()
@@ -288,6 +388,7 @@ class TemplateIpsecAh(ConfigIpsecAH):
--- --- ---
"""
+
@classmethod
def setUpClass(cls):
super(TemplateIpsecAh, cls).setUpClass()
@@ -306,26 +407,30 @@ class TemplateIpsecAh(ConfigIpsecAH):
class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
- """ Ipsec AH - TCP tests """
+ """Ipsec AH - TCP tests"""
+
pass
class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
- """ Ipsec AH w/ SHA1 """
+ """Ipsec AH w/ SHA1"""
+
pass
class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
- """ Ipsec AH - TUN encap tests """
+ """Ipsec AH - TUN encap tests"""
def setUp(self):
self.ipv4_params = IPsecIPv4Params()
self.ipv6_params = IPsecIPv6Params()
- c = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
- TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
- c1 = c | (VppEnum.vl_api_tunnel_encap_decap_flags_t.
- TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN)
+ c = (
+ VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP
+ )
+ c1 = c | (
+ VppEnum.vl_api_tunnel_encap_decap_flags_t.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN
+ )
self.ipv4_params.tun_flags = c
self.ipv6_params.tun_flags = c1
@@ -334,19 +439,23 @@ class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
# set the DSCP + ECN - flags are set to copy only DSCP
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst, tos=5) /
- UDP(sport=4444, dport=4444) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src=src, dst=dst, tos=5)
+ / UDP(sport=4444, dport=4444)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
# set the DSCP + ECN - flags are set to copy both
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IPv6(src=src, dst=dst, tc=5) /
- UDP(sport=4444, dport=4444) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IPv6(src=src, dst=dst, tc=5)
+ / UDP(sport=4444, dport=4444)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_encrypted(self, p, sa, rxs):
# just check that only the DSCP is copied
@@ -360,7 +469,7 @@ class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
- """ Ipsec AH - TUN encap tests """
+ """Ipsec AH - TUN encap tests"""
def setUp(self):
self.ipv4_params = IPsecIPv4Params()
@@ -373,24 +482,28 @@ class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
# set the DSCP + ECN - flags are set to copy only DSCP
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst, tos=0) /
- UDP(sport=4444, dport=4444) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src=src, dst=dst, tos=0)
+ / UDP(sport=4444, dport=4444)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
# set the DSCP + ECN - flags are set to copy both
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IPv6(src=src, dst=dst, tc=0) /
- UDP(sport=4444, dport=4444) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IPv6(src=src, dst=dst, tc=0)
+ / UDP(sport=4444, dport=4444)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_encrypted(self, p, sa, rxs):
# just check that only the DSCP is copied
for rx in rxs:
- self.assertEqual(rx[IP].tos, 0xc)
+ self.assertEqual(rx[IP].tos, 0xC)
def verify_encrypted6(self, p, sa, rxs):
# just check that the DSCP & ECN are copied
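Review bot: The comments in `TestIpsecAhTun2.gen_pkts` and `gen_pkts6` still say "set the DSCP + ECN - flags are set to copy only DSCP / copy both", but the packets are built with `tos=0` and `tc=0`, and `verify_encrypted` then expects `0xC` under the comment "just check that only the DSCP is copied". These read as carried over from `TestIpsecAhTun`; here the expected outer value presumably comes from a DSCP configured on the SA in `setUp`, which is outside this hunk. Suggest replacing them with something like `# inner tos/tc is 0; the outer DSCP must come from the SA's configured dscp` and `# outer header carries the SA's DSCP, nothing is copied from the inner packet`, once that is confirmed against `setUp`. The class docstring in the previous hunk is also identical to `TestIpsecAhTun`'s and could say what differs. `verify_encrypted6` continues past the end of the hunk.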
@@ -398,17 +511,14 @@ class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
self.assertEqual(rx[IPv6].tc, 0x10)
-class TestIpsecAhHandoff(TemplateIpsecAh,
- IpsecTun6HandoffTests,
- IpsecTun4HandoffTests):
- """ Ipsec AH Handoff """
+class TestIpsecAhHandoff(TemplateIpsecAh, IpsecTun6HandoffTests, IpsecTun4HandoffTests):
+ """Ipsec AH Handoff"""
+
pass
-class TestIpsecAhAll(ConfigIpsecAH,
- IpsecTra4, IpsecTra6,
- IpsecTun4, IpsecTun6):
- """ Ipsec AH all Algos """
+class TestIpsecAhAll(ConfigIpsecAH, IpsecTra4, IpsecTra6, IpsecTun4, IpsecTun6):
+ """Ipsec AH all Algos"""
def setUp(self):
super(TestIpsecAhAll, self).setUp()
@@ -421,21 +531,26 @@ class TestIpsecAhAll(ConfigIpsecAH,
# foreach VPP crypto engine
engines = ["ia32", "ipsecmb", "openssl"]
- algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA1_96,
- 'scapy': "HMAC-SHA1-96"},
- {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA_256_128,
- 'scapy': "SHA2-256-128"},
- {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA_384_192,
- 'scapy': "SHA2-384-192"},
- {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA_512_256,
- 'scapy': "SHA2-512-256"}]
-
- flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_USE_ESN)]
+ algos = [
+ {
+ "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96,
+ "scapy": "HMAC-SHA1-96",
+ },
+ {
+ "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128,
+ "scapy": "SHA2-256-128",
+ },
+ {
+ "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_384_192,
+ "scapy": "SHA2-384-192",
+ },
+ {
+ "vpp": VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256,
+ "scapy": "SHA2-512-256",
+ },
+ ]
+
+ flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)]
#
# loop through the VPP engines
@@ -454,14 +569,14 @@ class TestIpsecAhAll(ConfigIpsecAH,
self.ipv4_params = IPsecIPv4Params()
self.ipv6_params = IPsecIPv6Params()
- self.params = {self.ipv4_params.addr_type:
- self.ipv4_params,
- self.ipv6_params.addr_type:
- self.ipv6_params}
+ self.params = {
+ self.ipv4_params.addr_type: self.ipv4_params,
+ self.ipv6_params.addr_type: self.ipv6_params,
+ }
for _, p in self.params.items():
- p.auth_algo_vpp_id = algo['vpp']
- p.auth_algo = algo['scapy']
+ p.auth_algo_vpp_id = algo["vpp"]
+ p.auth_algo = algo["scapy"]
p.flags = p.flags | flag
#
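Review bot: `for _, p in self.params.items():` throws the key away, so `for p in self.params.values():` states the intent directly, and `p.flags = p.flags | flag` can be `p.flags |= flag`. Because `self.ipv4_params` and `self.ipv6_params` are rebuilt just above on each pass, the OR does not accumulate flags between iterations; a short comment saying so (`# params are fresh per iteration, so flags do not leak across algos`) would keep a later refactor from hoisting the constructors out of the loop. The surrounding engine/algo/flag loops start outside this hunk.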
@@ -484,5 +599,5 @@ class TestIpsecAhAll(ConfigIpsecAH,
self.unconfig_network()
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)