summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_esp.py
diff options
context:
space:
mode:
authorKlement Sekera <ksekera@cisco.com>2018-05-16 10:52:45 +0200
committerFlorin Coras <florin.coras@gmail.com>2018-06-21 14:50:10 +0000
commita98346f664aae148d26a8e158008b773d73db96f (patch)
tree2a850b1925f3dd70817fb0e41324baef71fd7a05 /test/test_ipsec_esp.py
parent56ba844d6a8b8f58b18fe51bf22707b0c37d3a87 (diff)
ipsec: VPP-1316 calculate IP/TCP/UDP inner checksums
Calculate IP/TCP/UDP checksums in software before adding authentication. Change-Id: I3e121cb00aeba667764f39ade8d62170f18f8b6b Signed-off-by: Klement Sekera <ksekera@cisco.com>
Diffstat (limited to 'test/test_ipsec_esp.py')
-rw-r--r--test/test_ipsec_esp.py404
1 files changed, 113 insertions, 291 deletions
diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py
index 15ce4a96878..58d159a7213 100644
--- a/test/test_ipsec_esp.py
+++ b/test/test_ipsec_esp.py
@@ -1,14 +1,13 @@
import socket
import unittest
+from scapy.layers.ipsec import ESP
-from scapy.layers.inet import IP, ICMP
-from scapy.layers.l2 import Ether
-from scapy.layers.ipsec import SecurityAssociation, ESP
+from framework import VppTestRunner
+from template_ipsec import IpsecTraTests, IpsecTunTests
+from template_ipsec import TemplateIpsec, IpsecTcpTests
-from framework import VppTestCase, VppTestRunner
-
-class TestIpsecEsp(VppTestCase):
+class TemplateIpsecEsp(TemplateIpsec):
"""
Basic test for ipsec esp sanity - tunnel and transport modes.
@@ -41,298 +40,121 @@ class TestIpsecEsp(VppTestCase):
Note : IPv6 is not covered
"""
- remote_pg0_lb_addr = '1.1.1.1'
- remote_pg1_lb_addr = '2.2.2.2'
+ encryption_type = ESP
@classmethod
def setUpClass(cls):
- super(TestIpsecEsp, cls).setUpClass()
- try:
- cls.create_pg_interfaces(range(3))
- cls.interfaces = list(cls.pg_interfaces)
- for i in cls.interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
- cls.logger.info(cls.vapi.ppcli("show int addr"))
- cls.configEspTra()
- cls.logger.info(cls.vapi.ppcli("show ipsec"))
- cls.configEspTun()
- cls.logger.info(cls.vapi.ppcli("show ipsec"))
- except Exception:
- super(TestIpsecEsp, cls).tearDownClass()
- raise
+ super(TemplateIpsecEsp, cls).setUpClass()
+ cls.tun_if = cls.pg0
+ cls.tra_if = cls.pg2
+ cls.logger.info(cls.vapi.ppcli("show int addr"))
+ cls.config_esp_tra()
+ cls.logger.info(cls.vapi.ppcli("show ipsec"))
+ cls.config_esp_tun()
+ cls.logger.info(cls.vapi.ppcli("show ipsec"))
+ src4 = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
+ cls.vapi.ip_add_del_route(src4, 32, cls.tun_if.remote_ip4n)
@classmethod
- def configEspTun(cls):
- try:
- spd_id = 1
- remote_sa_id = 10
- local_sa_id = 20
- remote_tun_spi = 1001
- local_tun_spi = 1000
- src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
- cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
- dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
- cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
- cls.vapi.ipsec_sad_add_del_entry(
- remote_sa_id,
- remote_tun_spi,
- cls.pg0.local_ip4n,
- cls.pg0.remote_ip4n,
- integrity_key_length=20,
- crypto_key_length=16,
- protocol=1)
- cls.vapi.ipsec_sad_add_del_entry(
- local_sa_id,
- local_tun_spi,
- cls.pg0.remote_ip4n,
- cls.pg0.local_ip4n,
- integrity_key_length=20,
- crypto_key_length=16,
- protocol=1)
- cls.vapi.ipsec_spd_add_del(spd_id)
- cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
- l_startaddr = r_startaddr = socket.inet_pton(
- socket.AF_INET, "0.0.0.0")
- l_stopaddr = r_stopaddr = socket.inet_pton(
- socket.AF_INET, "255.255.255.255")
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP,
- is_outbound=0)
- l_startaddr = l_stopaddr = socket.inet_pton(
- socket.AF_INET, cls.remote_pg0_lb_addr)
- r_startaddr = r_stopaddr = socket.inet_pton(
- socket.AF_INET, cls.remote_pg1_lb_addr)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- priority=10,
- policy=3,
- is_outbound=0,
- sa_id=local_sa_id)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- r_startaddr,
- r_stopaddr,
- l_startaddr,
- l_stopaddr,
- priority=10,
- policy=3,
- sa_id=remote_sa_id)
- except Exception:
- raise
+ def config_esp_tun(cls):
+ cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
+ cls.scapy_tun_spi,
+ cls.auth_algo_vpp_id, cls.auth_key,
+ cls.crypt_algo_vpp_id,
+ cls.crypt_key, cls.vpp_esp_protocol,
+ cls.tun_if.local_ip4n,
+ cls.tun_if.remote_ip4n)
+ cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
+ cls.vpp_tun_spi,
+ cls.auth_algo_vpp_id, cls.auth_key,
+ cls.crypt_algo_vpp_id,
+ cls.crypt_key, cls.vpp_esp_protocol,
+ cls.tun_if.remote_ip4n,
+ cls.tun_if.local_ip4n)
+ cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
+ cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
+ cls.tun_if.sw_if_index)
+ l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
+ "0.0.0.0")
+ l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
+ "255.255.255.255")
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr,
+ protocol=socket.IPPROTO_ESP)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, is_outbound=0,
+ protocol=socket.IPPROTO_ESP)
+ l_startaddr = l_stopaddr = socket.inet_pton(socket.AF_INET,
+ cls.remote_tun_if_host)
+ r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, priority=10, policy=3,
+ is_outbound=0)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+ r_startaddr, r_stopaddr, l_startaddr,
+ l_stopaddr, priority=10, policy=3)
+ l_startaddr = l_stopaddr = socket.inet_pton(socket.AF_INET,
+ cls.remote_tun_if_host)
+ r_startaddr = r_stopaddr = cls.pg0.local_ip4n
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, priority=20, policy=3,
+ is_outbound=0)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
+ r_startaddr, r_stopaddr, l_startaddr,
+ l_stopaddr, priority=20, policy=3)
@classmethod
- def configEspTra(cls):
- try:
- spd_id = 2
- remote_sa_id = 30
- local_sa_id = 40
- remote_tra_spi = 2001
- local_tra_spi = 2000
- cls.vapi.ipsec_sad_add_del_entry(
- remote_sa_id,
- remote_tra_spi,
- integrity_key_length=20,
- crypto_key_length=16,
- protocol=1,
- is_tunnel=0)
- cls.vapi.ipsec_sad_add_del_entry(
- local_sa_id,
- local_tra_spi,
- integrity_key_length=20,
- crypto_key_length=16,
- protocol=1,
- is_tunnel=0)
- cls.vapi.ipsec_spd_add_del(spd_id)
- cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
- l_startaddr = r_startaddr = socket.inet_pton(
- socket.AF_INET, "0.0.0.0")
- l_stopaddr = r_stopaddr = socket.inet_pton(
- socket.AF_INET, "255.255.255.255")
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- protocol=socket.IPPROTO_ESP,
- is_outbound=0)
- l_startaddr = l_stopaddr = cls.pg2.local_ip4n
- r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- priority=10,
- policy=3,
- is_outbound=0,
- sa_id=local_sa_id)
- cls.vapi.ipsec_spd_add_del_entry(
- spd_id,
- l_startaddr,
- l_stopaddr,
- r_startaddr,
- r_stopaddr,
- priority=10,
- policy=3,
- sa_id=remote_sa_id)
- except Exception:
- raise
-
- def configScapySA(self, is_tun=False):
- if is_tun:
- self.remote_tun_sa = SecurityAssociation(
- ESP,
- spi=0x000003e8,
- crypt_algo='AES-CBC',
- crypt_key='JPjyOWBeVEQiMe7h',
- auth_algo='HMAC-SHA1-96',
- auth_key='C91KUR9GYMm5GfkEvNjX',
- tunnel_header=IP(
- src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4))
- self.local_tun_sa = SecurityAssociation(
- ESP,
- spi=0x000003e9,
- crypt_algo='AES-CBC',
- crypt_key='JPjyOWBeVEQiMe7h',
- auth_algo='HMAC-SHA1-96',
- auth_key='C91KUR9GYMm5GfkEvNjX',
- tunnel_header=IP(
- dst=self.pg0.remote_ip4,
- src=self.pg0.local_ip4))
- else:
- self.remote_tra_sa = SecurityAssociation(
- ESP,
- spi=0x000007d0,
- crypt_algo='AES-CBC',
- crypt_key='JPjyOWBeVEQiMe7h',
- auth_algo='HMAC-SHA1-96',
- auth_key='C91KUR9GYMm5GfkEvNjX')
- self.local_tra_sa = SecurityAssociation(
- ESP,
- spi=0x000007d1,
- crypt_algo='AES-CBC',
- crypt_key='JPjyOWBeVEQiMe7h',
- auth_algo='HMAC-SHA1-96',
- auth_key='C91KUR9GYMm5GfkEvNjX')
-
- def tearDown(self):
- super(TestIpsecEsp, self).tearDown()
- if not self.vpp_dead:
- self.vapi.cli("show hardware")
-
- def send_and_expect(self, input, pkts, output, count=1):
- input.add_stream(pkts)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = output.get_capture(count)
- return rx
-
- def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=src, dst=dst) / ICMP() /
- "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
- ] * count
-
- def gen_pkts(self, sw_intf, src, dst, count=1):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst) / ICMP() /
- "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
- ] * count
-
- def test_ipsec_esp_tra_basic(self, count=1):
- """ ipsec esp v4 transport basic test """
- try:
- self.configScapySA()
- send_pkts = self.gen_encrypt_pkts(
- self.remote_tra_sa,
- self.pg2,
- src=self.pg2.remote_ip4,
- dst=self.pg2.local_ip4,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg2, send_pkts, self.pg2, count=count)
- # ESP TRA VPP encryption/decryption verification
- for Pkts in recv_pkts:
- self.local_tra_sa.decrypt(Pkts[IP])
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_esp_tra_burst(self):
- """ ipsec esp v4 transport burst test """
- try:
- self.test_ipsec_esp_tra_basic(count=257)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_esp_tun_basic(self, count=1):
- """ ipsec esp 4o4 tunnel basic test """
- try:
- self.configScapySA(is_tun=True)
- send_pkts = self.gen_encrypt_pkts(
- self.remote_tun_sa,
- self.pg0,
- src=self.remote_pg0_lb_addr,
- dst=self.remote_pg1_lb_addr,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg0, send_pkts, self.pg1, count=count)
- # ESP TUN VPP decryption verification
- for recv_pkt in recv_pkts:
- self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
- self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
- send_pkts = self.gen_pkts(
- self.pg1,
- src=self.remote_pg1_lb_addr,
- dst=self.remote_pg0_lb_addr,
- count=count)
- recv_pkts = self.send_and_expect(
- self.pg1, send_pkts, self.pg0, count=count)
- # ESP TUN VPP encryption verification
- for recv_pkt in recv_pkts:
- decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
- self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
- self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
-
- def test_ipsec_esp_tun_burst(self):
- """ ipsec esp 4o4 tunnel burst test """
- try:
- self.test_ipsec_esp_tun_basic(count=257)
- finally:
- self.logger.info(self.vapi.ppcli("show error"))
- self.logger.info(self.vapi.ppcli("show ipsec"))
+ def config_esp_tra(cls):
+ cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tra_sa_id,
+ cls.scapy_tra_spi,
+ cls.auth_algo_vpp_id, cls.auth_key,
+ cls.crypt_algo_vpp_id,
+ cls.crypt_key, cls.vpp_esp_protocol,
+ is_tunnel=0)
+ cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tra_sa_id,
+ cls.vpp_tra_spi,
+ cls.auth_algo_vpp_id, cls.auth_key,
+ cls.crypt_algo_vpp_id,
+ cls.crypt_key, cls.vpp_esp_protocol,
+ is_tunnel=0)
+ cls.vapi.ipsec_spd_add_del(cls.tra_spd_id)
+ cls.vapi.ipsec_interface_add_del_spd(cls.tra_spd_id,
+ cls.tra_if.sw_if_index)
+ l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET,
+ "0.0.0.0")
+ l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
+ "255.255.255.255")
+ cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr,
+ protocol=socket.IPPROTO_ESP)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, is_outbound=0,
+ protocol=socket.IPPROTO_ESP)
+ l_startaddr = l_stopaddr = cls.tra_if.local_ip4n
+ r_startaddr = r_stopaddr = cls.tra_if.remote_ip4n
+ cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, priority=10, policy=3,
+ is_outbound=0)
+ cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.scapy_tra_sa_id,
+ l_startaddr, l_stopaddr, r_startaddr,
+ r_stopaddr, priority=10, policy=3)
+
+
+class TestIpsecEsp1(TemplateIpsecEsp, IpsecTraTests, IpsecTunTests):
+ """ Ipsec ESP - TUN & TRA tests """
+ pass
+
+
+class TestIpsecEsp2(TemplateIpsecEsp, IpsecTcpTests):
+ """ Ipsec ESP - TCP tests """
+ pass
if __name__ == '__main__':