aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--test/template_ipsec.py112
-rw-r--r--test/test_ipsec_ah.py5
-rw-r--r--test/test_ipsec_esp.py19
-rw-r--r--test/test_ipsec_tun_if_esp.py127
-rw-r--r--test/vpp_interface.py8
5 files changed, 215 insertions, 56 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 483699c2eac..1b9a3796c15 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -79,6 +79,45 @@ class IPsecIPv6Params(object):
self.nat_header = None
+def config_tun_params(p, encryption_type, tun_if):
+ ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
+ p.scapy_tun_sa = SecurityAssociation(
+ encryption_type, spi=p.vpp_tun_spi,
+ crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo, auth_key=p.auth_key,
+ tunnel_header=ip_class_by_addr_type[p.addr_type](
+ src=tun_if.remote_addr[p.addr_type],
+ dst=tun_if.local_addr[p.addr_type]),
+ nat_t_header=p.nat_header)
+ p.vpp_tun_sa = SecurityAssociation(
+ encryption_type, spi=p.scapy_tun_spi,
+ crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo, auth_key=p.auth_key,
+ tunnel_header=ip_class_by_addr_type[p.addr_type](
+ dst=tun_if.remote_addr[p.addr_type],
+ src=tun_if.local_addr[p.addr_type]),
+ nat_t_header=p.nat_header)
+
+
+def config_tra_params(p, encryption_type):
+ p.scapy_tra_sa = SecurityAssociation(
+ encryption_type,
+ spi=p.vpp_tra_spi,
+ crypt_algo=p.crypt_algo,
+ crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
+ nat_t_header=p.nat_header)
+ p.vpp_tra_sa = SecurityAssociation(
+ encryption_type,
+ spi=p.scapy_tra_spi,
+ crypt_algo=p.crypt_algo,
+ crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
+ nat_t_header=p.nat_header)
+
+
class TemplateIpsec(VppTestCase):
"""
TRANSPORT MODE:
@@ -164,26 +203,6 @@ class TemplateIpsec(VppTestCase):
ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
for i in range(count)]
- def configure_sa_tun(self, params):
- ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
- scapy_tun_sa = SecurityAssociation(
- self.encryption_type, spi=params.vpp_tun_spi,
- crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
- auth_algo=params.auth_algo, auth_key=params.auth_key,
- tunnel_header=ip_class_by_addr_type[params.addr_type](
- src=self.tun_if.remote_addr[params.addr_type],
- dst=self.tun_if.local_addr[params.addr_type]),
- nat_t_header=params.nat_header)
- vpp_tun_sa = SecurityAssociation(
- self.encryption_type, spi=params.scapy_tun_spi,
- crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
- auth_algo=params.auth_algo, auth_key=params.auth_key,
- tunnel_header=ip_class_by_addr_type[params.addr_type](
- dst=self.tun_if.remote_addr[params.addr_type],
- src=self.tun_if.local_addr[params.addr_type]),
- nat_t_header=params.nat_header)
- return vpp_tun_sa, scapy_tun_sa
-
def configure_sa_tra(self, params):
params.scapy_tra_sa = SecurityAssociation(
self.encryption_type,
@@ -208,15 +227,15 @@ class IpsecTcpTests(object):
""" verify checksum correctness for vpp generated packets """
self.vapi.cli("test http server")
p = self.params[socket.AF_INET]
- vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
+ config_tun_params(p, self.encryption_type, self.tun_if)
send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
- scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
- dst=self.tun_if.local_ip4) /
- TCP(flags='S', dport=80)))
+ p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
+ dst=self.tun_if.local_ip4) /
+ TCP(flags='S', dport=80)))
self.logger.debug(ppp("Sending packet:", send))
recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
recv = recv[0]
- decrypted = vpp_tun_sa.decrypt(recv[IP])
+ decrypted = p.vpp_tun_sa.decrypt(recv[IP])
self.assert_packet_checksums_valid(decrypted)
@@ -374,14 +393,13 @@ class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
pass
-class IpsecTun4Tests(object):
- def test_tun_basic44(self, count=1):
- """ ipsec 4o4 tunnel basic test """
+class IpsecTun4(object):
+
+ def verify_tun_44(self, p, count=1):
self.vapi.cli("clear errors")
try:
- p = self.params[socket.AF_INET]
- vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
- send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
+ config_tun_params(p, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip4,
count=count)
@@ -395,7 +413,7 @@ class IpsecTun4Tests(object):
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts:
try:
- decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
+ decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
if not decrypt_pkt.haslayer(IP):
decrypt_pkt = IP(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
@@ -432,19 +450,26 @@ class IpsecTun4Tests(object):
self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
+
+class IpsecTun4Tests(IpsecTun4):
+
+ def test_tun_basic44(self):
+ """ ipsec 4o4 tunnel basic test """
+ self.verify_tun_44(self.params[socket.AF_INET], count=1)
+
def test_tun_burst44(self):
""" ipsec 4o4 tunnel burst test """
- self.test_tun_basic44(count=257)
+ self.verify_tun_44(self.params[socket.AF_INET], count=257)
-class IpsecTun6Tests(object):
- def test_tun_basic66(self, count=1):
+class IpsecTun6(object):
+
+ def verify_tun_66(self, p, count=1):
""" ipsec 6o6 tunnel basic test """
self.vapi.cli("clear errors")
try:
- p = self.params[socket.AF_INET6]
- vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
- send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
+ config_tun_params(p, self.encryption_type, self.tun_if)
+ send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
src=p.remote_tun_if_host,
dst=self.pg1.remote_ip6,
count=count)
@@ -459,7 +484,7 @@ class IpsecTun6Tests(object):
recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
for recv_pkt in recv_pkts:
try:
- decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
+ decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
if not decrypt_pkt.haslayer(IPv6):
decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
@@ -489,9 +514,16 @@ class IpsecTun6Tests(object):
self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
+
+class IpsecTun6Tests(IpsecTun6):
+
+ def test_tun_basic66(self):
+ """ ipsec 6o6 tunnel basic test """
+ self.verify_tun_66(self.params[socket.AF_INET6], count=1)
+
def test_tun_burst66(self):
""" ipsec 6o6 tunnel burst test """
- self.test_tun_basic66(count=257)
+ self.verify_tun_66(self.params[socket.AF_INET6], count=257)
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
diff --git a/test/test_ipsec_ah.py b/test/test_ipsec_ah.py
index 7498f51275d..21080cad3d6 100644
--- a/test/test_ipsec_ah.py
+++ b/test/test_ipsec_ah.py
@@ -4,7 +4,8 @@ import unittest
from scapy.layers.ipsec import AH
from framework import VppTestRunner
-from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests
+from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
+ config_tun_params, config_tra_params
from template_ipsec import IpsecTcpTests
from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
VppIpsecSpdItfBinding
@@ -53,7 +54,7 @@ class TemplateIpsecAh(TemplateIpsec):
for _, p in self.params.items():
self.config_ah_tra(p)
- self.configure_sa_tra(p)
+ config_tra_params(p, self.encryption_type)
self.logger.info(self.vapi.ppcli("show ipsec"))
for _, p in self.params.items():
self.config_ah_tun(p)
diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py
index 09b72401541..468636018b5 100644
--- a/test/test_ipsec_esp.py
+++ b/test/test_ipsec_esp.py
@@ -5,7 +5,7 @@ from scapy.layers.inet import UDP
from framework import VppTestRunner
from template_ipsec import IpsecTra46Tests, IpsecTun46Tests, TemplateIpsec, \
- IpsecTcpTests, IpsecTun4Tests, IpsecTra4Tests
+ IpsecTcpTests, IpsecTun4Tests, IpsecTra4Tests, config_tra_params
from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSA,\
VppIpsecSpdItfBinding
from vpp_ip_route import VppIpRoute, VppRoutePath
@@ -177,8 +177,6 @@ class TemplateIpsecEsp(TemplateIpsec):
|pg0| -------> |VPP| ------> |pg1|
--- --- ---
"""
- config_esp_tun = config_esp_tun
- config_esp_tra = config_esp_tra
def setUp(self):
super(TemplateIpsecEsp, self).setUp()
@@ -193,8 +191,8 @@ class TemplateIpsecEsp(TemplateIpsec):
self.tra_if).add_vpp_config()
for _, p in self.params.items():
- self.config_esp_tra(p)
- self.configure_sa_tra(p)
+ config_esp_tra(self, p)
+ config_tra_params(p, self.encryption_type)
self.logger.info(self.vapi.ppcli("show ipsec"))
self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
@@ -203,7 +201,7 @@ class TemplateIpsecEsp(TemplateIpsec):
self.tun_if).add_vpp_config()
for _, p in self.params.items():
- self.config_esp_tun(p)
+ config_esp_tun(self, p)
self.logger.info(self.vapi.ppcli("show ipsec"))
for _, p in self.params.items():
@@ -241,9 +239,6 @@ class TemplateIpsecEspUdp(TemplateIpsec):
"""
UDP encapped ESP
"""
- config_esp_tun = config_esp_tun
- config_esp_tra = config_esp_tra
-
def setUp(self):
super(TemplateIpsecEspUdp, self).setUp()
self.encryption_type = ESP
@@ -261,15 +256,15 @@ class TemplateIpsecEspUdp(TemplateIpsec):
VppIpsecSpdItfBinding(self, self.tra_spd,
self.tra_if).add_vpp_config()
- self.config_esp_tra(p)
- self.configure_sa_tra(p)
+ config_esp_tra(self, p)
+ config_tra_params(p, self.encryption_type)
self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
self.tun_spd.add_vpp_config()
VppIpsecSpdItfBinding(self, self.tun_spd,
self.tun_if).add_vpp_config()
- self.config_esp_tun(p)
+ config_esp_tun(self, p)
self.logger.info(self.vapi.ppcli("show ipsec"))
d = DpoProto.DPO_PROTO_IP4
diff --git a/test/test_ipsec_tun_if_esp.py b/test/test_ipsec_tun_if_esp.py
index 06d2c89ebe9..ac96cfb9cc4 100644
--- a/test/test_ipsec_tun_if_esp.py
+++ b/test/test_ipsec_tun_if_esp.py
@@ -1,9 +1,10 @@
import unittest
import socket
+import copy
from scapy.layers.ipsec import ESP
from framework import VppTestRunner
from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
- IpsecTcpTests
+ IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
from vpp_ipsec_tun_interface import VppIpsecTunInterface
from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
@@ -69,7 +70,7 @@ class TemplateIpsec6TunIfEsp(TemplateIpsec):
tun_if.admin_up()
tun_if.config_ip6()
- VppIpRoute(self, p.remote_tun_if_host, 32,
+ VppIpRoute(self, p.remote_tun_if_host, 128,
[VppRoutePath(tun_if.remote_ip6,
0xffffffff,
proto=DpoProto.DPO_PROTO_IP6)],
@@ -87,5 +88,127 @@ class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
tun6_decrypt_node_name = "esp6-decrypt"
+class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
+ """ IPsec IPv4 Multi Tunnel interface """
+
+ encryption_type = ESP
+ tun4_encrypt_node_name = "esp4-encrypt"
+ tun4_decrypt_node_name = "esp4-decrypt"
+
+ def setUp(self):
+ super(TestIpsec4MultiTunIfEsp, self).setUp()
+
+ self.tun_if = self.pg0
+
+ self.multi_params = []
+
+ for ii in range(10):
+ p = copy.copy(self.ipv4_params)
+
+ p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
+ p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
+ p.scapy_tun_spi = p.scapy_tun_spi + ii
+ p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
+ p.vpp_tun_spi = p.vpp_tun_spi + ii
+
+ p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
+ p.scapy_tra_spi = p.scapy_tra_spi + ii
+ p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
+ p.vpp_tra_spi = p.vpp_tra_spi + ii
+
+ config_tun_params(p, self.encryption_type, self.tun_if)
+ self.multi_params.append(p)
+
+ p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
+ p.scapy_tun_spi,
+ p.crypt_algo_vpp_id,
+ p.crypt_key, p.crypt_key,
+ p.auth_algo_vpp_id, p.auth_key,
+ p.auth_key)
+ p.tun_if.add_vpp_config()
+ p.tun_if.admin_up()
+ p.tun_if.config_ip4()
+
+ VppIpRoute(self, p.remote_tun_if_host, 32,
+ [VppRoutePath(p.tun_if.remote_ip4,
+ 0xffffffff)]).add_vpp_config()
+
+ def tearDown(self):
+ if not self.vpp_dead:
+ self.vapi.cli("show hardware")
+ super(TestIpsec4MultiTunIfEsp, self).tearDown()
+
+ def test_tun_44(self):
+ """Multiple IPSEC tunnel interfaces """
+ for p in self.multi_params:
+ self.verify_tun_44(p, count=127)
+ c = p.tun_if.get_rx_stats()
+ self.assertEqual(c['packets'], 127)
+ c = p.tun_if.get_tx_stats()
+ self.assertEqual(c['packets'], 127)
+
+
+class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
+ """ IPsec IPv6 Muitli Tunnel interface """
+
+ encryption_type = ESP
+ tun6_encrypt_node_name = "esp6-encrypt"
+ tun6_decrypt_node_name = "esp6-decrypt"
+
+ def setUp(self):
+ super(TestIpsec6MultiTunIfEsp, self).setUp()
+
+ self.tun_if = self.pg0
+
+ self.multi_params = []
+
+ for ii in range(10):
+ p = copy.copy(self.ipv6_params)
+
+ p.remote_tun_if_host = "1111::%d" % (ii + 1)
+ p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
+ p.scapy_tun_spi = p.scapy_tun_spi + ii
+ p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
+ p.vpp_tun_spi = p.vpp_tun_spi + ii
+
+ p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
+ p.scapy_tra_spi = p.scapy_tra_spi + ii
+ p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
+ p.vpp_tra_spi = p.vpp_tra_spi + ii
+
+ config_tun_params(p, self.encryption_type, self.tun_if)
+ self.multi_params.append(p)
+
+ p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
+ p.scapy_tun_spi,
+ p.crypt_algo_vpp_id,
+ p.crypt_key, p.crypt_key,
+ p.auth_algo_vpp_id, p.auth_key,
+ p.auth_key, is_ip6=True)
+ p.tun_if.add_vpp_config()
+ p.tun_if.admin_up()
+ p.tun_if.config_ip6()
+
+ VppIpRoute(self, p.remote_tun_if_host, 128,
+ [VppRoutePath(p.tun_if.remote_ip6,
+ 0xffffffff,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1).add_vpp_config()
+
+ def tearDown(self):
+ if not self.vpp_dead:
+ self.vapi.cli("show hardware")
+ super(TestIpsec6MultiTunIfEsp, self).tearDown()
+
+ def test_tun_66(self):
+ """Multiple IPSEC tunnel interfaces """
+ for p in self.multi_params:
+ self.verify_tun_66(p, count=127)
+ c = p.tun_if.get_rx_stats()
+ self.assertEqual(c['packets'], 127)
+ c = p.tun_if.get_tx_stats()
+ self.assertEqual(c['packets'], 127)
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
diff --git a/test/vpp_interface.py b/test/vpp_interface.py
index d586c849b02..7be0d45e39b 100644
--- a/test/vpp_interface.py
+++ b/test/vpp_interface.py
@@ -463,3 +463,11 @@ class VppInterface(object):
def __str__(self):
return self.name
+
+ def get_rx_stats(self):
+ c = self.test.statistics.get_counter("^/if/rx$")
+ return c[0][self.sw_if_index]
+
+ def get_tx_stats(self):
+ c = self.test.statistics.get_counter("^/if/tx$")
+ return c[0][self.sw_if_index]