summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_ah.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ipsec_ah.py')
-rw-r--r--test/test_ipsec_ah.py287
1 files changed, 189 insertions, 98 deletions
diff --git a/test/test_ipsec_ah.py b/test/test_ipsec_ah.py
index 729f8716d03..d173b2e3b23 100644
--- a/test/test_ipsec_ah.py
+++ b/test/test_ipsec_ah.py
@@ -1,14 +1,14 @@
import socket
import unittest
-from scapy.layers.ipsec import AH
+from scapy.layers.inet import IP, ICMP
+from scapy.layers.l2 import Ether, Raw
+from scapy.layers.ipsec import SecurityAssociation, AH
-from framework import VppTestRunner
-from template_ipsec import TemplateIpsec, IpsecTraTests, IpsecTunTests
-from template_ipsec import IpsecTcpTests
+from framework import VppTestCase, VppTestRunner
-class TemplateIpsecAh(TemplateIpsec):
+class TestIpsecAh(VppTestCase):
"""
Basic test for IPSEC using AH transport and Tunnel mode
@@ -41,123 +41,214 @@ class TemplateIpsecAh(TemplateIpsec):
Note : IPv6 is not covered
"""
- encryption_type = AH
+ remote_pg0_lb_addr = '1.1.1.1'
+ remote_pg1_lb_addr = '2.2.2.2'
+ payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
@classmethod
def setUpClass(cls):
- super(TemplateIpsecAh, cls).setUpClass()
- cls.tun_if = cls.pg0
- cls.tra_if = cls.pg2
- cls.logger.info(cls.vapi.ppcli("show int addr"))
- cls.config_ah_tra()
- cls.logger.info(cls.vapi.ppcli("show ipsec"))
- cls.config_ah_tun()
- cls.logger.info(cls.vapi.ppcli("show ipsec"))
- src4 = socket.inet_pton(socket.AF_INET, cls.remote_tun_if_host)
- cls.vapi.ip_add_del_route(src4, 32, cls.tun_if.remote_ip4n)
+ super(TestIpsecAh, cls).setUpClass()
+ try:
+ cls.create_pg_interfaces(range(3))
+ cls.interfaces = list(cls.pg_interfaces)
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+ cls.logger.info(cls.vapi.ppcli("show int addr"))
+ cls.config_ah_tra()
+ cls.logger.info(cls.vapi.ppcli("show ipsec"))
+ cls.config_ah_tun()
+ cls.logger.info(cls.vapi.ppcli("show ipsec"))
+ except Exception:
+ super(TestIpsecAh, cls).tearDownClass()
+ raise
@classmethod
def config_ah_tun(cls):
- cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tun_sa_id,
- cls.scapy_tun_spi,
- cls.auth_algo_vpp_id, cls.auth_key,
- cls.crypt_algo_vpp_id,
- cls.crypt_key, cls.vpp_ah_protocol,
- cls.tun_if.local_ip4n,
- cls.tun_if.remote_ip4n)
- cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tun_sa_id,
- cls.vpp_tun_spi,
- cls.auth_algo_vpp_id, cls.auth_key,
- cls.crypt_algo_vpp_id,
- cls.crypt_key, cls.vpp_ah_protocol,
- cls.tun_if.remote_ip4n,
- cls.tun_if.local_ip4n)
- cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
- cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
- cls.tun_if.sw_if_index)
+ spd_id = 1
+ remote_sa_id = 10
+ local_sa_id = 20
+ remote_tun_spi = 1001
+ local_tun_spi = 1000
+ src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
+ cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
+ dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
+ cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
+ cls.vapi.ipsec_sad_add_del_entry(remote_sa_id, remote_tun_spi,
+ cls.pg0.local_ip4n,
+ cls.pg0.remote_ip4n,
+ integrity_key_length=20)
+ cls.vapi.ipsec_sad_add_del_entry(local_sa_id, local_tun_spi,
+ cls.pg0.remote_ip4n,
+ cls.pg0.local_ip4n,
+ integrity_key_length=20)
+ cls.vapi.ipsec_spd_add_del(spd_id)
+ cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET, "0.0.0.0")
l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
"255.255.255.255")
- cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr,
+ cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
+ r_startaddr, r_stopaddr,
protocol=socket.IPPROTO_AH)
- cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, is_outbound=0,
- protocol=socket.IPPROTO_AH)
- l_startaddr = l_stopaddr = socket.inet_pton(socket.AF_INET,
- cls.remote_tun_if_host)
- r_startaddr = r_stopaddr = cls.pg1.remote_ip4n
- cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, priority=10, policy=3,
+ cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
+ r_startaddr, r_stopaddr,
+ protocol=socket.IPPROTO_AH,
is_outbound=0)
- cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
- r_startaddr, r_stopaddr, l_startaddr,
- l_stopaddr, priority=10, policy=3)
- r_startaddr = r_stopaddr = cls.pg0.local_ip4n
- cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.vpp_tun_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, priority=20, policy=3,
- is_outbound=0)
- cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, cls.scapy_tun_sa_id,
- r_startaddr, r_stopaddr, l_startaddr,
- l_stopaddr, priority=20, policy=3)
+ l_startaddr = l_stopaddr = socket.inet_pton(socket.AF_INET,
+ cls.remote_pg0_lb_addr)
+ r_startaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
+ cls.remote_pg1_lb_addr)
+ cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
+ r_startaddr, r_stopaddr,
+ priority=10, policy=3,
+ is_outbound=0, sa_id=local_sa_id)
+ cls.vapi.ipsec_spd_add_del_entry(spd_id, r_startaddr, r_stopaddr,
+ l_startaddr, l_stopaddr, priority=10,
+ policy=3, sa_id=remote_sa_id)
@classmethod
def config_ah_tra(cls):
- cls.vapi.ipsec_sad_add_del_entry(cls.scapy_tra_sa_id,
- cls.scapy_tra_spi,
- cls.auth_algo_vpp_id, cls.auth_key,
- cls.crypt_algo_vpp_id,
- cls.crypt_key, cls.vpp_ah_protocol,
- is_tunnel=0)
- cls.vapi.ipsec_sad_add_del_entry(cls.vpp_tra_sa_id,
- cls.vpp_tra_spi,
- cls.auth_algo_vpp_id, cls.auth_key,
- cls.crypt_algo_vpp_id,
- cls.crypt_key, cls.vpp_ah_protocol,
- is_tunnel=0)
- cls.vapi.ipsec_spd_add_del(cls.tra_spd_id)
- cls.vapi.ipsec_interface_add_del_spd(cls.tra_spd_id,
- cls.tra_if.sw_if_index)
+ spd_id = 2
+ remote_sa_id = 30
+ local_sa_id = 40
+ remote_tra_spi = 2001
+ local_tra_spi = 2000
+ cls.vapi.ipsec_sad_add_del_entry(remote_sa_id, remote_tra_spi,
+ integrity_key_length=20, is_tunnel=0)
+ cls.vapi.ipsec_sad_add_del_entry(local_sa_id, local_tra_spi,
+ integrity_key_length=20, is_tunnel=0)
+ cls.vapi.ipsec_spd_add_del(spd_id)
+ cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET, "0.0.0.0")
l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET,
"255.255.255.255")
- cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr,
+ cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
+ r_startaddr, r_stopaddr,
protocol=socket.IPPROTO_AH)
- cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.scapy_tra_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, is_outbound=0,
- protocol=socket.IPPROTO_AH)
- l_startaddr = l_stopaddr = cls.tra_if.local_ip4n
- r_startaddr = r_stopaddr = cls.tra_if.remote_ip4n
- cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.vpp_tra_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, priority=10, policy=3,
+ cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
+ r_startaddr, r_stopaddr,
+ protocol=socket.IPPROTO_AH,
is_outbound=0)
- cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, cls.scapy_tra_sa_id,
- l_startaddr, l_stopaddr, r_startaddr,
- r_stopaddr, priority=10,
- policy=3)
+ l_startaddr = l_stopaddr = cls.pg2.local_ip4n
+ r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
+ cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
+ r_startaddr, r_stopaddr,
+ priority=10, policy=3,
+ is_outbound=0, sa_id=local_sa_id)
+ cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr,
+ r_startaddr, r_stopaddr, priority=10,
+ policy=3, sa_id=remote_sa_id)
+
+ def configure_scapy_sa_tun(self):
+ remote_tun_sa = SecurityAssociation(AH, spi=0x000003e8,
+ auth_algo='HMAC-SHA1-96',
+ auth_key='C91KUR9GYMm5GfkEvNjX',
+ tunnel_header=IP(
+ src=self.pg0.remote_ip4,
+ dst=self.pg0.local_ip4))
+ local_tun_sa = SecurityAssociation(AH, spi=0x000003e9,
+ auth_algo='HMAC-SHA1-96',
+ auth_key='C91KUR9GYMm5GfkEvNjX',
+ tunnel_header=IP(
+ dst=self.pg0.remote_ip4,
+ src=self.pg0.local_ip4))
+ return local_tun_sa, remote_tun_sa
+
+ def configure_scapy_sa_tra(self):
+ remote_tra_sa = SecurityAssociation(AH, spi=0x000007d0,
+ auth_algo='HMAC-SHA1-96',
+ auth_key='C91KUR9GYMm5GfkEvNjX')
+ local_tra_sa = SecurityAssociation(AH, spi=0x000007d1,
+ auth_algo='HMAC-SHA1-96',
+ auth_key='C91KUR9GYMm5GfkEvNjX')
+ return local_tra_sa, remote_tra_sa
def tearDown(self):
- super(TemplateIpsecAh, self).tearDown()
+ super(TestIpsecAh, self).tearDown()
if not self.vpp_dead:
self.vapi.cli("show hardware")
-
-class TestIpsecAh1(TemplateIpsecAh, IpsecTraTests, IpsecTunTests):
- """ Ipsec AH - TUN & TRA tests """
- pass
-
-
-class TestIpsecAh2(TemplateIpsecAh, IpsecTcpTests):
- """ Ipsec AH - TCP tests """
- pass
+ def send_and_expect(self, input, pkts, output, count=1):
+ input.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ rx = output.get_capture(count)
+ return rx
+
+ def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
+ return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+ sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
+ ] * count
+
+ def gen_pkts(self, sw_intf, src, dst, count=1):
+ return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
+ IP(src=src, dst=dst) / ICMP() / self.payload
+ ] * count
+
+ def test_ipsec_ah_tra_basic(self, count=1):
+ """ ipsec ah v4 transport basic test """
+ try:
+ local_tra_sa, remote_tra_sa = self.configure_scapy_sa_tra()
+ send_pkts = self.gen_encrypt_pkts(remote_tra_sa, self.pg2,
+ src=self.pg2.remote_ip4,
+ dst=self.pg2.local_ip4,
+ count=count)
+ recv_pkts = self.send_and_expect(self.pg2, send_pkts, self.pg2,
+ count=count)
+ # ESP TRA VPP encryption/decryption verification
+ for Pkts in recv_pkts:
+ Pkts[AH].padding = Pkts[AH].icv[12:]
+ Pkts[AH].icv = Pkts[AH].icv[:12]
+ local_tra_sa.decrypt(Pkts[IP])
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_ah_tra_burst(self):
+ """ ipsec ah v4 transport burst test """
+ try:
+ self.test_ipsec_ah_tra_basic(count=257)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_ah_tun_basic(self, count=1):
+ """ ipsec ah 4o4 tunnel basic test """
+ try:
+ local_tun_sa, remote_tun_sa = self.configure_scapy_sa_tun()
+ send_pkts = self.gen_encrypt_pkts(remote_tun_sa, self.pg0,
+ src=self.remote_pg0_lb_addr,
+ dst=self.remote_pg1_lb_addr,
+ count=count)
+ recv_pkts = self.send_and_expect(self.pg0, send_pkts, self.pg1,
+ count=count)
+ # ESP TUN VPP decryption verification
+ for recv_pkt in recv_pkts:
+ self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
+ self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
+ send_pkts = self.gen_pkts(self.pg1, src=self.remote_pg1_lb_addr,
+ dst=self.remote_pg0_lb_addr,
+ count=count)
+ recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.pg0,
+ count=count)
+ # ESP TUN VPP encryption verification
+ for recv_pkt in recv_pkts:
+ decrypt_pkt = local_tun_sa.decrypt(recv_pkt[IP])
+ decrypt_pkt = IP(decrypt_pkt[Raw].load)
+ self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
+ self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
+
+ def test_ipsec_ah_tun_burst(self):
+ """ ipsec ah 4o4 tunnel burst test """
+ try:
+ self.test_ipsec_ah_tun_basic(count=257)
+ finally:
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec"))
if __name__ == '__main__':