aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_esp.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ipsec_esp.py')
-rw-r--r--test/test_ipsec_esp.py204
1 files changed, 165 insertions, 39 deletions
diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py
index 50c6f5c8db5..dcbbb826775 100644
--- a/test/test_ipsec_esp.py
+++ b/test/test_ipsec_esp.py
@@ -22,6 +22,7 @@ from vpp_papi import VppEnum
NUM_PKTS = 67
engines_supporting_chain_bufs = ["openssl"]
+engines = ["ia32", "ipsecmb", "openssl"]
class ConfigIpsecESP(TemplateIpsec):
@@ -474,56 +475,112 @@ class TestIpsecEspAsync(TemplateIpsecEsp):
def setUp(self):
super(TestIpsecEspAsync, self).setUp()
- self.vapi.ipsec_set_async_mode(async_enable=True)
- self.p4 = IPsecIPv4Params()
-
- self.p4.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_CBC_256)
- self.p4.crypt_algo = 'AES-CBC' # scapy name
- self.p4.crypt_key = b'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
-
- self.p4.scapy_tun_sa_id += 0xf0000
- self.p4.scapy_tun_spi += 0xf0000
- self.p4.vpp_tun_sa_id += 0xf0000
- self.p4.vpp_tun_spi += 0xf0000
- self.p4.remote_tun_if_host = "2.2.2.2"
+ self.p_sync = IPsecIPv4Params()
+
+ self.p_sync.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
+ IPSEC_API_CRYPTO_ALG_AES_CBC_256)
+ self.p_sync.crypt_algo = 'AES-CBC' # scapy name
+ self.p_sync.crypt_key = b'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
+
+ self.p_sync.scapy_tun_sa_id += 0xf0000
+ self.p_sync.scapy_tun_spi += 0xf0000
+ self.p_sync.vpp_tun_sa_id += 0xf0000
+ self.p_sync.vpp_tun_spi += 0xf0000
+ self.p_sync.remote_tun_if_host = "2.2.2.2"
e = VppEnum.vl_api_ipsec_spd_action_t
- self.p4.sa = VppIpsecSA(
+ self.p_sync.sa = VppIpsecSA(
self,
- self.p4.vpp_tun_sa_id,
- self.p4.vpp_tun_spi,
- self.p4.auth_algo_vpp_id,
- self.p4.auth_key,
- self.p4.crypt_algo_vpp_id,
- self.p4.crypt_key,
+ self.p_sync.vpp_tun_sa_id,
+ self.p_sync.vpp_tun_spi,
+ self.p_sync.auth_algo_vpp_id,
+ self.p_sync.auth_key,
+ self.p_sync.crypt_algo_vpp_id,
+ self.p_sync.crypt_key,
self.vpp_esp_protocol,
- self.tun_if.local_addr[self.p4.addr_type],
- self.tun_if.remote_addr[self.p4.addr_type]).add_vpp_config()
- self.p4.spd = VppIpsecSpdEntry(
+ self.tun_if.local_addr[self.p_sync.addr_type],
+ self.tun_if.remote_addr[self.p_sync.addr_type]).add_vpp_config()
+ self.p_sync.spd = VppIpsecSpdEntry(
self,
self.tun_spd,
- self.p4.vpp_tun_sa_id,
- self.pg1.remote_addr[self.p4.addr_type],
- self.pg1.remote_addr[self.p4.addr_type],
- self.p4.remote_tun_if_host,
- self.p4.remote_tun_if_host,
+ self.p_sync.vpp_tun_sa_id,
+ self.pg1.remote_addr[self.p_sync.addr_type],
+ self.pg1.remote_addr[self.p_sync.addr_type],
+ self.p_sync.remote_tun_if_host,
+ self.p_sync.remote_tun_if_host,
0,
priority=1,
policy=e.IPSEC_API_SPD_ACTION_PROTECT,
is_outbound=1).add_vpp_config()
- VppIpRoute(self, self.p4.remote_tun_if_host, self.p4.addr_len,
- [VppRoutePath(self.tun_if.remote_addr[self.p4.addr_type],
- 0xffffffff)]).add_vpp_config()
- config_tun_params(self.p4, self.encryption_type, self.tun_if)
+ VppIpRoute(self,
+ self.p_sync.remote_tun_if_host,
+ self.p_sync.addr_len,
+ [VppRoutePath(
+ self.tun_if.remote_addr[self.p_sync.addr_type],
+ 0xffffffff)]).add_vpp_config()
+ config_tun_params(self.p_sync, self.encryption_type, self.tun_if)
+
+ self.p_async = IPsecIPv4Params()
+
+ self.p_async.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
+ IPSEC_API_CRYPTO_ALG_AES_GCM_256)
+ self.p_async.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
+ IPSEC_API_INTEG_ALG_NONE)
+ self.p_async.crypt_algo = 'AES-GCM' # scapy name
+ self.p_async.crypt_key = b'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
+ self.p_async.auth_algo = 'NULL'
+
+ self.p_async.scapy_tun_sa_id += 0xe0000
+ self.p_async.scapy_tun_spi += 0xe0000
+ self.p_async.vpp_tun_sa_id += 0xe0000
+ self.p_async.vpp_tun_spi += 0xe0000
+ self.p_async.remote_tun_if_host = "2.2.2.3"
+
+ iflags = VppEnum.vl_api_ipsec_sad_flags_t
+ self.p_async.flags = (iflags.IPSEC_API_SAD_FLAG_USE_ESN |
+ iflags.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY |
+ iflags.IPSEC_API_SAD_FLAG_ASYNC)
+
+ self.p_async.sa = VppIpsecSA(
+ self,
+ self.p_async.vpp_tun_sa_id,
+ self.p_async.vpp_tun_spi,
+ self.p_async.auth_algo_vpp_id,
+ self.p_async.auth_key,
+ self.p_async.crypt_algo_vpp_id,
+ self.p_async.crypt_key,
+ self.vpp_esp_protocol,
+ self.tun_if.local_addr[self.p_async.addr_type],
+ self.tun_if.remote_addr[self.p_async.addr_type],
+ flags=self.p_async.flags).add_vpp_config()
+ self.p_async.spd = VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ self.p_async.vpp_tun_sa_id,
+ self.pg1.remote_addr[self.p_async.addr_type],
+ self.pg1.remote_addr[self.p_async.addr_type],
+ self.p_async.remote_tun_if_host,
+ self.p_async.remote_tun_if_host,
+ 0,
+ priority=2,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ is_outbound=1).add_vpp_config()
+ VppIpRoute(self,
+ self.p_async.remote_tun_if_host,
+ self.p_async.addr_len,
+ [VppRoutePath(
+ self.tun_if.remote_addr[self.p_async.addr_type],
+ 0xffffffff)]).add_vpp_config()
+ config_tun_params(self.p_async, self.encryption_type, self.tun_if)
def test_dual_stream(self):
""" Alternating SAs """
- p = self.params[self.p4.addr_type]
+ p = self.params[self.p_sync.addr_type]
+ self.vapi.ipsec_set_async_mode(async_enable=True)
pkts = [(Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4,
- dst=self.p4.remote_tun_if_host) /
+ dst=self.p_sync.remote_tun_if_host) /
UDP(sport=4444, dport=4444) /
Raw(b'0x0' * 200)),
(Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
@@ -540,14 +597,76 @@ class TestIpsecEspAsync(TemplateIpsecEsp):
for rx in rxs:
if rx[ESP].spi == p.scapy_tun_spi:
decrypted = p.vpp_tun_sa.decrypt(rx[IP])
- elif rx[ESP].spi == self.p4.vpp_tun_spi:
- decrypted = self.p4.scapy_tun_sa.decrypt(rx[IP])
+ elif rx[ESP].spi == self.p_sync.vpp_tun_spi:
+ decrypted = self.p_sync.scapy_tun_sa.decrypt(rx[IP])
+ else:
+ rx.show()
+ self.assertTrue(False)
+
+ self.p_sync.spd.remove_vpp_config()
+ self.p_sync.sa.remove_vpp_config()
+ self.p_async.spd.remove_vpp_config()
+ self.p_async.sa.remove_vpp_config()
+ self.vapi.ipsec_set_async_mode(async_enable=False)
+
+ def test_sync_async_noop_stream(self):
+ """ Alternating SAs sync/async/noop """
+ p = self.params[self.p_sync.addr_type]
+
+ # first pin the default/noop SA to worker 0
+ pkts = [(Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4,
+ dst=p.remote_tun_if_host) /
+ UDP(sport=4444, dport=4444) /
+ Raw(b'0x0' * 200))]
+ rxs = self.send_and_expect(self.pg1, pkts, self.pg0, worker=0)
+
+ self.logger.info(self.vapi.cli("sh ipsec sa"))
+ self.logger.info(self.vapi.cli("sh crypto async status"))
+
+ # then use all the other SAs on worker 1.
+ # some will handoff, other take the sync and async paths
+ pkts = [(Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4,
+ dst=self.p_sync.remote_tun_if_host) /
+ UDP(sport=4444, dport=4444) /
+ Raw(b'0x0' * 200)),
+ (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4,
+ dst=p.remote_tun_if_host) /
+ UDP(sport=4444, dport=4444) /
+ Raw(b'0x0' * 200)),
+ (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4,
+ dst=self.p_async.remote_tun_if_host) /
+ UDP(sport=4444, dport=4444) /
+ Raw(b'0x0' * 200))]
+ pkts *= 1023
+
+ rxs = self.send_and_expect(self.pg1, pkts, self.pg0, worker=1)
+
+ self.assertEqual(len(rxs), len(pkts))
+
+ for rx in rxs:
+ if rx[ESP].spi == p.scapy_tun_spi:
+ decrypted = p.vpp_tun_sa.decrypt(rx[IP])
+ elif rx[ESP].spi == self.p_sync.vpp_tun_spi:
+ decrypted = self.p_sync.scapy_tun_sa.decrypt(rx[IP])
+ elif rx[ESP].spi == self.p_async.vpp_tun_spi:
+ decrypted = self.p_async.scapy_tun_sa.decrypt(rx[IP])
else:
rx.show()
self.assertTrue(False)
- self.p4.spd.remove_vpp_config()
- self.p4.sa.remove_vpp_config()
+ self.p_sync.spd.remove_vpp_config()
+ self.p_sync.sa.remove_vpp_config()
+ self.p_async.spd.remove_vpp_config()
+ self.p_async.sa.remove_vpp_config()
+
+ # async mode should have been disabled now that there are
+ # no async SAs. there's no API for this, so a reluctant
+ # screen scrape.
+ self.assertTrue("DISABLED" in self.vapi.cli("sh crypto async status"))
class TestIpsecEspHandoff(TemplateIpsecEsp,
@@ -618,7 +737,6 @@ class TestIpsecEspUdp(TemplateIpsecEspUdp, IpsecTra4Tests):
class MyParameters():
def __init__(self):
- self.engines = ["ia32", "ipsecmb", "openssl"]
flag_esn = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
self.flags = [0, flag_esn]
# foreach crypto algorithm
@@ -829,6 +947,14 @@ class RunTestIpsecEspAll(ConfigIpsecESP,
count=NUM_PKTS, payload_size=sz)
#
+ # swap the handlers while SAs are up
+ #
+ for e in engines:
+ if e != engine:
+ self.vapi.cli("set crypto handler all %s" % e)
+ self.verify_tra_basic4(count=NUM_PKTS)
+
+ #
# remove the SPDs, SAs, etc
#
self.unconfig_network()