summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_esp.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ipsec_esp.py')
-rw-r--r--test/test_ipsec_esp.py404
1 files changed, 113 insertions, 291 deletions
diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py
index 15ce4a96878..58d159a7213 100644
--- a/test/test_ipsec_esp.py
+++ b/test/test_ipsec_esp.py
@@ -1,14 +1,13 @@
import socket
import unittest
+from scapy.layers.ipsec import ESP
-from scapy.layers.inet import IP, ICMP
-from scapy.layers.l2 import Ether
-from scapy.layers.ipsec import SecurityAssociation, ESP
+from framework import VppTestRunner
+from template_ipsec import IpsecTraTests, IpsecTunTests
+from template_ipsec import TemplateIpsec, IpsecTcpTests
-from framework import VppTestCase, VppTestRunner
-
-class TestIpsecEsp(VppTestCase):
+class TemplateIpsecEsp(TemplateIpsec):
"""
Basic test for ipsec esp sanity - tunnel and transport modes.
@@ -41,298 +40,121 @@ class TestIpsecEsp(VppTestCase):
Note : IPv6 is not covered
"""
- remote_pg0_lb_addr = '1.1.1.1'
- remote_pg1_lb_addr = '2.2.2.2'
+ encryption_type = ESP
@classmethod
def setUpClass(cls):
- super(TestIpsecEsp, cls).setUpClass()
- try:
- cls.create_pg_interfaces(range(3))
- cls.interfaces = list(cls.pg_interfaces)
- for i in cls.interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
- cls.logger.info(cls.vapi.ppcli("show int addr"))
- cls.configEspTra()
- cls.logger.info(cls.vapi.ppcli("show ipsec"))
- cls.configEspTun()
- cls.logger.info(cls.vapi.ppcli("show ipsec"))
- except Exception:
- super(TestIpsecEsp, cls).tearDownClass()
- raise
+ super(TemplateIpsecEsp, cls).setUpClass()
+ cls.tun_if = cls.pg0
+ cls.tra_if = cls.pg2
+ cls.logger.info(cls.vapi.ppcli("show int addr"))
+ cls.config_esp_tra()
+ cls.logger.info(cls.vapi.ppcli("show ipsec"))
+ cls.config_esp_tun()
+ cls.logger.info(cls.vapi.ppcli("show ipsec"))
+ src4 = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
+ cls.vapi.ip_add_del_route(src4, 32, cls.tun_if.remote_ip4n)
@classmethod
- def configEspTun(cls):
- try:
- spd_id = 1
- remote_sa_id = 10
- local_sa_id = 20
- remote_tun_spi = 1001
- local_tun_spi = 1000
- src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
- cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
- dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
- cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
- cls.vapi.ipsec_sad_add_del_entry(
- remote_sa_id,
- remote_tun_spi,
- cls.pg0.local_ip4n,
- cls.pg0.remote_ip4n,
- integrity_key_length=20,
- crypto_key_length=16,
- protocol=1)
- cls.vapi.ipsec_sad_add_del_entry(
- local_sa_id,
- local_tun_spi,
- cls.pg0.remote_ip4n,
- cls.pg0.local_ip4n,
- integrity_key_length=20,
- crypto_key_length=16,
- protocol=1)
- cls.vapi.ipsec_spd_add_del(spd_id)
- cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
- l_startaddr = r_startaddr = socket.inet_pton(
- socket.AF_INET, "0.0.0.0")
- l_stopaddr = r_stopaddr = socket.inet_pton(
- socket.AF_INET, "255.255.255.255")
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP,
- is_outbound=0)
- l_startaddr = l_stopaddr = socket.inet_pton(
- socket.AF_INET, cls.remote_pg0_lb_addr)
- r_startaddr = r_stopaddr = socket.inet_pton(
- socket.AF_INET, cls.remote_pg1_lb_addr)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- priority=10,
- policy=3,
- is_outbound=0,
- sa_id=local_sa_id)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- r_startaddr,
- r_stopaddr,
- l_startaddr,
- l_stopaddr,
- priority=10,
- policy=3,
- sa_id=remote_sa_id)
- except Exception:
- raise
+ def config_esp_tun(cls):
+ cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
+ cls.scapy_tun_spi,
+ cls.auth_algo_vpp_id, cls.auth_key,
+ cls.crypt_algo_vpp_id,
+ cls.crypt_key, cls.vpp_esp_protocol,
+ cls.tun_if.local_ip4n,
+ cls.tun_if.remote_ip4n)
+ cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
+ cls.vpp_tun_spi,
+ cls.auth_algo_vpp_id, cls.auth_key,
+ cls.crypt_algo_vpp_id,
+ cls.crypt_key, cls.vpp_esp_protocol,
+ cls.tun_if.remote_ip4n,
+ cls.tun_if.local_ip4n)
+ cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
+ cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
+ cls.tun_if.sw_if_index)
+ l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
+ "0.0.0.0")
+ l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
+ "255.255.255.255")
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr,
+ protocol=socket.IPPROTO_ESP)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, is_outbound=0,
+ protocol=socket.IPPROTO_ESP)
+ l_startaddr = l_stopaddr = socket.inet_pton(socket.AF_INET,
+ cls.remote_tun_if_host)
+ r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, priority=10, policy=3,
+ is_outbound=0)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+ r_startaddr, r_stopaddr, l_startaddr,
+ l_stopaddr, priority=10, policy=3)
+ l_startaddr = l_stopaddr = socket.inet_pton(socket.AF_INET,
+ cls.remote_tun_if_host)
+ r_startaddr = r_stopaddr = cls.pg0.local_ip4n
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, priority=20, policy=3,
+ is_outbound=0)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+ r_startaddr, r_stopaddr, l_startaddr,
+ l_stopaddr, priority=20, policy=3)
@classmethod
- def configEspTra(cls):
- try:
- spd_id = 2
- remote_sa_id = 30
- local_sa_id = 40
- remote_tra_spi = 2001
- local_tra_spi = 2000
- cls.vapi.ipsec_sad_add_del_entry(
- remote_sa_id,
- remote_tra_spi,
- integrity_key_length=20,
- crypto_key_length=16,
- protocol=1,
- is_tunnel=0)
- cls.vapi.ipsec_sad_add_del_entry(
- local_sa_id,
- local_tra_spi,
- integrity_key_length=20,
- crypto_key_length=16,
- protocol=1,
- is_tunnel=0)
- cls.vapi.ipsec_spd_add_del(spd_id)
- cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
- l_startaddr = r_startaddr = socket.inet_pton(
- socket.AF_INET, "0.0.0.0")
- l_stopaddr = r_stopaddr = socket.inet_pton(
- socket.AF_INET, "255.255.255.255")
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP,
- is_outbound=0)
- l_startaddr = l_stopaddr = cls.pg2.local_ip4n
- r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- priority=10,
- policy=3,
- is_outbound=0,
- sa_id=local_sa_id)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- priority=10,
- policy=3,
- sa_id=remote_sa_id)
- except Exception:
- raise
-
- def configScapySA(self, is_tun=False):
- if is_tun:
- self.remote_tun_sa = SecurityAssociation(
- ESP,
- spi=0x000003e8,
- crypt_algo='AES-CBC',
- crypt_key='JPjyOWBeVEQiMe7h',
- auth_algo='HMAC-SHA1-96',
- auth_key='C91KUR9GYMm5GfkEvNjX',
- tunnel_header=IP(
- src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4))
- self.local_tun_sa = SecurityAssociation(
- ESP,
- spi=0x000003e9,
- crypt_algo='AES-CBC',
- crypt_key='JPjyOWBeVEQiMe7h',
- auth_algo='HMAC-SHA1-96',
- auth_key='C91KUR9GYMm5GfkEvNjX',
- tunnel_header=IP(
- dst=self.pg0.remote_ip4,
- src=self.pg0.local_ip4))
- else:
- self.remote_tra_sa = SecurityAssociation(
- ESP,
- spi=0x000007d0,
- crypt_algo='AES-CBC',
- crypt_key='JPjyOWBeVEQiMe7h',
- auth_algo='HMAC-SHA1-96',
- auth_key='C91KUR9GYMm5GfkEvNjX')
- self.local_tra_sa = SecurityAssociation(
- ESP,
- spi=0x000007d1,
- crypt_algo='AES-CBC',
- crypt_key='JPjyOWBeVEQiMe7h',
- auth_algo='HMAC-SHA1-96',
- auth_key='C91KUR9GYMm5GfkEvNjX')
-
- def tearDown(self):
- super(TestIpsecEsp, self).tearDown()
- if not self.vpp_dead:
- self.vapi.cli("show hardware")
-
- def send_and_expect(self, input, pkts, output, count=1):
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(count)
- return rx
-
- def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=src, dst=dst) / ICMP() /
- "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
- ] * count
-
- def gen_pkts(self, sw_intf, src, dst, count=1):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst) / ICMP() /
- "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
- ] * count
-
- def test_ipsec_esp_tra_basic(self, count=1):
- """ ipsec esp v4 transport basic test """
- try:
- self.configScapySA()
- send_pkts = self.gen_encrypt_pkts(
- self.remote_tra_sa,
- self.pg2,
- src=self.pg2.remote_ip4,
- dst=self.pg2.local_ip4,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg2, send_pkts, self.pg2, count=count)
- # ESP TRA VPP encryption/decryption verification
- for Pkts in recv_pkts:
- self.local_tra_sa.decrypt(Pkts[IP])
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_esp_tra_burst(self):
- """ ipsec esp v4 transport burst test """
- try:
- self.test_ipsec_esp_tra_basic(count=257)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_esp_tun_basic(self, count=1):
- """ ipsec esp 4o4 tunnel basic test """
- try:
- self.configScapySA(is_tun=True)
- send_pkts = self.gen_encrypt_pkts(
- self.remote_tun_sa,
- self.pg0,
- src=self.remote_pg0_lb_addr,
- dst=self.remote_pg1_lb_addr,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg0, send_pkts, self.pg1, count=count)
- # ESP TUN VPP decryption verification
- for recv_pkt in recv_pkts:
- self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
- self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
- send_pkts = self.gen_pkts(
- self.pg1,
- src=self.remote_pg1_lb_addr,
- dst=self.remote_pg0_lb_addr,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg1, send_pkts, self.pg0, count=count)
- # ESP TUN VPP encryption verification
- for recv_pkt in recv_pkts:
- decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
- self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
- self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_esp_tun_burst(self):
- """ ipsec esp 4o4 tunnel burst test """
- try:
- self.test_ipsec_esp_tun_basic(count=257)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ def config_esp_tra(cls):
+ cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tra_sa_id,
+ cls.scapy_tra_spi,
+ cls.auth_algo_vpp_id, cls.auth_key,
+ cls.crypt_algo_vpp_id,
+ cls.crypt_key, cls.vpp_esp_protocol,
+ is_tunnel=0)
+ cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tra_sa_id,
+ cls.vpp_tra_spi,
+ cls.auth_algo_vpp_id, cls.auth_key,
+ cls.crypt_algo_vpp_id,
+ cls.crypt_key, cls.vpp_esp_protocol,
+ is_tunnel=0)
+ cls.vapi.ipsec_spd_add_del(cls.tra_spd_id)
+ cls.vapi.ipsec_interface_add_del_spd(cls.tra_spd_id,
+ cls.tra_if.sw_if_index)
+ l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
+ "0.0.0.0")
+ l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
+ "255.255.255.255")
+ cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr,
+ protocol=socket.IPPROTO_ESP)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, is_outbound=0,
+ protocol=socket.IPPROTO_ESP)
+ l_startaddr = l_stopaddr = cls.tra_if.local_ip4n
+ r_startaddr = r_stopaddr = cls.tra_if.remote_ip4n
+ cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, priority=10, policy=3,
+ is_outbound=0)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.scapy_tra_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, priority=10, policy=3)
+
+
+class TestIpsecEsp1(TemplateIpsecEsp, IpsecTraTests, IpsecTunTests):
+ """ Ipsec ESP - TUN & TRA tests """
+ pass
+
+
+class TestIpsecEsp2(TemplateIpsecEsp, IpsecTcpTests):
+ """ Ipsec ESP - TCP tests """
+ pass
if __name__ == '__main__':
' href='#n921'>921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952