diff options
author | Neale Ranns <nranns@cisco.com> | 2019-01-23 08:16:17 -0800 |
---|---|---|
committer | Damjan Marion <dmarion@me.com> | 2019-01-24 19:51:37 +0000 |
commit | 8e4a89bf42196308601de7544abe554df7b0df45 (patch) | |
tree | 622d478674fa634efae76a926f7f87bbaeb6bd35 | |
parent | e18b45caeb22b5dfe38b86be6beea55efaecf40d (diff) |
IPSEC Tests: to per-test setup and tearDown
don't do the setup and teardown in class methods so that with
each test the config is added and deleted. that way we test that
delete actually removes state.
more helpful error codes from VPP for existing IPSEC state.
Change-Id: I5de1578f73b935b420d4cdd85aa98d5fdcc682f6
Signed-off-by: Neale Ranns <nranns@cisco.com>
-rw-r--r-- | src/vnet/ipsec/ipsec.c | 8 | ||||
-rw-r--r-- | test/template_ipsec.py | 54 | ||||
-rw-r--r-- | test/test_ipsec_ah.py | 341 | ||||
-rw-r--r-- | test/test_ipsec_api.py | 44 | ||||
-rw-r--r-- | test/test_ipsec_esp.py | 339 | ||||
-rw-r--r-- | test/test_ipsec_nat.py | 112 | ||||
-rw-r--r-- | test/test_ipsec_tun_if_esp.py | 9 |
7 files changed, 598 insertions, 309 deletions
diff --git a/src/vnet/ipsec/ipsec.c b/src/vnet/ipsec/ipsec.c index fdd18c2f8fa..86de522e3d7 100644 --- a/src/vnet/ipsec/ipsec.c +++ b/src/vnet/ipsec/ipsec.c @@ -99,9 +99,9 @@ ipsec_add_del_spd (vlib_main_t * vm, u32 spd_id, int is_add) p = hash_get (im->spd_index_by_spd_id, spd_id); if (p && is_add) - return VNET_API_ERROR_INVALID_VALUE; + return VNET_API_ERROR_ENTRY_ALREADY_EXISTS; if (!p && !is_add) - return VNET_API_ERROR_INVALID_VALUE; + return VNET_API_ERROR_NO_SUCH_ENTRY; if (!is_add) /* delete */ { @@ -441,9 +441,9 @@ ipsec_add_del_sa (vlib_main_t * vm, ipsec_sa_t * new_sa, int is_add) p = hash_get (im->sa_index_by_sa_id, new_sa->id); if (p && is_add) - return VNET_API_ERROR_SYSCALL_ERROR_1; /* already exists */ + return VNET_API_ERROR_ENTRY_ALREADY_EXISTS; if (!p && !is_add) - return VNET_API_ERROR_SYSCALL_ERROR_1; + return VNET_API_ERROR_NO_SUCH_ENTRY; if (!is_add) /* delete */ { diff --git a/test/template_ipsec.py b/test/template_ipsec.py index d35cf420d37..ed7c1a32129 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -82,39 +82,46 @@ class TemplateIpsec(VppTestCase): |tun_if| -------> |VPP| ------> |pg1| ------ --- --- """ - ipv4_params = IPsecIPv4Params() - ipv6_params = IPsecIPv6Params() - params = {ipv4_params.addr_type: ipv4_params, - ipv6_params.addr_type: ipv6_params} - payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX" + def ipsec_select_backend(self): + """ empty method to be overloaded when necessary """ + pass - tun_spd_id = 1 - tra_spd_id = 2 + def setUp(self): + super(TemplateIpsec, self).setUp() - vpp_esp_protocol = 1 - vpp_ah_protocol = 0 + self.ipv4_params = IPsecIPv4Params() + self.ipv6_params = IPsecIPv6Params() + self.params = {self.ipv4_params.addr_type: self.ipv4_params, + self.ipv6_params.addr_type: self.ipv6_params} - @classmethod - def ipsec_select_backend(cls): - """ empty method to be overloaded when necessary """ - pass + self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\ + "XXXXXXXXXXXXXXXXXXXXX" - @classmethod - def setUpClass(cls): - super(TemplateIpsec, cls).setUpClass() - cls.create_pg_interfaces(range(3)) - cls.interfaces = list(cls.pg_interfaces) - for i in cls.interfaces: + self.tun_spd_id = 1 + self.tra_spd_id = 2 + + self.vpp_esp_protocol = 1 + self.vpp_ah_protocol = 0 + + self.create_pg_interfaces(range(3)) + self.interfaces = list(self.pg_interfaces) + for i in self.interfaces: i.admin_up() i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() - cls.ipsec_select_backend() + self.ipsec_select_backend() def tearDown(self): super(TemplateIpsec, self).tearDown() + + for i in self.interfaces: + i.admin_down() + i.unconfig_ip4() + i.unconfig_ip6() + if not self.vpp_dead: self.vapi.cli("show hardware") @@ -158,15 +165,14 @@ class TemplateIpsec(VppTestCase): src=self.tun_if.local_addr[params.addr_type])) return vpp_tun_sa, scapy_tun_sa - @classmethod - def configure_sa_tra(cls, params): - params.scapy_tra_sa = SecurityAssociation(cls.encryption_type, + def configure_sa_tra(self, params): + params.scapy_tra_sa = SecurityAssociation(self.encryption_type, spi=params.vpp_tra_spi, crypt_algo=params.crypt_algo, crypt_key=params.crypt_key, auth_algo=params.auth_algo, auth_key=params.auth_key) - params.vpp_tra_sa = SecurityAssociation(cls.encryption_type, + params.vpp_tra_sa = SecurityAssociation(self.encryption_type, spi=params.scapy_tra_spi, crypt_algo=params.crypt_algo, crypt_key=params.crypt_key, diff --git a/test/test_ipsec_ah.py b/test/test_ipsec_ah.py index 928cd53c1f1..9a1c32d9260 100644 --- a/test/test_ipsec_ah.py +++ b/test/test_ipsec_ah.py @@ -29,35 +29,33 @@ class TemplateIpsecAh(TemplateIpsec): --- --- --- """ - encryption_type = AH - - @classmethod - def setUpClass(cls): - super(TemplateIpsecAh, cls).setUpClass() - cls.tun_if = cls.pg0 - cls.tra_if = cls.pg2 - cls.logger.info(cls.vapi.ppcli("show int addr")) - cls.vapi.ipsec_spd_add_del(cls.tun_spd_id) - cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id, - cls.tun_if.sw_if_index) - cls.vapi.ipsec_spd_add_del(cls.tra_spd_id) - cls.vapi.ipsec_interface_add_del_spd(cls.tra_spd_id, - cls.tra_if.sw_if_index) - for _, p in cls.params.items(): - cls.config_ah_tra(p) - cls.configure_sa_tra(p) - cls.logger.info(cls.vapi.ppcli("show ipsec")) - for _, p in cls.params.items(): - cls.config_ah_tun(p) - cls.logger.info(cls.vapi.ppcli("show ipsec")) - for _, p in cls.params.items(): + def setUp(self): + super(TemplateIpsecAh, self).setUp() + + self.encryption_type = AH + self.tun_if = self.pg0 + self.tra_if = self.pg2 + self.logger.info(self.vapi.ppcli("show int addr")) + self.vapi.ipsec_spd_add_del(self.tun_spd_id) + self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id, + self.tun_if.sw_if_index) + self.vapi.ipsec_spd_add_del(self.tra_spd_id) + self.vapi.ipsec_interface_add_del_spd(self.tra_spd_id, + self.tra_if.sw_if_index) + for _, p in self.params.items(): + self.config_ah_tra(p) + self.configure_sa_tra(p) + self.logger.info(self.vapi.ppcli("show ipsec")) + for _, p in self.params.items(): + self.config_ah_tun(p) + self.logger.info(self.vapi.ppcli("show ipsec")) + for _, p in self.params.items(): src = socket.inet_pton(p.addr_type, p.remote_tun_if_host) - cls.vapi.ip_add_del_route(src, p.addr_len, - cls.tun_if.remote_addr_n[p.addr_type], - is_ipv6=p.is_ipv6) + self.vapi.ip_add_del_route(src, p.addr_len, + self.tun_if.remote_addr_n[p.addr_type], + is_ipv6=p.is_ipv6) - @classmethod - def config_ah_tun(cls, params): + def config_ah_tun(self, params): addr_type = params.addr_type is_ipv6 = params.is_ipv6 scapy_tun_sa_id = params.scapy_tun_sa_id @@ -71,54 +69,120 @@ class TemplateIpsecAh(TemplateIpsec): remote_tun_if_host = params.remote_tun_if_host addr_any = params.addr_any addr_bcast = params.addr_bcast - cls.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_ah_protocol, - cls.tun_if.local_addr_n[addr_type], - cls.tun_if.remote_addr_n[addr_type], - is_tunnel=1, is_tunnel_ipv6=is_ipv6) - cls.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_ah_protocol, - cls.tun_if.remote_addr_n[addr_type], - cls.tun_if.local_addr_n[addr_type], - is_tunnel=1, is_tunnel_ipv6=is_ipv6) + self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_ah_protocol, + self.tun_if.local_addr_n[addr_type], + self.tun_if.remote_addr_n[addr_type], + is_tunnel=1, is_tunnel_ipv6=is_ipv6) + self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_ah_protocol, + self.tun_if.remote_addr_n[addr_type], + self.tun_if.local_addr_n[addr_type], + is_tunnel=1, is_tunnel_ipv6=is_ipv6) l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_ipv6=is_ipv6, - protocol=socket.IPPROTO_AH) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_outbound=0, - is_ipv6=is_ipv6, - protocol=socket.IPPROTO_AH) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_ipv6=is_ipv6, + protocol=socket.IPPROTO_AH) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + is_ipv6=is_ipv6, + protocol=socket.IPPROTO_AH) l_startaddr = l_stopaddr = socket.inet_pton(addr_type, remote_tun_if_host) - r_startaddr = r_stopaddr = cls.pg1.remote_addr_n[addr_type] - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=10, policy=3, - is_outbound=0, is_ipv6=is_ipv6) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - r_startaddr, r_stopaddr, l_startaddr, - l_stopaddr, priority=10, policy=3, - is_ipv6=is_ipv6) - r_startaddr = r_stopaddr = cls.pg0.local_addr_n[addr_type] - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=20, policy=3, - is_outbound=0, is_ipv6=is_ipv6) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - r_startaddr, r_stopaddr, l_startaddr, - l_stopaddr, priority=20, policy=3, - is_ipv6=is_ipv6) - - @classmethod - def config_ah_tra(cls, params): + r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_outbound=0, is_ipv6=is_ipv6) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=10, policy=3, + is_ipv6=is_ipv6) + r_startaddr = r_stopaddr = self.pg0.local_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=20, policy=3, + is_outbound=0, is_ipv6=is_ipv6) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=20, policy=3, + is_ipv6=is_ipv6) + + def unconfig_ah_tun(self, params): + addr_type = params.addr_type + is_ipv6 = params.is_ipv6 + scapy_tun_sa_id = params.scapy_tun_sa_id + scapy_tun_spi = params.scapy_tun_spi + vpp_tun_sa_id = params.vpp_tun_sa_id + vpp_tun_spi = params.vpp_tun_spi + auth_algo_vpp_id = params.auth_algo_vpp_id + auth_key = params.auth_key + crypt_algo_vpp_id = params.crypt_algo_vpp_id + crypt_key = params.crypt_key + remote_tun_if_host = params.remote_tun_if_host + addr_any = params.addr_any + addr_bcast = params.addr_bcast + l_startaddr = l_stopaddr = socket.inet_pton(addr_type, + remote_tun_if_host) + r_startaddr = r_stopaddr = self.pg0.local_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=20, policy=3, + is_outbound=0, is_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=20, policy=3, + is_ipv6=is_ipv6, + is_add=0) + r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_outbound=0, is_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=10, policy=3, + is_ipv6=is_ipv6, is_add=0) + l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) + l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_ipv6=is_ipv6, + protocol=socket.IPPROTO_AH, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + is_ipv6=is_ipv6, + protocol=socket.IPPROTO_AH, + is_add=0) + self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_ah_protocol, + self.tun_if.local_addr_n[addr_type], + self.tun_if.remote_addr_n[addr_type], + is_tunnel=1, is_tunnel_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_ah_protocol, + self.tun_if.remote_addr_n[addr_type], + self.tun_if.local_addr_n[addr_type], + is_tunnel=1, is_tunnel_ipv6=is_ipv6, + is_add=0) + + def config_ah_tra(self, params): addr_type = params.addr_type is_ipv6 = params.is_ipv6 scapy_tra_sa_id = params.scapy_tra_sa_id @@ -131,41 +195,114 @@ class TemplateIpsecAh(TemplateIpsec): crypt_key = params.crypt_key addr_any = params.addr_any addr_bcast = params.addr_bcast - cls.vapi.ipsec_sad_add_del_entry(scapy_tra_sa_id, scapy_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_ah_protocol, is_tunnel=0, - is_tunnel_ipv6=0, - use_anti_replay=1) - cls.vapi.ipsec_sad_add_del_entry(vpp_tra_sa_id, vpp_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_ah_protocol, is_tunnel=0, - is_tunnel_ipv6=0, - use_anti_replay=1) + self.vapi.ipsec_sad_add_del_entry(scapy_tra_sa_id, scapy_tra_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_ah_protocol, is_tunnel=0, + is_tunnel_ipv6=0, + use_anti_replay=1) + self.vapi.ipsec_sad_add_del_entry(vpp_tra_sa_id, vpp_tra_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_ah_protocol, is_tunnel=0, + is_tunnel_ipv6=0, + use_anti_replay=1) l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) - cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, vpp_tra_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_ipv6=is_ipv6, - protocol=socket.IPPROTO_AH) - cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, scapy_tra_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_outbound=0, - is_ipv6=is_ipv6, - protocol=socket.IPPROTO_AH) - l_startaddr = l_stopaddr = cls.tra_if.local_addr_n[addr_type] - r_startaddr = r_stopaddr = cls.tra_if.remote_addr_n[addr_type] - cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, vpp_tra_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=10, policy=3, - is_outbound=0, is_ipv6=is_ipv6) - cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, scapy_tra_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=10, - policy=3, is_ipv6=is_ipv6) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_ipv6=is_ipv6, + protocol=socket.IPPROTO_AH) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, scapy_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + is_ipv6=is_ipv6, + protocol=socket.IPPROTO_AH) + l_startaddr = l_stopaddr = self.tra_if.local_addr_n[addr_type] + r_startaddr = r_stopaddr = self.tra_if.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_outbound=0, is_ipv6=is_ipv6) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, scapy_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, + policy=3, is_ipv6=is_ipv6) + + def unconfig_ah_tra(self, params): + addr_type = params.addr_type + is_ipv6 = params.is_ipv6 + scapy_tra_sa_id = params.scapy_tra_sa_id + scapy_tra_spi = params.scapy_tra_spi + vpp_tra_sa_id = params.vpp_tra_sa_id + vpp_tra_spi = params.vpp_tra_spi + auth_algo_vpp_id = params.auth_algo_vpp_id + auth_key = params.auth_key + crypt_algo_vpp_id = params.crypt_algo_vpp_id + crypt_key = params.crypt_key + addr_any = params.addr_any + addr_bcast = params.addr_bcast + l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) + l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_ipv6=is_ipv6, + protocol=socket.IPPROTO_AH, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, scapy_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + is_ipv6=is_ipv6, + protocol=socket.IPPROTO_AH, + is_add=0) + l_startaddr = l_stopaddr = self.tra_if.local_addr_n[addr_type] + r_startaddr = r_stopaddr = self.tra_if.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_outbound=0, is_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, scapy_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, + policy=3, is_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_sad_add_del_entry(scapy_tra_sa_id, scapy_tra_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_ah_protocol, is_tunnel=0, + is_tunnel_ipv6=0, + use_anti_replay=1, + is_add=0) + self.vapi.ipsec_sad_add_del_entry(vpp_tra_sa_id, vpp_tra_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_ah_protocol, is_tunnel=0, + is_tunnel_ipv6=0, + use_anti_replay=1, + is_add=0) def tearDown(self): + for _, p in self.params.items(): + self.unconfig_ah_tun(p) + for _, p in self.params.items(): + self.unconfig_ah_tra(p) + + self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id, + self.tun_if.sw_if_index, + is_add=0) + self.vapi.ipsec_spd_add_del(self.tun_spd_id, is_add=0) + self.vapi.ipsec_interface_add_del_spd(self.tra_spd_id, + self.tra_if.sw_if_index, + is_add=0) + self.vapi.ipsec_spd_add_del(self.tra_spd_id, + is_add=0) + for _, p in self.params.items(): + src = socket.inet_pton(p.addr_type, p.remote_tun_if_host) + self.vapi.ip_add_del_route( + src, p.addr_len, self.tun_if.remote_addr_n[p.addr_type], + is_ipv6=p.is_ipv6, is_add=0) + super(TemplateIpsecAh, self).tearDown() if not self.vpp_dead: self.vapi.cli("show hardware") diff --git a/test/test_ipsec_api.py b/test/test_ipsec_api.py index fed996e6a59..afad2b1de1f 100644 --- a/test/test_ipsec_api.py +++ b/test/test_ipsec_api.py @@ -1,47 +1,55 @@ import unittest from framework import VppTestCase, VppTestRunner -from template_ipsec import TemplateIpsec +from template_ipsec import TemplateIpsec, IPsecIPv4Params class IpsecApiTestCase(VppTestCase): """ IPSec API tests """ - @classmethod - def setUpClass(cls): - super(IpsecApiTestCase, cls).setUpClass() - cls.create_pg_interfaces([0]) - cls.pg0.config_ip4() - cls.pg0.admin_up() + def setUp(self): + super(IpsecApiTestCase, self).setUp() + self.create_pg_interfaces([0]) + self.pg0.config_ip4() + self.pg0.admin_up() + + self.vpp_esp_protocol = 1 + self.vpp_ah_protocol = 0 + self.ipv4_params = IPsecIPv4Params() + + def tearDown(self): + self.pg0.unconfig_ip4() + self.pg0.admin_down() + super(IpsecApiTestCase, self).tearDown() def test_backend_dump(self): """ backend dump """ d = self.vapi.ipsec_backend_dump() self.assert_equal(len(d), 2, "number of ipsec backends in dump") - self.assert_equal(d[0].protocol, TemplateIpsec.vpp_ah_protocol, + self.assert_equal(d[0].protocol, self.vpp_ah_protocol, "ipsec protocol in dump entry") self.assert_equal(d[0].index, 0, "index in dump entry") self.assert_equal(d[0].active, 1, "active flag in dump entry") - self.assert_equal(d[1].protocol, TemplateIpsec.vpp_esp_protocol, + self.assert_equal(d[1].protocol, self.vpp_esp_protocol, "ipsec protocol in dump entry") self.assert_equal(d[1].index, 0, "index in dump entry") self.assert_equal(d[1].active, 1, "active flag in dump entry") def test_select_valid_backend(self): """ select valid backend """ - self.vapi.ipsec_select_backend(TemplateIpsec.vpp_ah_protocol, 0) - self.vapi.ipsec_select_backend(TemplateIpsec.vpp_esp_protocol, 0) + self.vapi.ipsec_select_backend(self.vpp_ah_protocol, 0) + self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 0) def test_select_invalid_backend(self): """ select invalid backend """ with self.vapi.assert_negative_api_retval(): - self.vapi.ipsec_select_backend(TemplateIpsec.vpp_ah_protocol, 200) + self.vapi.ipsec_select_backend(self.vpp_ah_protocol, 200) with self.vapi.assert_negative_api_retval(): - self.vapi.ipsec_select_backend(TemplateIpsec.vpp_esp_protocol, 200) + self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 200) def test_select_backend_in_use(self): """ attempt to change backend while sad configured """ - params = TemplateIpsec.ipv4_params + params = self.ipv4_params addr_type = params.addr_type is_ipv6 = params.is_ipv6 scapy_tun_sa_id = params.scapy_tun_sa_id @@ -54,24 +62,24 @@ class IpsecApiTestCase(VppTestCase): self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, auth_algo_vpp_id, auth_key, crypt_algo_vpp_id, crypt_key, - TemplateIpsec.vpp_ah_protocol, + self.vpp_ah_protocol, self.pg0.local_addr_n[addr_type], self.pg0.remote_addr_n[addr_type], is_tunnel=1, is_tunnel_ipv6=is_ipv6) with self.vapi.assert_negative_api_retval(): self.vapi.ipsec_select_backend( - protocol=TemplateIpsec.vpp_ah_protocol, index=0) + protocol=self.vpp_ah_protocol, index=0) self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, auth_algo_vpp_id, auth_key, crypt_algo_vpp_id, crypt_key, - TemplateIpsec.vpp_ah_protocol, + self.vpp_ah_protocol, self.pg0.local_addr_n[addr_type], self.pg0.remote_addr_n[addr_type], is_tunnel=1, is_tunnel_ipv6=is_ipv6, is_add=0) self.vapi.ipsec_select_backend( - protocol=TemplateIpsec.vpp_ah_protocol, index=0) + protocol=self.vpp_ah_protocol, index=0) if __name__ == '__main__': diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py index d22f965a31b..369e9ca1a8d 100644 --- a/test/test_ipsec_esp.py +++ b/test/test_ipsec_esp.py @@ -38,35 +38,119 @@ class TemplateIpsecEsp(TemplateIpsec): --- --- --- """ - encryption_type = ESP - - @classmethod - def setUpClass(cls): - super(TemplateIpsecEsp, cls).setUpClass() - cls.tun_if = cls.pg0 - cls.tra_if = cls.pg2 - cls.logger.info(cls.vapi.ppcli("show int addr")) - cls.vapi.ipsec_spd_add_del(cls.tra_spd_id) - cls.vapi.ipsec_interface_add_del_spd(cls.tra_spd_id, - cls.tra_if.sw_if_index) - for _, p in cls.params.items(): - cls.config_esp_tra(p) - cls.configure_sa_tra(p) - cls.logger.info(cls.vapi.ppcli("show ipsec")) - cls.vapi.ipsec_spd_add_del(cls.tun_spd_id) - cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id, - cls.tun_if.sw_if_index) - for _, p in cls.params.items(): - cls.config_esp_tun(p) - cls.logger.info(cls.vapi.ppcli("show ipsec")) - for _, p in cls.params.items(): + def setUp(self): + super(TemplateIpsecEsp, self).setUp() + self.encryption_type = ESP + self.tun_if = self.pg0 + self.tra_if = self.pg2 + self.logger.info(self.vapi.ppcli("show int addr")) + self.vapi.ipsec_spd_add_del(self.tra_spd_id) + self.vapi.ipsec_interface_add_del_spd(self.tra_spd_id, + self.tra_if.sw_if_index) + for _, p in self.params.items(): + self.config_esp_tra(p) + self.configure_sa_tra(p) + self.logger.info(self.vapi.ppcli("show ipsec")) + self.vapi.ipsec_spd_add_del(self.tun_spd_id) + self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id, + self.tun_if.sw_if_index) + for _, p in self.params.items(): + self.config_esp_tun(p) + self.logger.info(self.vapi.ppcli("show ipsec")) + for _, p in self.params.items(): src = socket.inet_pton(p.addr_type, p.remote_tun_if_host) - cls.vapi.ip_add_del_route( - src, p.addr_len, cls.tun_if.remote_addr_n[p.addr_type], + self.vapi.ip_add_del_route( + src, p.addr_len, self.tun_if.remote_addr_n[p.addr_type], is_ipv6=p.is_ipv6) - @classmethod - def config_esp_tun(cls, params): + def tearDown(self): + for _, p in self.params.items(): + self.unconfig_esp_tun(p) + for _, p in self.params.items(): + self.unconfig_esp_tra(p) + + self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id, + self.tun_if.sw_if_index, + is_add=0) + self.vapi.ipsec_spd_add_del(self.tun_spd_id, is_add=0) + self.vapi.ipsec_interface_add_del_spd(self.tra_spd_id, + self.tra_if.sw_if_index, + is_add=0) + self.vapi.ipsec_spd_add_del(self.tra_spd_id, + is_add=0) + for _, p in self.params.items(): + src = socket.inet_pton(p.addr_type, p.remote_tun_if_host) + self.vapi.ip_add_del_route( + src, p.addr_len, self.tun_if.remote_addr_n[p.addr_type], + is_ipv6=p.is_ipv6, is_add=0) + + super(TemplateIpsecEsp, self).tearDown() + if not self.vpp_dead: + self.vapi.cli("show hardware") + + def config_esp_tun(self, params): + addr_type = params.addr_type + is_ipv6 = params.is_ipv6 + scapy_tun_sa_id = params.scapy_tun_sa_id + scapy_tun_spi = params.scapy_tun_spi + vpp_tun_sa_id = params.vpp_tun_sa_id + vpp_tun_spi = params.vpp_tun_spi + auth_algo_vpp_id = params.auth_algo_vpp_id + auth_key = params.auth_key + crypt_algo_vpp_id = params.crypt_algo_vpp_id + crypt_key = params.crypt_key + remote_tun_if_host = params.remote_tun_if_host + addr_any = params.addr_any + addr_bcast = params.addr_bcast + self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, + self.tun_if.local_addr_n[addr_type], + self.tun_if.remote_addr_n[addr_type], + is_tunnel=1, is_tunnel_ipv6=is_ipv6) + self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, + self.tun_if.remote_addr_n[addr_type], + self.tun_if.local_addr_n[addr_type], + is_tunnel=1, is_tunnel_ipv6=is_ipv6) + l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) + l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_ipv6=is_ipv6, + protocol=socket.IPPROTO_ESP) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + protocol=socket.IPPROTO_ESP, + is_ipv6=is_ipv6) + l_startaddr = l_stopaddr = socket.inet_pton(addr_type, + remote_tun_if_host) + r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_ipv6=is_ipv6, is_outbound=0) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=10, policy=3, + is_ipv6=is_ipv6) + l_startaddr = l_stopaddr = socket.inet_pton(addr_type, + remote_tun_if_host) + r_startaddr = r_stopaddr = self.pg0.local_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=20, policy=3, + is_outbound=0, is_ipv6=is_ipv6) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=20, policy=3, + is_ipv6=is_ipv6) + + def unconfig_esp_tun(self, params): addr_type = params.addr_type is_ipv6 = params.is_ipv6 scapy_tun_sa_id = params.scapy_tun_sa_id @@ -80,56 +164,107 @@ class TemplateIpsecEsp(TemplateIpsec): remote_tun_if_host = params.remote_tun_if_host addr_any = params.addr_any addr_bcast = params.addr_bcast - cls.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_esp_protocol, - cls.tun_if.local_addr_n[addr_type], - cls.tun_if.remote_addr_n[addr_type], - is_tunnel=1, is_tunnel_ipv6=is_ipv6) - cls.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_esp_protocol, - cls.tun_if.remote_addr_n[addr_type], - cls.tun_if.local_addr_n[addr_type], - is_tunnel=1, is_tunnel_ipv6=is_ipv6) l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_ipv6=is_ipv6, - protocol=socket.IPPROTO_ESP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_outbound=0, - protocol=socket.IPPROTO_ESP, - is_ipv6=is_ipv6) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_ipv6=is_ipv6, + protocol=socket.IPPROTO_ESP, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + protocol=socket.IPPROTO_ESP, + is_ipv6=is_ipv6, + is_add=0) l_startaddr = l_stopaddr = socket.inet_pton(addr_type, remote_tun_if_host) - r_startaddr = r_stopaddr = cls.pg1.remote_addr_n[addr_type] - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=10, policy=3, - is_ipv6=is_ipv6, is_outbound=0) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - r_startaddr, r_stopaddr, l_startaddr, - l_stopaddr, priority=10, policy=3, - is_ipv6=is_ipv6) + r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_ipv6=is_ipv6, is_outbound=0, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=10, policy=3, + is_ipv6=is_ipv6, is_add=0) l_startaddr = l_stopaddr = socket.inet_pton(addr_type, remote_tun_if_host) - r_startaddr = r_stopaddr = cls.pg0.local_addr_n[addr_type] - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=20, policy=3, - is_outbound=0, is_ipv6=is_ipv6) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - r_startaddr, r_stopaddr, l_startaddr, - l_stopaddr, priority=20, policy=3, - is_ipv6=is_ipv6) - - @classmethod - def config_esp_tra(cls, params): + r_startaddr = r_stopaddr = self.pg0.local_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=20, policy=3, + is_outbound=0, is_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=20, policy=3, + is_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, + self.tun_if.local_addr_n[addr_type], + self.tun_if.remote_addr_n[addr_type], + is_tunnel=1, is_tunnel_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, + self.tun_if.remote_addr_n[addr_type], + self.tun_if.local_addr_n[addr_type], + is_tunnel=1, is_tunnel_ipv6=is_ipv6, + is_add=0) + + def config_esp_tra(self, params): + addr_type = params.addr_type + is_ipv6 = params.is_ipv6 + scapy_tra_sa_id = params.scapy_tra_sa_id + scapy_tra_spi = params.scapy_tra_spi + vpp_tra_sa_id = params.vpp_tra_sa_id + vpp_tra_spi = params.vpp_tra_spi + auth_algo_vpp_id = params.auth_algo_vpp_id + auth_key = params.auth_key + crypt_algo_vpp_id = params.crypt_algo_vpp_id + crypt_key = params.crypt_key + addr_any = params.addr_any + addr_bcast = params.addr_bcast + self.vapi.ipsec_sad_add_del_entry(scapy_tra_sa_id, scapy_tra_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, is_tunnel=0, + use_anti_replay=1) + self.vapi.ipsec_sad_add_del_entry(vpp_tra_sa_id, vpp_tra_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, is_tunnel=0, + use_anti_replay=1) + l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) + l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_ipv6=is_ipv6, + protocol=socket.IPPROTO_ESP) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + is_ipv6=is_ipv6, + protocol=socket.IPPROTO_ESP) + l_startaddr = l_stopaddr = self.tra_if.local_addr_n[addr_type] + r_startaddr = r_stopaddr = self.tra_if.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_outbound=0, is_ipv6=is_ipv6) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, scapy_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_ipv6=is_ipv6) + + def unconfig_esp_tra(self, params): addr_type = params.addr_type is_ipv6 = params.is_ipv6 scapy_tra_sa_id = params.scapy_tra_sa_id @@ -142,37 +277,43 @@ class TemplateIpsecEsp(TemplateIpsec): crypt_key = params.crypt_key addr_any = params.addr_any addr_bcast = params.addr_bcast - cls.vapi.ipsec_sad_add_del_entry(scapy_tra_sa_id, scapy_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_esp_protocol, is_tunnel=0, - use_anti_replay=1) - cls.vapi.ipsec_sad_add_del_entry(vpp_tra_sa_id, vpp_tra_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_esp_protocol, is_tunnel=0, - use_anti_replay=1) l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) - cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, vpp_tra_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_ipv6=is_ipv6, - protocol=socket.IPPROTO_ESP) - cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, vpp_tra_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_outbound=0, - is_ipv6=is_ipv6, - protocol=socket.IPPROTO_ESP) - l_startaddr = l_stopaddr = cls.tra_if.local_addr_n[addr_type] - r_startaddr = r_stopaddr = cls.tra_if.remote_addr_n[addr_type] - cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, vpp_tra_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=10, policy=3, - is_outbound=0, is_ipv6=is_ipv6) - cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, scapy_tra_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=10, policy=3, - is_ipv6=is_ipv6) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_ipv6=is_ipv6, + protocol=socket.IPPROTO_ESP, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + is_ipv6=is_ipv6, + protocol=socket.IPPROTO_ESP, + is_add=0) + l_startaddr = l_stopaddr = self.tra_if.local_addr_n[addr_type] + r_startaddr = r_stopaddr = self.tra_if.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, vpp_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_outbound=0, is_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_spd_add_del_entry(self.tra_spd_id, scapy_tra_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_ipv6=is_ipv6, + is_add=0) + self.vapi.ipsec_sad_add_del_entry(scapy_tra_sa_id, scapy_tra_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, is_tunnel=0, + use_anti_replay=1, + is_add=0) + self.vapi.ipsec_sad_add_del_entry(vpp_tra_sa_id, vpp_tra_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, is_tunnel=0, + use_anti_replay=1, + is_add=0) class TestIpsecEsp1(TemplateIpsecEsp, IpsecTraTests, IpsecTunTests): diff --git a/test/test_ipsec_nat.py b/test/test_ipsec_nat.py index e9efa032a13..05befe481b4 100644 --- a/test/test_ipsec_nat.py +++ b/test/test_ipsec_nat.py @@ -31,20 +31,19 @@ class IPSecNATTestCase(TemplateIpsec): icmp_id_in = 6305 icmp_id_out = 6305 - @classmethod - def setUpClass(cls): - super(IPSecNATTestCase, cls).setUpClass() - cls.tun_if = cls.pg0 - cls.vapi.ipsec_spd_add_del(cls.tun_spd_id) - cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id, - cls.tun_if.sw_if_index) - p = cls.ipv4_params - cls.config_esp_tun(p) - cls.logger.info(cls.vapi.ppcli("show ipsec")) + def setUp(self): + super(IPSecNATTestCase, self).setUp() + self.tun_if = self.pg0 + self.vapi.ipsec_spd_add_del(self.tun_spd_id) + self.vapi.ipsec_interface_add_del_spd(self.tun_spd_id, + self.tun_if.sw_if_index) + p = self.ipv4_params + self.config_esp_tun(p) + self.logger.info(self.vapi.ppcli("show ipsec")) src = socket.inet_pton(p.addr_type, p.remote_tun_if_host) - cls.vapi.ip_add_del_route(src, p.addr_len, - cls.tun_if.remote_addr_n[p.addr_type], - is_ipv6=p.is_ipv6) + self.vapi.ip_add_del_route(src, p.addr_len, + self.tun_if.remote_addr_n[p.addr_type], + is_ipv6=p.is_ipv6) def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip): return [ @@ -131,8 +130,7 @@ class IPSecNATTestCase(TemplateIpsec): ppp("Unexpected or invalid encrypted packet:", packet)) raise - @classmethod - def config_esp_tun(cls, params): + def config_esp_tun(self, params): addr_type = params.addr_type scapy_tun_sa_id = params.scapy_tun_sa_id scapy_tun_spi = params.scapy_tun_spi @@ -144,50 +142,50 @@ class IPSecNATTestCase(TemplateIpsec): crypt_key = params.crypt_key addr_any = params.addr_any addr_bcast = params.addr_bcast - cls.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_esp_protocol, - cls.pg1.remote_addr_n[addr_type], - cls.tun_if.remote_addr_n[addr_type], - udp_encap=1) - cls.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, - auth_algo_vpp_id, auth_key, - crypt_algo_vpp_id, crypt_key, - cls.vpp_esp_protocol, - cls.tun_if.remote_addr_n[addr_type], - cls.pg1.remote_addr_n[addr_type], - udp_encap=1) + self.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, + self.pg1.remote_addr_n[addr_type], + self.tun_if.remote_addr_n[addr_type], + udp_encap=1) + self.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi, + auth_algo_vpp_id, auth_key, + crypt_algo_vpp_id, crypt_key, + self.vpp_esp_protocol, + self.tun_if.remote_addr_n[addr_type], + self.pg1.remote_addr_n[addr_type], + udp_encap=1) l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any) l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, - protocol=socket.IPPROTO_ESP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, is_outbound=0, - protocol=socket.IPPROTO_ESP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, remote_port_start=4500, - remote_port_stop=4500, - protocol=socket.IPPROTO_UDP) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, remote_port_start=4500, - remote_port_stop=4500, - protocol=socket.IPPROTO_UDP, - is_outbound=0) - l_startaddr = l_stopaddr = cls.tun_if.remote_addr_n[addr_type] - r_startaddr = r_stopaddr = cls.pg1.remote_addr_n[addr_type] - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id, - l_startaddr, l_stopaddr, r_startaddr, - r_stopaddr, priority=10, policy=3, - is_outbound=0) - cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id, - r_startaddr, r_stopaddr, l_startaddr, - l_stopaddr, priority=10, policy=3) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, + protocol=socket.IPPROTO_ESP) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, is_outbound=0, + protocol=socket.IPPROTO_ESP) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, remote_port_start=4500, + remote_port_stop=4500, + protocol=socket.IPPROTO_UDP) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, remote_port_start=4500, + remote_port_stop=4500, + protocol=socket.IPPROTO_UDP, + is_outbound=0) + l_startaddr = l_stopaddr = self.tun_if.remote_addr_n[addr_type] + r_startaddr = r_stopaddr = self.pg1.remote_addr_n[addr_type] + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, vpp_tun_sa_id, + l_startaddr, l_stopaddr, r_startaddr, + r_stopaddr, priority=10, policy=3, + is_outbound=0) + self.vapi.ipsec_spd_add_del_entry(self.tun_spd_id, scapy_tun_sa_id, + r_startaddr, r_stopaddr, l_startaddr, + l_stopaddr, priority=10, policy=3) def test_ipsec_nat_tun(self): """ IPSec/NAT tunnel test case """ diff --git a/test/test_ipsec_tun_if_esp.py b/test/test_ipsec_tun_if_esp.py index e10e2a3cfcb..561f1099715 100644 --- a/test/test_ipsec_tun_if_esp.py +++ b/test/test_ipsec_tun_if_esp.py @@ -11,12 +11,11 @@ class TemplateIpsecTunIfEsp(TemplateIpsec): encryption_type = ESP - @classmethod - def setUpClass(cls): - super(TemplateIpsecTunIfEsp, cls).setUpClass() - cls.tun_if = cls.pg0 - def setUp(self): + super(TemplateIpsecTunIfEsp, self).setUp() + + self.tun_if = self.pg0 + p = self.ipv4_params tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi, p.scapy_tun_spi, p.crypt_algo_vpp_id, |