diff options
author | Neale Ranns <nranns@cisco.com> | 2019-01-24 04:52:25 -0800 |
---|---|---|
committer | Neale Ranns <nranns@cisco.com> | 2019-01-25 19:54:49 +0000 |
commit | 311124e21b9b8ca11073d82408ffb81d98790969 (patch) | |
tree | 22b615488ec5bf0c9aac73a6716bcdeeccd5d80b /test/vpp_ipsec.py | |
parent | 9e47ac54c9fbc6eb7caf0998df50a3114b64ee11 (diff) |
IPSEC: tests use opbject registry
this means we test the dumps - to some extent
Change-Id: I8d90745701012012b41a7b3aaf9be97b4dd2bdf8
Signed-off-by: Neale Ranns <nranns@cisco.com>
Diffstat (limited to 'test/vpp_ipsec.py')
-rw-r--r-- | test/vpp_ipsec.py | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/test/vpp_ipsec.py b/test/vpp_ipsec.py new file mode 100644 index 00000000000..8c6da77cba8 --- /dev/null +++ b/test/vpp_ipsec.py @@ -0,0 +1,249 @@ +from vpp_object import * +from ipaddress import ip_address + +try: + text_type = unicode +except NameError: + text_type = str + + +class VppIpsecSpd(VppObject): + """ + VPP SPD DB + """ + + def __init__(self, test, id): + self.test = test + self.id = id + + def add_vpp_config(self): + self.test.vapi.ipsec_spd_add_del(self.id) + self.test.registry.register(self, self.test.logger) + + def remove_vpp_config(self): + self.test.vapi.ipsec_spd_add_del(self.id, is_add=0) + + def __str__(self): + return self.object_id() + + def object_id(self): + return "ipsec-spd-%d" % self.id + + def query_vpp_config(self): + spds = self.test.vapi.ipsec_spds_dump() + for spd in spds: + if spd.spd_id == self.id: + return True + return False + + +class VppIpsecSpdItfBinding(VppObject): + """ + VPP SPD DB to interface binding + (i.e. this SPD is used on this interfce) + """ + + def __init__(self, test, spd, itf): + self.test = test + self.spd = spd + self.itf = itf + + def add_vpp_config(self): + self.test.vapi.ipsec_interface_add_del_spd(self.spd.id, + self.itf.sw_if_index) + self.test.registry.register(self, self.test.logger) + + def remove_vpp_config(self): + self.test.vapi.ipsec_interface_add_del_spd(self.spd.id, + self.itf.sw_if_index, + is_add=0) + + def __str__(self): + return self.object_id() + + def object_id(self): + return "bind-%s-to-%s" % (self.spd.id, self.itf) + + def query_vpp_config(self): + bs = self.test.vapi.ipsec_spd_interface_dump() + for b in bs: + if b.sw_if_index == self.itf.sw_if_index: + return True + return False + + +class VppIpsecSpdEntry(VppObject): + """ + VPP SPD DB Entry + """ + + def __init__(self, test, spd, sa_id, + local_start, local_stop, + remote_start, remote_stop, + proto, + priority=100, + policy=0, + is_outbound=1, + remote_port_start=0, + remote_port_stop=65535, + local_port_start=0, + local_port_stop=65535): + self.test = test + self.spd = spd + self.sa_id = sa_id + self.local_start = ip_address(text_type(local_start)) + self.local_stop = ip_address(text_type(local_stop)) + self.remote_start = ip_address(text_type(remote_start)) + self.remote_stop = ip_address(text_type(remote_stop)) + self.proto = proto + self.is_outbound = is_outbound + self.priority = priority + self.policy = policy + self.is_ipv6 = (0 if self.local_start.version == 4 else 1) + self.local_port_start = local_port_start + self.local_port_stop = local_port_stop + self.remote_port_start = remote_port_start + self.remote_port_stop = remote_port_stop + + def add_vpp_config(self): + self.test.vapi.ipsec_spd_add_del_entry( + self.spd.id, + self.sa_id, + self.local_start.packed, + self.local_stop.packed, + self.remote_start.packed, + self.remote_stop.packed, + protocol=self.proto, + is_ipv6=self.is_ipv6, + is_outbound=self.is_outbound, + priority=self.priority, + policy=self.policy, + local_port_start=self.local_port_start, + local_port_stop=self.local_port_stop, + remote_port_start=self.remote_port_start, + remote_port_stop=self.remote_port_stop) + self.test.registry.register(self, self.test.logger) + + def remove_vpp_config(self): + self.test.vapi.ipsec_spd_add_del_entry( + self.spd.id, + self.sa_id, + self.local_start.packed, + self.local_stop.packed, + self.remote_start.packed, + self.remote_stop.packed, + protocol=self.proto, + is_ipv6=self.is_ipv6, + is_outbound=self.is_outbound, + priority=self.priority, + policy=self.policy, + local_port_start=self.local_port_start, + local_port_stop=self.local_port_stop, + remote_port_start=self.remote_port_start, + remote_port_stop=self.remote_port_stop, + is_add=0) + + def __str__(self): + return self.object_id() + + def object_id(self): + return "spd-entry-%d-%d-%d-%d-%d-%d" % (self.spd.id, + self.priority, + self.policy, + self.is_outbound, + self.is_ipv6, + self.remote_port_start) + + def query_vpp_config(self): + ss = self.test.vapi.ipsec_spd_dump(self.spd.id) + for s in ss: + if s.sa_id == self.sa_id and \ + s.is_outbound == self.is_outbound and \ + s.priority == self.priority and \ + s.policy == self.policy and \ + s.is_ipv6 == self.is_ipv6 and \ + s.remote_start_port == self.remote_port_start: + return True + return False + + +class VppIpsecSA(VppObject): + """ + VPP SAD Entry + """ + + def __init__(self, test, id, spi, + integ_alg, integ_key, + crypto_alg, crypto_key, + proto, + tun_src=None, tun_dst=None, + use_anti_replay=0, + udp_encap=0): + self.test = test + self.id = id + self.spi = spi + self.integ_alg = integ_alg + self.integ_key = integ_key + self.crypto_alg = crypto_alg + self.crypto_key = crypto_key + self.proto = proto + self.is_tunnel = 0 + self.is_tunnel_v6 = 0 + self.tun_src = tun_src + self.tun_dst = tun_dst + if (tun_src): + self.tun_src = ip_address(text_type(tun_src)) + self.is_tunnel = 1 + if (self.tun_src.version == 6): + self.is_tunnel_v6 = 1 + if (tun_dst): + self.tun_dst = ip_address(text_type(tun_dst)) + self.use_anti_replay = use_anti_replay + self.udp_encap = udp_encap + + def add_vpp_config(self): + self.test.vapi.ipsec_sad_add_del_entry( + self.id, + self.spi, + self.integ_alg, + self.integ_key, + self.crypto_alg, + self.crypto_key, + self.proto, + (self.tun_src.packed if self.tun_src else []), + (self.tun_dst.packed if self.tun_dst else []), + is_tunnel=self.is_tunnel, + is_tunnel_ipv6=self.is_tunnel_v6, + use_anti_replay=self.use_anti_replay, + udp_encap=self.udp_encap) + self.test.registry.register(self, self.test.logger) + + def remove_vpp_config(self): + self.test.vapi.ipsec_sad_add_del_entry( + self.id, + self.spi, + self.integ_alg, + self.integ_key, + self.crypto_alg, + self.crypto_key, + self.proto, + (self.tun_src.packed if self.tun_src else []), + (self.tun_dst.packed if self.tun_dst else []), + is_tunnel=self.is_tunnel, + is_tunnel_ipv6=self.is_tunnel_v6, + use_anti_replay=self.use_anti_replay, + udp_encap=self.udp_encap, + is_add=0) + + def __str__(self): + return self.object_id() + + def object_id(self): + return "ipsec-sa-%d" % self.id + + def query_vpp_config(self): + bs = self.test.vapi.ipsec_sa_dump() + for b in bs: + if b.sa_id == self.id: + return True + return False |