diff options
Diffstat (limited to 'test/test_ipsec_api.py')
-rw-r--r-- | test/test_ipsec_api.py | 139 |
1 files changed, 89 insertions, 50 deletions
diff --git a/test/test_ipsec_api.py b/test/test_ipsec_api.py index b5b4adac66b..7208d2887b5 100644 --- a/test/test_ipsec_api.py +++ b/test/test_ipsec_api.py @@ -1,12 +1,17 @@ import unittest -from framework import VppTestCase, VppTestRunner -from template_ipsec import TemplateIpsec, IPsecIPv4Params +from framework import VppTestCase +from asfframework import VppTestRunner +from template_ipsec import IPsecIPv4Params from vpp_papi import VppEnum +from vpp_ipsec import VppIpsecSA + class IpsecApiTestCase(VppTestCase): - """ IPSec API tests """ + """IPSec API tests""" + + vpp_worker_count = 2 @classmethod def setUpClass(cls): @@ -22,10 +27,8 @@ class IpsecApiTestCase(VppTestCase): self.pg0.config_ip4() self.pg0.admin_up() - self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t. - IPSEC_API_PROTO_ESP) - self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t. - IPSEC_API_PROTO_AH) + self.vpp_esp_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP + self.vpp_ah_protocol = VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_AH self.ipv4_params = IPsecIPv4Params() def tearDown(self): @@ -34,32 +37,34 @@ class IpsecApiTestCase(VppTestCase): super(IpsecApiTestCase, self).tearDown() def test_backend_dump(self): - """ backend dump """ + """backend dump""" d = self.vapi.ipsec_backend_dump() self.assert_equal(len(d), 2, "number of ipsec backends in dump") - self.assert_equal(d[0].protocol, self.vpp_ah_protocol, - "ipsec protocol in dump entry") + self.assert_equal( + d[0].protocol, self.vpp_ah_protocol, "ipsec protocol in dump entry" + ) self.assert_equal(d[0].index, 0, "index in dump entry") self.assert_equal(d[0].active, 1, "active flag in dump entry") - self.assert_equal(d[1].protocol, self.vpp_esp_protocol, - "ipsec protocol in dump entry") + self.assert_equal( + d[1].protocol, self.vpp_esp_protocol, "ipsec protocol in dump entry" + ) self.assert_equal(d[1].index, 0, "index in dump entry") self.assert_equal(d[1].active, 1, "active flag in dump entry") def test_select_valid_backend(self): - """ select valid backend """ + """select valid backend""" self.vapi.ipsec_select_backend(self.vpp_ah_protocol, 0) self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 0) def test_select_invalid_backend(self): - """ select invalid backend """ + """select invalid backend""" with self.vapi.assert_negative_api_retval(): self.vapi.ipsec_select_backend(self.vpp_ah_protocol, 200) with self.vapi.assert_negative_api_retval(): self.vapi.ipsec_select_backend(self.vpp_esp_protocol, 200) def test_select_backend_in_use(self): - """ attempt to change backend while sad configured """ + """attempt to change backend while sad configured""" params = self.ipv4_params addr_type = params.addr_type is_ipv6 = params.is_ipv6 @@ -73,48 +78,82 @@ class IpsecApiTestCase(VppTestCase): self.vapi.ipsec_sad_entry_add_del( is_add=1, entry={ - 'sad_id': scapy_tun_sa_id, - 'spi': scapy_tun_spi, - 'integrity_algorithm': auth_algo_vpp_id, - 'integrity_key': { - 'data': auth_key, - 'length': len(auth_key), + "sad_id": scapy_tun_sa_id, + "spi": scapy_tun_spi, + "integrity_algorithm": auth_algo_vpp_id, + "integrity_key": { + "data": auth_key, + "length": len(auth_key), }, - 'crypto_algorithm': crypt_algo_vpp_id, - 'crypto_key': { - 'data': crypt_key, - 'length': len(crypt_key), + "crypto_algorithm": crypt_algo_vpp_id, + "crypto_key": { + "data": crypt_key, + "length": len(crypt_key), }, - 'protocol': self.vpp_ah_protocol, - 'tunnel_src': self.pg0.local_addr[addr_type], - 'tunnel_dst': self.pg0.remote_addr[addr_type] - }) + "protocol": self.vpp_ah_protocol, + "tunnel_src": self.pg0.local_addr[addr_type], + "tunnel_dst": self.pg0.remote_addr[addr_type], + }, + ) with self.vapi.assert_negative_api_retval(): - self.vapi.ipsec_select_backend( - protocol=self.vpp_ah_protocol, index=0) + self.vapi.ipsec_select_backend(protocol=self.vpp_ah_protocol, index=0) self.vapi.ipsec_sad_entry_add_del( is_add=0, entry={ - 'sad_id': scapy_tun_sa_id, - 'spi': scapy_tun_spi, - 'integrity_algorithm': auth_algo_vpp_id, - 'integrity_key': { - 'data': auth_key, - 'length': len(auth_key), + "sad_id": scapy_tun_sa_id, + "spi": scapy_tun_spi, + "integrity_algorithm": auth_algo_vpp_id, + "integrity_key": { + "data": auth_key, + "length": len(auth_key), }, - 'crypto_algorithm': crypt_algo_vpp_id, - 'crypto_key': { - 'data': crypt_key, - 'length': len(crypt_key), + "crypto_algorithm": crypt_algo_vpp_id, + "crypto_key": { + "data": crypt_key, + "length": len(crypt_key), }, - 'protocol': self.vpp_ah_protocol, - 'tunnel_src': self.pg0.local_addr[addr_type], - 'tunnel_dst': self.pg0.remote_addr[addr_type] - }) - self.vapi.ipsec_select_backend( - protocol=self.vpp_ah_protocol, index=0) - - -if __name__ == '__main__': + "protocol": self.vpp_ah_protocol, + "tunnel_src": self.pg0.local_addr[addr_type], + "tunnel_dst": self.pg0.remote_addr[addr_type], + }, + ) + self.vapi.ipsec_select_backend(protocol=self.vpp_ah_protocol, index=0) + + def __check_sa_binding(self, sa_id, thread_index): + found_sa = False + sa_dumps = self.vapi.ipsec_sa_v5_dump() + for dump in sa_dumps: + if dump.entry.sad_id == sa_id: + self.assertEqual(dump.thread_index, thread_index) + found_sa = True + break + + if not found_sa: + self.fail("SA not found in VPP") + + def test_sa_worker_bind(self): + """Bind an SA to a worker""" + sa = VppIpsecSA( + self, + self.ipv4_params.scapy_tun_sa_id, + self.ipv4_params.scapy_tun_spi, + self.ipv4_params.auth_algo_vpp_id, + self.ipv4_params.auth_key, + self.ipv4_params.crypt_algo_vpp_id, + self.ipv4_params.crypt_key, + VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP, + ) + sa.add_vpp_config() + + self.__check_sa_binding(sa.id, 0xFFFF) + + self.vapi.ipsec_sad_bind(sa_id=sa.id, worker=1) + + self.__check_sa_binding(sa.id, 2) + + sa.remove_vpp_config() + + +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |