summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_tun_if_esp.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ipsec_tun_if_esp.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ipsec_tun_if_esp.py')
-rw-r--r--test/test_ipsec_tun_if_esp.py2444
1 files changed, 1389 insertions, 1055 deletions
diff --git a/test/test_ipsec_tun_if_esp.py b/test/test_ipsec_tun_if_esp.py
index 14c9b3e3f11..9da75f0a4aa 100644
--- a/test/test_ipsec_tun_if_esp.py
+++ b/test/test_ipsec_tun_if_esp.py
@@ -10,13 +10,29 @@ from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
from framework import tag_fixme_vpp_workers
from framework import VppTestRunner
-from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
- IpsecTun4, IpsecTun6, IpsecTcpTests, mk_scapy_crypt_key, \
- IpsecTun6HandoffTests, IpsecTun4HandoffTests, config_tun_params
+from template_ipsec import (
+ TemplateIpsec,
+ IpsecTun4Tests,
+ IpsecTun6Tests,
+ IpsecTun4,
+ IpsecTun6,
+ IpsecTcpTests,
+ mk_scapy_crypt_key,
+ IpsecTun6HandoffTests,
+ IpsecTun4HandoffTests,
+ config_tun_params,
+)
from vpp_gre_interface import VppGreInterface
from vpp_ipip_tun_interface import VppIpIpTunInterface
-from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppMplsLabel, \
- VppMplsTable, VppMplsRoute, FibPathProto
+from vpp_ip_route import (
+ VppIpRoute,
+ VppRoutePath,
+ DpoProto,
+ VppMplsLabel,
+ VppMplsTable,
+ VppMplsRoute,
+ FibPathProto,
+)
from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface
from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
@@ -30,8 +46,9 @@ from vpp_policer import PolicerAction, VppPolicer, Dir
def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
- esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_USE_ESN))
+ esn_en = bool(
+ p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
+ )
crypt_key = mk_scapy_crypt_key(p)
if tun_if:
p.tun_dst = tun_if.remote_ip
@@ -41,77 +58,84 @@ def config_tun_params(p, encryption_type, tun_if, src=None, dst=None):
p.tun_src = src
if p.nat_header:
- is_default_port = (p.nat_header.dport == 4500)
+ is_default_port = p.nat_header.dport == 4500
else:
is_default_port = True
if is_default_port:
outbound_nat_header = p.nat_header
else:
- outbound_nat_header = UDP(sport=p.nat_header.dport,
- dport=p.nat_header.sport)
+ outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
bind_layers(UDP, ESP, dport=p.nat_header.dport)
p.scapy_tun_sa = SecurityAssociation(
- encryption_type, spi=p.vpp_tun_spi,
+ encryption_type,
+ spi=p.vpp_tun_spi,
crypt_algo=p.crypt_algo,
crypt_key=crypt_key,
- auth_algo=p.auth_algo, auth_key=p.auth_key,
- tunnel_header=ip_class_by_addr_type[p.addr_type](
- src=p.tun_dst,
- dst=p.tun_src),
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
+ tunnel_header=ip_class_by_addr_type[p.addr_type](src=p.tun_dst, dst=p.tun_src),
nat_t_header=outbound_nat_header,
- esn_en=esn_en)
+ esn_en=esn_en,
+ )
p.vpp_tun_sa = SecurityAssociation(
- encryption_type, spi=p.scapy_tun_spi,
+ encryption_type,
+ spi=p.scapy_tun_spi,
crypt_algo=p.crypt_algo,
crypt_key=crypt_key,
- auth_algo=p.auth_algo, auth_key=p.auth_key,
- tunnel_header=ip_class_by_addr_type[p.addr_type](
- dst=p.tun_dst,
- src=p.tun_src),
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
+ tunnel_header=ip_class_by_addr_type[p.addr_type](dst=p.tun_dst, src=p.tun_src),
nat_t_header=p.nat_header,
- esn_en=esn_en)
+ esn_en=esn_en,
+ )
def config_tra_params(p, encryption_type, tun_if):
ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
- esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_USE_ESN))
+ esn_en = bool(
+ p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN)
+ )
crypt_key = mk_scapy_crypt_key(p)
p.tun_dst = tun_if.remote_ip
p.tun_src = tun_if.local_ip
if p.nat_header:
- is_default_port = (p.nat_header.dport == 4500)
+ is_default_port = p.nat_header.dport == 4500
else:
is_default_port = True
if is_default_port:
outbound_nat_header = p.nat_header
else:
- outbound_nat_header = UDP(sport=p.nat_header.dport,
- dport=p.nat_header.sport)
+ outbound_nat_header = UDP(sport=p.nat_header.dport, dport=p.nat_header.sport)
bind_layers(UDP, ESP, dport=p.nat_header.dport)
p.scapy_tun_sa = SecurityAssociation(
- encryption_type, spi=p.vpp_tun_spi,
+ encryption_type,
+ spi=p.vpp_tun_spi,
crypt_algo=p.crypt_algo,
crypt_key=crypt_key,
- auth_algo=p.auth_algo, auth_key=p.auth_key,
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
esn_en=esn_en,
- nat_t_header=outbound_nat_header)
+ nat_t_header=outbound_nat_header,
+ )
p.vpp_tun_sa = SecurityAssociation(
- encryption_type, spi=p.scapy_tun_spi,
+ encryption_type,
+ spi=p.scapy_tun_spi,
crypt_algo=p.crypt_algo,
crypt_key=crypt_key,
- auth_algo=p.auth_algo, auth_key=p.auth_key,
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
esn_en=esn_en,
- nat_t_header=p.nat_header)
+ nat_t_header=p.nat_header,
+ )
class TemplateIpsec4TunProtect(object):
- """ IPsec IPv4 Tunnel protect """
+ """IPsec IPv4 Tunnel protect"""
encryption_type = ESP
tun4_encrypt_node_name = "esp4-encrypt-tun"
@@ -121,69 +145,97 @@ class TemplateIpsec4TunProtect(object):
def config_sa_tra(self, p):
config_tun_params(p, self.encryption_type, p.tun_if)
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- flags=p.flags)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ flags=p.flags,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- flags=p.flags)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ flags=p.flags,
+ )
p.tun_sa_in.add_vpp_config()
def config_sa_tun(self, p):
config_tun_params(p, self.encryption_type, p.tun_if)
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.tun_if.local_addr[p.addr_type],
- self.tun_if.remote_addr[p.addr_type],
- flags=p.flags)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.tun_if.local_addr[p.addr_type],
+ self.tun_if.remote_addr[p.addr_type],
+ flags=p.flags,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.tun_if.remote_addr[p.addr_type],
- self.tun_if.local_addr[p.addr_type],
- flags=p.flags)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.tun_if.remote_addr[p.addr_type],
+ self.tun_if.local_addr[p.addr_type],
+ flags=p.flags,
+ )
p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
def config_network(self, p):
- if hasattr(p, 'tun_dst'):
+ if hasattr(p, "tun_dst"):
tun_dst = p.tun_dst
else:
tun_dst = self.pg0.remote_ip4
- p.tun_if = VppIpIpTunInterface(self, self.pg0,
- self.pg0.local_ip4,
- tun_dst)
+ p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip4, tun_dst)
p.tun_if.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip4()
p.tun_if.config_ip6()
- p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
- [VppRoutePath(p.tun_if.remote_ip4,
- 0xffffffff)])
+ p.route = VppIpRoute(
+ self,
+ p.remote_tun_if_host,
+ 32,
+ [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
+ )
p.route.add_vpp_config()
- r = VppIpRoute(self, p.remote_tun_if_host6, 128,
- [VppRoutePath(p.tun_if.remote_ip6,
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)])
+ r = VppIpRoute(
+ self,
+ p.remote_tun_if_host6,
+ 128,
+ [
+ VppRoutePath(
+ p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
+ )
+ ],
+ )
r.add_vpp_config()
def unconfig_network(self, p):
@@ -198,9 +250,8 @@ class TemplateIpsec4TunProtect(object):
p.tun_sa_in.remove_vpp_config()
-class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
- TemplateIpsec):
- """ IPsec tunnel interface tests """
+class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec):
+ """IPsec tunnel interface tests"""
encryption_type = ESP
@@ -227,9 +278,8 @@ class TemplateIpsec4TunIfEsp(TemplateIpsec4TunProtect,
super(TemplateIpsec4TunIfEsp, self).tearDown()
-class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
- TemplateIpsec):
- """ IPsec UDP tunnel interface tests """
+class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect, TemplateIpsec):
+ """IPsec UDP tunnel interface tests"""
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
@@ -270,30 +320,41 @@ class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
def config_sa_tra(self, p):
config_tun_params(p, self.encryption_type, p.tun_if)
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- flags=p.flags,
- udp_src=p.nat_header.sport,
- udp_dst=p.nat_header.dport)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ flags=p.flags,
+ udp_src=p.nat_header.sport,
+ udp_dst=p.nat_header.dport,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- flags=p.flags,
- udp_src=p.nat_header.sport,
- udp_dst=p.nat_header.dport)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ flags=p.flags,
+ udp_src=p.nat_header.sport,
+ udp_dst=p.nat_header.dport,
+ )
p.tun_sa_in.add_vpp_config()
def setUp(self):
super(TemplateIpsec4TunIfEspUdp, self).setUp()
p = self.ipv4_params
- p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_UDP_ENCAP)
+ p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
p.nat_header = UDP(sport=5454, dport=4500)
self.tun_if = self.pg0
@@ -307,38 +368,38 @@ class TemplateIpsec4TunIfEspUdp(TemplateIpsec4TunProtect,
class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
- """ Ipsec ESP - TUN tests """
+ """Ipsec ESP - TUN tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
def test_tun_basic64(self):
- """ ipsec 6o4 tunnel basic test """
+ """ipsec 6o4 tunnel basic test"""
self.tun4_encrypt_node_name = "esp4-encrypt-tun"
self.verify_tun_64(self.params[socket.AF_INET], count=1)
def test_tun_burst64(self):
- """ ipsec 6o4 tunnel basic test """
+ """ipsec 6o4 tunnel basic test"""
self.tun4_encrypt_node_name = "esp4-encrypt-tun"
self.verify_tun_64(self.params[socket.AF_INET], count=257)
def test_tun_basic_frag44(self):
- """ ipsec 4o4 tunnel frag basic test """
+ """ipsec 4o4 tunnel frag basic test"""
self.tun4_encrypt_node_name = "esp4-encrypt-tun"
p = self.ipv4_params
- self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
- [1500, 0, 0, 0])
- self.verify_tun_44(self.params[socket.AF_INET],
- count=1, payload_size=1800, n_rx=2)
- self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
- [9000, 0, 0, 0])
+ self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [1500, 0, 0, 0])
+ self.verify_tun_44(
+ self.params[socket.AF_INET], count=1, payload_size=1800, n_rx=2
+ )
+ self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index, [9000, 0, 0, 0])
class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
- """ Ipsec ESP UDP tests """
+ """Ipsec ESP UDP tests"""
tun4_input_node = "ipsec4-tun-input"
@@ -346,22 +407,22 @@ class TestIpsec4TunIfEspUdp(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
super(TestIpsec4TunIfEspUdp, self).setUp()
def test_keepalive(self):
- """ IPSEC NAT Keepalive """
+ """IPSEC NAT Keepalive"""
self.verify_keepalive(self.ipv4_params)
class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
- """ Ipsec ESP UDP GCM tests """
+ """Ipsec ESP UDP GCM tests"""
tun4_input_node = "ipsec4-tun-input"
def setUp(self):
super(TestIpsec4TunIfEspUdpGCM, self).setUp()
p = self.ipv4_params
- p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_NONE)
- p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_GCM_256)
+ p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
+ p.crypt_algo_vpp_id = (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
+ )
p.crypt_algo = "AES-GCM"
p.auth_algo = "NULL"
p.crypt_key = b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"
@@ -369,75 +430,104 @@ class TestIpsec4TunIfEspUdpGCM(TemplateIpsec4TunIfEspUdp, IpsecTun4Tests):
class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
- """ Ipsec ESP - TCP tests """
+ """Ipsec ESP - TCP tests"""
+
pass
class TemplateIpsec6TunProtect(object):
- """ IPsec IPv6 Tunnel protect """
+ """IPsec IPv6 Tunnel protect"""
def config_sa_tra(self, p):
config_tun_params(p, self.encryption_type, p.tun_if)
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_in.add_vpp_config()
def config_sa_tun(self, p):
config_tun_params(p, self.encryption_type, p.tun_if)
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.tun_if.local_addr[p.addr_type],
- self.tun_if.remote_addr[p.addr_type])
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.tun_if.local_addr[p.addr_type],
+ self.tun_if.remote_addr[p.addr_type],
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.tun_if.remote_addr[p.addr_type],
- self.tun_if.local_addr[p.addr_type])
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.tun_if.remote_addr[p.addr_type],
+ self.tun_if.local_addr[p.addr_type],
+ )
p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
def config_network(self, p):
- if hasattr(p, 'tun_dst'):
+ if hasattr(p, "tun_dst"):
tun_dst = p.tun_dst
else:
tun_dst = self.pg0.remote_ip6
- p.tun_if = VppIpIpTunInterface(self, self.pg0,
- self.pg0.local_ip6,
- tun_dst)
+ p.tun_if = VppIpIpTunInterface(self, self.pg0, self.pg0.local_ip6, tun_dst)
p.tun_if.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip6()
p.tun_if.config_ip4()
- p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
- [VppRoutePath(p.tun_if.remote_ip6,
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)])
+ p.route = VppIpRoute(
+ self,
+ p.remote_tun_if_host,
+ 128,
+ [
+ VppRoutePath(
+ p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
+ )
+ ],
+ )
p.route.add_vpp_config()
- r = VppIpRoute(self, p.remote_tun_if_host4, 32,
- [VppRoutePath(p.tun_if.remote_ip4,
- 0xffffffff)])
+ r = VppIpRoute(
+ self,
+ p.remote_tun_if_host4,
+ 32,
+ [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
+ )
r.add_vpp_config()
def unconfig_network(self, p):
@@ -452,9 +542,8 @@ class TemplateIpsec6TunProtect(object):
p.tun_sa_in.remove_vpp_config()
-class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
- TemplateIpsec):
- """ IPsec tunnel interface tests """
+class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec):
+ """IPsec tunnel interface tests"""
encryption_type = ESP
@@ -472,31 +561,31 @@ class TemplateIpsec6TunIfEsp(TemplateIpsec6TunProtect,
super(TemplateIpsec6TunIfEsp, self).tearDown()
-class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp,
- IpsecTun6Tests):
- """ Ipsec ESP - TUN tests """
+class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
+ """Ipsec ESP - TUN tests"""
+
tun6_encrypt_node_name = "esp6-encrypt-tun"
tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
def test_tun_basic46(self):
- """ ipsec 4o6 tunnel basic test """
+ """ipsec 4o6 tunnel basic test"""
self.tun6_encrypt_node_name = "esp6-encrypt-tun"
self.verify_tun_46(self.params[socket.AF_INET6], count=1)
def test_tun_burst46(self):
- """ ipsec 4o6 tunnel burst test """
+ """ipsec 4o6 tunnel burst test"""
self.tun6_encrypt_node_name = "esp6-encrypt-tun"
self.verify_tun_46(self.params[socket.AF_INET6], count=257)
-class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
- IpsecTun6HandoffTests):
- """ Ipsec ESP 6 Handoff tests """
+class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp, IpsecTun6HandoffTests):
+ """Ipsec ESP 6 Handoff tests"""
+
tun6_encrypt_node_name = "esp6-encrypt-tun"
tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
def test_tun_handoff_66_police(self):
- """ ESP 6o6 tunnel with policer worker hand-off test """
+ """ESP 6o6 tunnel with policer worker hand-off test"""
self.vapi.cli("clear errors")
self.vapi.cli("clear ipsec sa")
@@ -504,12 +593,19 @@ class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
p = self.params[socket.AF_INET6]
action_tx = PolicerAction(
- VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
- 0)
- policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
- conform_action=action_tx,
- exceed_action=action_tx,
- violate_action=action_tx)
+ VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
+ )
+ policer = VppPolicer(
+ self,
+ "pol1",
+ 80,
+ 0,
+ 1000,
+ 0,
+ conform_action=action_tx,
+ exceed_action=action_tx,
+ violate_action=action_tx,
+ )
policer.add_vpp_config()
# Start policing on tun
@@ -520,13 +616,17 @@ class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
# inject alternately on worker 0 and 1.
for worker in [0, 1, 0, 1]:
- send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa,
- self.tun_if,
- src=p.remote_tun_if_host,
- dst=self.pg1.remote_ip6,
- count=N_PKTS)
- recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
- self.pg1, worker=worker)
+ send_pkts = self.gen_encrypt_pkts6(
+ p,
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip6,
+ count=N_PKTS,
+ )
+ recv_pkts = self.send_and_expect(
+ self.tun_if, send_pkts, self.pg1, worker=worker
+ )
self.verify_decrypted6(p, recv_pkts)
self.logger.debug(self.vapi.cli("show trace max 100"))
@@ -539,36 +639,36 @@ class TestIpsec6TunIfEspHandoff(TemplateIpsec6TunIfEsp,
self.assertEqual(stats, stats1)
# Worker 0, should have handed everything off
- self.assertEqual(stats0['conform_packets'], 0)
- self.assertEqual(stats0['exceed_packets'], 0)
- self.assertEqual(stats0['violate_packets'], 0)
+ self.assertEqual(stats0["conform_packets"], 0)
+ self.assertEqual(stats0["exceed_packets"], 0)
+ self.assertEqual(stats0["violate_packets"], 0)
else:
# Second pass: both workers should have policed equal amounts
- self.assertGreater(stats1['conform_packets'], 0)
- self.assertEqual(stats1['exceed_packets'], 0)
- self.assertGreater(stats1['violate_packets'], 0)
+ self.assertGreater(stats1["conform_packets"], 0)
+ self.assertEqual(stats1["exceed_packets"], 0)
+ self.assertGreater(stats1["violate_packets"], 0)
- self.assertGreater(stats0['conform_packets'], 0)
- self.assertEqual(stats0['exceed_packets'], 0)
- self.assertGreater(stats0['violate_packets'], 0)
+ self.assertGreater(stats0["conform_packets"], 0)
+ self.assertEqual(stats0["exceed_packets"], 0)
+ self.assertGreater(stats0["violate_packets"], 0)
- self.assertEqual(stats0['conform_packets'] +
- stats0['violate_packets'],
- stats1['conform_packets'] +
- stats1['violate_packets'])
+ self.assertEqual(
+ stats0["conform_packets"] + stats0["violate_packets"],
+ stats1["conform_packets"] + stats1["violate_packets"],
+ )
policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
policer.remove_vpp_config()
-class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
- IpsecTun4HandoffTests):
- """ Ipsec ESP 4 Handoff tests """
+class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp, IpsecTun4HandoffTests):
+ """Ipsec ESP 4 Handoff tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
def test_tun_handoff_44_police(self):
- """ ESP 4o4 tunnel with policer worker hand-off test """
+ """ESP 4o4 tunnel with policer worker hand-off test"""
self.vapi.cli("clear errors")
self.vapi.cli("clear ipsec sa")
@@ -576,12 +676,19 @@ class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
p = self.params[socket.AF_INET]
action_tx = PolicerAction(
- VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
- 0)
- policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
- conform_action=action_tx,
- exceed_action=action_tx,
- violate_action=action_tx)
+ VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
+ )
+ policer = VppPolicer(
+ self,
+ "pol1",
+ 80,
+ 0,
+ 1000,
+ 0,
+ conform_action=action_tx,
+ exceed_action=action_tx,
+ violate_action=action_tx,
+ )
policer.add_vpp_config()
# Start policing on tun
@@ -592,13 +699,17 @@ class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
# inject alternately on worker 0 and 1.
for worker in [0, 1, 0, 1]:
- send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa,
- self.tun_if,
- src=p.remote_tun_if_host,
- dst=self.pg1.remote_ip4,
- count=N_PKTS)
- recv_pkts = self.send_and_expect(self.tun_if, send_pkts,
- self.pg1, worker=worker)
+ send_pkts = self.gen_encrypt_pkts(
+ p,
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip4,
+ count=N_PKTS,
+ )
+ recv_pkts = self.send_and_expect(
+ self.tun_if, send_pkts, self.pg1, worker=worker
+ )
self.verify_decrypted(p, recv_pkts)
self.logger.debug(self.vapi.cli("show trace max 100"))
@@ -611,33 +722,31 @@ class TestIpsec4TunIfEspHandoff(TemplateIpsec4TunIfEsp,
self.assertEqual(stats, stats1)
# Worker 0, should have handed everything off
- self.assertEqual(stats0['conform_packets'], 0)
- self.assertEqual(stats0['exceed_packets'], 0)
- self.assertEqual(stats0['violate_packets'], 0)
+ self.assertEqual(stats0["conform_packets"], 0)
+ self.assertEqual(stats0["exceed_packets"], 0)
+ self.assertEqual(stats0["violate_packets"], 0)
else:
# Second pass: both workers should have policed equal amounts
- self.assertGreater(stats1['conform_packets'], 0)
- self.assertEqual(stats1['exceed_packets'], 0)
- self.assertGreater(stats1['violate_packets'], 0)
+ self.assertGreater(stats1["conform_packets"], 0)
+ self.assertEqual(stats1["exceed_packets"], 0)
+ self.assertGreater(stats1["violate_packets"], 0)
- self.assertGreater(stats0['conform_packets'], 0)
- self.assertEqual(stats0['exceed_packets'], 0)
- self.assertGreater(stats0['violate_packets'], 0)
+ self.assertGreater(stats0["conform_packets"], 0)
+ self.assertEqual(stats0["exceed_packets"], 0)
+ self.assertGreater(stats0["violate_packets"], 0)
- self.assertEqual(stats0['conform_packets'] +
- stats0['violate_packets'],
- stats1['conform_packets'] +
- stats1['violate_packets'])
+ self.assertEqual(
+ stats0["conform_packets"] + stats0["violate_packets"],
+ stats1["conform_packets"] + stats1["violate_packets"],
+ )
policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
policer.remove_vpp_config()
@tag_fixme_vpp_workers
-class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
- TemplateIpsec,
- IpsecTun4):
- """ IPsec IPv4 Multi Tunnel interface """
+class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
+ """IPsec IPv4 Multi Tunnel interface"""
encryption_type = ESP
tun4_encrypt_node_name = "esp4-encrypt-tun"
@@ -676,19 +785,23 @@ class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
super(TestIpsec4MultiTunIfEsp, self).tearDown()
def test_tun_44(self):
- """Multiple IPSEC tunnel interfaces """
+ """Multiple IPSEC tunnel interfaces"""
for p in self.multi_params:
self.verify_tun_44(p, count=127)
self.assertEqual(p.tun_if.get_rx_stats(), 127)
self.assertEqual(p.tun_if.get_tx_stats(), 127)
def test_tun_rr_44(self):
- """ Round-robin packets acrros multiple interface """
+ """Round-robin packets acrros multiple interface"""
tx = []
for p in self.multi_params:
- tx = tx + self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
- src=p.remote_tun_if_host,
- dst=self.pg1.remote_ip4)
+ tx = tx + self.gen_encrypt_pkts(
+ p,
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip4,
+ )
rxs = self.send_and_expect(self.tun_if, tx, self.pg1)
for rx, p in zip(rxs, self.multi_params):
@@ -696,18 +809,17 @@ class TestIpsec4MultiTunIfEsp(TemplateIpsec4TunProtect,
tx = []
for p in self.multi_params:
- tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
- dst=p.remote_tun_if_host)
+ tx = tx + self.gen_pkts(
+ self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host
+ )
rxs = self.send_and_expect(self.pg1, tx, self.tun_if)
for rx, p in zip(rxs, self.multi_params):
self.verify_encrypted(p, p.vpp_tun_sa, [rx])
-class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
- TemplateIpsec,
- IpsecTun4):
- """ IPsec IPv4 Tunnel interface all Algos """
+class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
+ """IPsec IPv4 Tunnel interface all Algos"""
encryption_type = ESP
tun4_encrypt_node_name = "esp4-encrypt-tun"
@@ -736,7 +848,7 @@ class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
# change the key and the SPI
#
np = copy.copy(p)
- p.crypt_key = b'X' + p.crypt_key[1:]
+ p.crypt_key = b"X" + p.crypt_key[1:]
p.scapy_tun_spi += 1
p.scapy_tun_sa_id += 1
p.vpp_tun_spi += 1
@@ -746,26 +858,30 @@ class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
config_tun_params(p, self.encryption_type, p.tun_if)
- p.tun_sa_out = VppIpsecSA(self,
- p.scapy_tun_sa_id,
- p.scapy_tun_spi,
- p.auth_algo_vpp_id,
- p.auth_key,
- p.crypt_algo_vpp_id,
- p.crypt_key,
- self.vpp_esp_protocol,
- flags=p.flags,
- salt=p.salt)
- p.tun_sa_in = VppIpsecSA(self,
- p.vpp_tun_sa_id,
- p.vpp_tun_spi,
- p.auth_algo_vpp_id,
- p.auth_key,
- p.crypt_algo_vpp_id,
- p.crypt_key,
- self.vpp_esp_protocol,
- flags=p.flags,
- salt=p.salt)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ flags=p.flags,
+ salt=p.salt,
+ )
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ flags=p.flags,
+ salt=p.salt,
+ )
p.tun_sa_in.add_vpp_config()
p.tun_sa_out.add_vpp_config()
@@ -775,68 +891,98 @@ class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
self.logger.info(self.vapi.cli("sh ipsec sa"))
def test_tun_44(self):
- """IPSEC tunnel all algos """
+ """IPSEC tunnel all algos"""
# foreach VPP crypto engine
engines = ["ia32", "ipsecmb", "openssl"]
# foreach crypto algorithm
- algos = [{'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_GCM_128),
- 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_NONE),
- 'scapy-crypto': "AES-GCM",
- 'scapy-integ': "NULL",
- 'key': b"JPjyOWBeVEQiMe7h",
- 'salt': 3333},
- {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_GCM_192),
- 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_NONE),
- 'scapy-crypto': "AES-GCM",
- 'scapy-integ': "NULL",
- 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe",
- 'salt': 0},
- {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_GCM_256),
- 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_NONE),
- 'scapy-crypto': "AES-GCM",
- 'scapy-integ': "NULL",
- 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
- 'salt': 9999},
- {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_CBC_128),
- 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA1_96),
- 'scapy-crypto': "AES-CBC",
- 'scapy-integ': "HMAC-SHA1-96",
- 'salt': 0,
- 'key': b"JPjyOWBeVEQiMe7h"},
- {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_CBC_192),
- 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA_512_256),
- 'scapy-crypto': "AES-CBC",
- 'scapy-integ': "SHA2-512-256",
- 'salt': 0,
- 'key': b"JPjyOWBeVEQiMe7hJPjyOWBe"},
- {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_AES_CBC_256),
- 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA_256_128),
- 'scapy-crypto': "AES-CBC",
- 'scapy-integ': "SHA2-256-128",
- 'salt': 0,
- 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"},
- {'vpp-crypto': (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_NONE),
- 'vpp-integ': (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_SHA1_96),
- 'scapy-crypto': "NULL",
- 'scapy-integ': "HMAC-SHA1-96",
- 'salt': 0,
- 'key': b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h"}]
+ algos = [
+ {
+ "vpp-crypto": (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_128
+ ),
+ "vpp-integ": (
+ VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
+ ),
+ "scapy-crypto": "AES-GCM",
+ "scapy-integ": "NULL",
+ "key": b"JPjyOWBeVEQiMe7h",
+ "salt": 3333,
+ },
+ {
+ "vpp-crypto": (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_192
+ ),
+ "vpp-integ": (
+ VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
+ ),
+ "scapy-crypto": "AES-GCM",
+ "scapy-integ": "NULL",
+ "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
+ "salt": 0,
+ },
+ {
+ "vpp-crypto": (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_GCM_256
+ ),
+ "vpp-integ": (
+ VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
+ ),
+ "scapy-crypto": "AES-GCM",
+ "scapy-integ": "NULL",
+ "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
+ "salt": 9999,
+ },
+ {
+ "vpp-crypto": (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_128
+ ),
+ "vpp-integ": (
+ VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
+ ),
+ "scapy-crypto": "AES-CBC",
+ "scapy-integ": "HMAC-SHA1-96",
+ "salt": 0,
+ "key": b"JPjyOWBeVEQiMe7h",
+ },
+ {
+ "vpp-crypto": (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_192
+ ),
+ "vpp-integ": (
+ VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_512_256
+ ),
+ "scapy-crypto": "AES-CBC",
+ "scapy-integ": "SHA2-512-256",
+ "salt": 0,
+ "key": b"JPjyOWBeVEQiMe7hJPjyOWBe",
+ },
+ {
+ "vpp-crypto": (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_AES_CBC_256
+ ),
+ "vpp-integ": (
+ VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA_256_128
+ ),
+ "scapy-crypto": "AES-CBC",
+ "scapy-integ": "SHA2-256-128",
+ "salt": 0,
+ "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
+ },
+ {
+ "vpp-crypto": (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
+ ),
+ "vpp-integ": (
+ VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_SHA1_96
+ ),
+ "scapy-crypto": "NULL",
+ "scapy-integ": "HMAC-SHA1-96",
+ "salt": 0,
+ "key": b"JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h",
+ },
+ ]
for engine in engines:
self.vapi.cli("set crypto handler all %s" % engine)
@@ -848,12 +994,12 @@ class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
# with self.subTest(algo=algo['scapy']):
p = self.ipv4_params
- p.auth_algo_vpp_id = algo['vpp-integ']
- p.crypt_algo_vpp_id = algo['vpp-crypto']
- p.crypt_algo = algo['scapy-crypto']
- p.auth_algo = algo['scapy-integ']
- p.crypt_key = algo['key']
- p.salt = algo['salt']
+ p.auth_algo_vpp_id = algo["vpp-integ"]
+ p.crypt_algo_vpp_id = algo["vpp-crypto"]
+ p.crypt_algo = algo["scapy-crypto"]
+ p.auth_algo = algo["scapy-integ"]
+ p.crypt_key = algo["key"]
+ p.salt = algo["salt"]
#
# rekey the tunnel
@@ -862,10 +1008,8 @@ class TestIpsec4TunIfEspAll(TemplateIpsec4TunProtect,
self.verify_tun_44(p, count=127)
-class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
- TemplateIpsec,
- IpsecTun4):
- """ IPsec IPv4 Tunnel interface no Algos """
+class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect, TemplateIpsec, IpsecTun4):
+ """IPsec IPv4 Tunnel interface no Algos"""
encryption_type = ESP
tun4_encrypt_node_name = "esp4-encrypt-tun"
@@ -876,29 +1020,28 @@ class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
self.tun_if = self.pg0
p = self.ipv4_params
- p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_NONE)
- p.auth_algo = 'NULL'
+ p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
+ p.auth_algo = "NULL"
p.auth_key = []
- p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_NONE)
- p.crypt_algo = 'NULL'
+ p.crypt_algo_vpp_id = (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
+ )
+ p.crypt_algo = "NULL"
p.crypt_key = []
def tearDown(self):
super(TestIpsec4TunIfEspNoAlgo, self).tearDown()
def test_tun_44(self):
- """ IPSec SA with NULL algos """
+ """IPSec SA with NULL algos"""
p = self.ipv4_params
self.config_network(p)
self.config_sa_tra(p)
self.config_protect(p)
- tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
- dst=p.remote_tun_if_host)
+ tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, dst=p.remote_tun_if_host)
self.send_and_assert_no_replies(self.pg1, tx)
self.unconfig_protect(p)
@@ -907,10 +1050,8 @@ class TestIpsec4TunIfEspNoAlgo(TemplateIpsec4TunProtect,
@tag_fixme_vpp_workers
-class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
- TemplateIpsec,
- IpsecTun6):
- """ IPsec IPv6 Multi Tunnel interface """
+class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect, TemplateIpsec, IpsecTun6):
+ """IPsec IPv6 Multi Tunnel interface"""
encryption_type = ESP
tun6_encrypt_node_name = "esp6-encrypt-tun"
@@ -949,40 +1090,43 @@ class TestIpsec6MultiTunIfEsp(TemplateIpsec6TunProtect,
super(TestIpsec6MultiTunIfEsp, self).tearDown()
def test_tun_66(self):
- """Multiple IPSEC tunnel interfaces """
+ """Multiple IPSEC tunnel interfaces"""
for p in self.multi_params:
self.verify_tun_66(p, count=127)
self.assertEqual(p.tun_if.get_rx_stats(), 127)
self.assertEqual(p.tun_if.get_tx_stats(), 127)
-class TestIpsecGreTebIfEsp(TemplateIpsec,
- IpsecTun4Tests):
- """ Ipsec GRE TEB ESP - TUN tests """
+class TestIpsecGreTebIfEsp(TemplateIpsec, IpsecTun4Tests):
+ """Ipsec GRE TEB ESP - TUN tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
encryption_type = ESP
omac = "00:11:22:33:44:55"
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4) /
- GRE() /
- Ether(dst=self.omac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(dst=self.omac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / GRE()
+ / Ether(dst=self.omac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(dst=self.omac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -1020,33 +1164,43 @@ class TestIpsecGreTebIfEsp(TemplateIpsec,
bd1 = VppBridgeDomain(self, 1)
bd1.add_vpp_config()
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.pg0.remote_ip4,
- self.pg0.local_ip4)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.pg0.remote_ip4,
+ self.pg0.local_ip4,
+ )
p.tun_sa_in.add_vpp_config()
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip4,
- self.pg0.remote_ip4,
- type=(VppEnum.vl_api_gre_tunnel_type_t.
- GRE_API_TUNNEL_TYPE_TEB))
+ p.tun_if = VppGreInterface(
+ self,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
+ )
p.tun_if.add_vpp_config()
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
@@ -1067,34 +1221,37 @@ class TestIpsecGreTebIfEsp(TemplateIpsec,
super(TestIpsecGreTebIfEsp, self).tearDown()
-class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
- IpsecTun4Tests):
- """ Ipsec GRE TEB ESP - TUN tests """
+class TestIpsecGreTebVlanIfEsp(TemplateIpsec, IpsecTun4Tests):
+ """Ipsec GRE TEB ESP - TUN tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
encryption_type = ESP
omac = "00:11:22:33:44:55"
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4) /
- GRE() /
- Ether(dst=self.omac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(dst=self.omac) /
- Dot1Q(vlan=11) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / GRE()
+ / Ether(dst=self.omac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(dst=self.omac)
+ / Dot1Q(vlan=11)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -1136,37 +1293,49 @@ class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
self.pg1_11 = VppDot1QSubint(self, self.pg1, 11)
self.vapi.l2_interface_vlan_tag_rewrite(
- sw_if_index=self.pg1_11.sw_if_index, vtr_op=L2_VTR_OP.L2_POP_1,
- push_dot1q=11)
+ sw_if_index=self.pg1_11.sw_if_index,
+ vtr_op=L2_VTR_OP.L2_POP_1,
+ push_dot1q=11,
+ )
self.pg1_11.admin_up()
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.pg0.remote_ip4,
- self.pg0.local_ip4)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.pg0.remote_ip4,
+ self.pg0.local_ip4,
+ )
p.tun_sa_in.add_vpp_config()
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip4,
- self.pg0.remote_ip4,
- type=(VppEnum.vl_api_gre_tunnel_type_t.
- GRE_API_TUNNEL_TYPE_TEB))
+ p.tun_if = VppGreInterface(
+ self,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
+ )
p.tun_if.add_vpp_config()
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
@@ -1187,33 +1356,36 @@ class TestIpsecGreTebVlanIfEsp(TemplateIpsec,
self.pg1_11.remove_vpp_config()
-class TestIpsecGreTebIfEspTra(TemplateIpsec,
- IpsecTun4Tests):
- """ Ipsec GRE TEB ESP - Tra tests """
+class TestIpsecGreTebIfEspTra(TemplateIpsec, IpsecTun4Tests):
+ """Ipsec GRE TEB ESP - Tra tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
encryption_type = ESP
omac = "00:11:22:33:44:55"
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4) /
- GRE() /
- Ether(dst=self.omac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(dst=self.omac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / GRE()
+ / Ether(dst=self.omac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(dst=self.omac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -1251,29 +1423,39 @@ class TestIpsecGreTebIfEspTra(TemplateIpsec,
bd1 = VppBridgeDomain(self, 1)
bd1.add_vpp_config()
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_in.add_vpp_config()
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip4,
- self.pg0.remote_ip4,
- type=(VppEnum.vl_api_gre_tunnel_type_t.
- GRE_API_TUNNEL_TYPE_TEB))
+ p.tun_if = VppGreInterface(
+ self,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
+ )
p.tun_if.add_vpp_config()
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
@@ -1292,33 +1474,36 @@ class TestIpsecGreTebIfEspTra(TemplateIpsec,
super(TestIpsecGreTebIfEspTra, self).tearDown()
-class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
- IpsecTun4Tests):
- """ Ipsec GRE TEB UDP ESP - Tra tests """
+class TestIpsecGreTebUdpIfEspTra(TemplateIpsec, IpsecTun4Tests):
+ """Ipsec GRE TEB UDP ESP - Tra tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
encryption_type = ESP
omac = "00:11:22:33:44:55"
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4) /
- GRE() /
- Ether(dst=self.omac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(dst=self.omac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / GRE()
+ / Ether(dst=self.omac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(dst=self.omac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -1356,44 +1541,53 @@ class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
p = self.ipv4_params
p = self.ipv4_params
- p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_UDP_ENCAP)
+ p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
p.nat_header = UDP(sport=5454, dport=4545)
bd1 = VppBridgeDomain(self, 1)
bd1.add_vpp_config()
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- flags=p.flags,
- udp_src=5454,
- udp_dst=4545)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ flags=p.flags,
+ udp_src=5454,
+ udp_dst=4545,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- flags=(p.flags |
- VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_IS_INBOUND),
- udp_src=4545,
- udp_dst=5454)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ flags=(
+ p.flags | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND
+ ),
+ udp_src=4545,
+ udp_dst=5454,
+ )
p.tun_sa_in.add_vpp_config()
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip4,
- self.pg0.remote_ip4,
- type=(VppEnum.vl_api_gre_tunnel_type_t.
- GRE_API_TUNNEL_TYPE_TEB))
+ p.tun_if = VppGreInterface(
+ self,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ type=(VppEnum.vl_api_gre_tunnel_type_t.GRE_API_TUNNEL_TYPE_TEB),
+ )
p.tun_if.add_vpp_config()
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
@@ -1413,32 +1607,34 @@ class TestIpsecGreTebUdpIfEspTra(TemplateIpsec,
super(TestIpsecGreTebUdpIfEspTra, self).tearDown()
-class TestIpsecGreIfEsp(TemplateIpsec,
- IpsecTun4Tests):
- """ Ipsec GRE ESP - TUN tests """
+class TestIpsecGreIfEsp(TemplateIpsec, IpsecTun4Tests):
+ """Ipsec GRE ESP - TUN tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
encryption_type = ESP
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4) /
- GRE() /
- IP(src=self.pg1.local_ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / GRE()
+ / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -1475,40 +1671,47 @@ class TestIpsecGreIfEsp(TemplateIpsec,
bd1 = VppBridgeDomain(self, 1)
bd1.add_vpp_config()
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.pg0.local_ip4,
+ self.pg0.remote_ip4,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- self.pg0.remote_ip4,
- self.pg0.local_ip4)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ self.pg0.remote_ip4,
+ self.pg0.local_ip4,
+ )
p.tun_sa_in.add_vpp_config()
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
p.tun_if.add_vpp_config()
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip4()
config_tun_params(p, self.encryption_type, p.tun_if)
- VppIpRoute(self, "1.1.1.2", 32,
- [VppRoutePath(p.tun_if.remote_ip4,
- 0xffffffff)]).add_vpp_config()
+ VppIpRoute(
+ self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
+ ).add_vpp_config()
def tearDown(self):
p = self.ipv4_params
@@ -1516,42 +1719,46 @@ class TestIpsecGreIfEsp(TemplateIpsec,
super(TestIpsecGreIfEsp, self).tearDown()
-class TestIpsecGreIfEspTra(TemplateIpsec,
- IpsecTun4Tests):
- """ Ipsec GRE ESP - TRA tests """
+class TestIpsecGreIfEspTra(TemplateIpsec, IpsecTun4Tests):
+ """Ipsec GRE ESP - TRA tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
encryption_type = ESP
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4) /
- GRE() /
- IP(src=self.pg1.local_ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=self.pg0.remote_ip4,
- dst=self.pg0.local_ip4) /
- GRE() /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src="1.1.1.1", dst="1.1.1.2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / GRE()
+ / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
+ / GRE()
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src="1.1.1.1", dst="1.1.1.2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -1583,36 +1790,43 @@ class TestIpsecGreIfEspTra(TemplateIpsec,
p = self.ipv4_params
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_in.add_vpp_config()
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ p.tun_if = VppGreInterface(self, self.pg0.local_ip4, self.pg0.remote_ip4)
p.tun_if.add_vpp_config()
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip4()
config_tra_params(p, self.encryption_type, p.tun_if)
- VppIpRoute(self, "1.1.1.2", 32,
- [VppRoutePath(p.tun_if.remote_ip4,
- 0xffffffff)]).add_vpp_config()
+ VppIpRoute(
+ self, "1.1.1.2", 32, [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)]
+ ).add_vpp_config()
def tearDown(self):
p = self.ipv4_params
@@ -1621,41 +1835,45 @@ class TestIpsecGreIfEspTra(TemplateIpsec,
def test_gre_non_ip(self):
p = self.ipv4_params
- tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if,
- src=p.remote_tun_if_host,
- dst=self.pg1.remote_ip6)
+ tx = self.gen_encrypt_non_ip_pkts(
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip6,
+ )
self.send_and_assert_no_replies(self.tun_if, tx)
- node_name = ('/err/%s/unsupported payload' %
- self.tun4_decrypt_node_name[0])
+ node_name = "/err/%s/unsupported payload" % self.tun4_decrypt_node_name[0]
self.assertEqual(1, self.statistics.get_err_counter(node_name))
-class TestIpsecGre6IfEspTra(TemplateIpsec,
- IpsecTun6Tests):
- """ Ipsec GRE ESP - TRA tests """
+class TestIpsecGre6IfEspTra(TemplateIpsec, IpsecTun6Tests):
+ """Ipsec GRE ESP - TRA tests"""
+
tun6_encrypt_node_name = "esp6-encrypt-tun"
tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
encryption_type = ESP
- def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IPv6(src=self.pg0.remote_ip6,
- dst=self.pg0.local_ip6) /
- GRE() /
- IPv6(src=self.pg1.local_ip6,
- dst=self.pg1.remote_ip6) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts6(self, p, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IPv6(src="1::1", dst="1::2") /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / GRE()
+ / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IPv6(src="1::1", dst="1::2")
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted6(self, p, rxs):
for rx in rxs:
@@ -1690,37 +1908,50 @@ class TestIpsecGre6IfEspTra(TemplateIpsec,
bd1 = VppBridgeDomain(self, 1)
bd1.add_vpp_config()
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_in.add_vpp_config()
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip6,
- self.pg0.remote_ip6)
+ p.tun_if = VppGreInterface(self, self.pg0.local_ip6, self.pg0.remote_ip6)
p.tun_if.add_vpp_config()
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip6()
config_tra_params(p, self.encryption_type, p.tun_if)
- r = VppIpRoute(self, "1::2", 128,
- [VppRoutePath(p.tun_if.remote_ip6,
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)])
+ r = VppIpRoute(
+ self,
+ "1::2",
+ 128,
+ [
+ VppRoutePath(
+ p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
+ )
+ ],
+ )
r.add_vpp_config()
def tearDown(self):
@@ -1730,30 +1961,33 @@ class TestIpsecGre6IfEspTra(TemplateIpsec,
class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
- """ Ipsec mGRE ESP v4 TRA tests """
+ """Ipsec mGRE ESP v4 TRA tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
encryption_type = ESP
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=p.tun_dst,
- dst=self.pg0.local_ip4) /
- GRE() /
- IP(src=self.pg1.local_ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src="1.1.1.1", dst=dst) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=p.tun_dst, dst=self.pg0.local_ip4)
+ / GRE()
+ / IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src="1.1.1.1", dst=dst)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -1784,11 +2018,12 @@ class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
N_NHS = 16
self.tun_if = self.pg0
p = self.ipv4_params
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip4,
- "0.0.0.0",
- mode=(VppEnum.vl_api_tunnel_mode_t.
- TUNNEL_API_MODE_MP))
+ p.tun_if = VppGreInterface(
+ self,
+ self.pg0.local_ip4,
+ "0.0.0.0",
+ mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
+ )
p.tun_if.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip4()
@@ -1812,16 +2047,28 @@ class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
p.scapy_tra_spi = p.scapy_tra_spi + ii
p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
p.vpp_tra_spi = p.vpp_tra_spi + ii
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_in.add_vpp_config()
p.tun_protect = VppIpsecTunProtect(
@@ -1829,19 +2076,26 @@ class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
p.tun_if,
p.tun_sa_out,
[p.tun_sa_in],
- nh=p.tun_if.remote_hosts[ii].ip4)
+ nh=p.tun_if.remote_hosts[ii].ip4,
+ )
p.tun_protect.add_vpp_config()
config_tra_params(p, self.encryption_type, p.tun_if)
self.multi_params.append(p)
- VppIpRoute(self, p.remote_tun_if_host, 32,
- [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
- p.tun_if.sw_if_index)]).add_vpp_config()
+ VppIpRoute(
+ self,
+ p.remote_tun_if_host,
+ 32,
+ [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
+ ).add_vpp_config()
# in this v4 variant add the teibs after the protect
- p.teib = VppTeib(self, p.tun_if,
- p.tun_if.remote_hosts[ii].ip4,
- self.pg0.remote_hosts[ii].ip4).add_vpp_config()
+ p.teib = VppTeib(
+ self,
+ p.tun_if,
+ p.tun_if.remote_hosts[ii].ip4,
+ self.pg0.remote_hosts[ii].ip4,
+ ).add_vpp_config()
p.tun_dst = self.pg0.remote_hosts[ii].ip4
self.logger.info(self.vapi.cli("sh ipsec protect-hash"))
@@ -1862,30 +2116,33 @@ class TestIpsecMGreIfEspTra4(TemplateIpsec, IpsecTun4):
class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
- """ Ipsec mGRE ESP v6 TRA tests """
+ """Ipsec mGRE ESP v6 TRA tests"""
+
tun6_encrypt_node_name = "esp6-encrypt-tun"
tun6_decrypt_node_name = ["esp6-decrypt-tun", "esp6-decrypt-tun-post"]
encryption_type = ESP
- def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IPv6(src=p.tun_dst,
- dst=self.pg0.local_ip6) /
- GRE() /
- IPv6(src=self.pg1.local_ip6,
- dst=self.pg1.remote_ip6) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts6(self, p, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IPv6(src="1::1", dst=dst) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IPv6(src=p.tun_dst, dst=self.pg0.local_ip6)
+ / GRE()
+ / IPv6(src=self.pg1.local_ip6, dst=self.pg1.remote_ip6)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IPv6(src="1::1", dst=dst)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted6(self, p, rxs):
for rx in rxs:
@@ -1918,11 +2175,12 @@ class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
N_NHS = 16
self.tun_if = self.pg0
p = self.ipv6_params
- p.tun_if = VppGreInterface(self,
- self.pg0.local_ip6,
- "::",
- mode=(VppEnum.vl_api_tunnel_mode_t.
- TUNNEL_API_MODE_MP))
+ p.tun_if = VppGreInterface(
+ self,
+ self.pg0.local_ip6,
+ "::",
+ mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP),
+ )
p.tun_if.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip6()
@@ -1946,37 +2204,53 @@ class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
p.scapy_tra_spi = p.scapy_tra_spi + ii
p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
p.vpp_tra_spi = p.vpp_tra_spi + ii
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ )
p.tun_sa_in.add_vpp_config()
# in this v6 variant add the teibs first then the protection
p.tun_dst = self.pg0.remote_hosts[ii].ip6
- VppTeib(self, p.tun_if,
- p.tun_if.remote_hosts[ii].ip6,
- p.tun_dst).add_vpp_config()
+ VppTeib(
+ self, p.tun_if, p.tun_if.remote_hosts[ii].ip6, p.tun_dst
+ ).add_vpp_config()
p.tun_protect = VppIpsecTunProtect(
self,
p.tun_if,
p.tun_sa_out,
[p.tun_sa_in],
- nh=p.tun_if.remote_hosts[ii].ip6)
+ nh=p.tun_if.remote_hosts[ii].ip6,
+ )
p.tun_protect.add_vpp_config()
config_tra_params(p, self.encryption_type, p.tun_if)
self.multi_params.append(p)
- VppIpRoute(self, p.remote_tun_if_host, 128,
- [VppRoutePath(p.tun_if.remote_hosts[ii].ip6,
- p.tun_if.sw_if_index)]).add_vpp_config()
+ VppIpRoute(
+ self,
+ p.remote_tun_if_host,
+ 128,
+ [VppRoutePath(p.tun_if.remote_hosts[ii].ip6, p.tun_if.sw_if_index)],
+ ).add_vpp_config()
p.tun_dst = self.pg0.remote_hosts[ii].ip6
self.logger.info(self.vapi.cli("sh log"))
@@ -1995,10 +2269,8 @@ class TestIpsecMGreIfEspTra6(TemplateIpsec, IpsecTun6):
@tag_fixme_vpp_workers
-class TestIpsec4TunProtect(TemplateIpsec,
- TemplateIpsec4TunProtect,
- IpsecTun4):
- """ IPsec IPv4 Tunnel protect - transport mode"""
+class TestIpsec4TunProtect(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
+ """IPsec IPv4 Tunnel protect - transport mode"""
def setUp(self):
super(TestIpsec4TunProtect, self).setUp()
@@ -2028,7 +2300,7 @@ class TestIpsec4TunProtect(TemplateIpsec,
# rekey - create new SAs and update the tunnel protection
np = copy.copy(p)
- np.crypt_key = b'X' + p.crypt_key[1:]
+ np.crypt_key = b"X" + p.crypt_key[1:]
np.scapy_tun_spi += 100
np.scapy_tun_sa_id += 1
np.vpp_tun_spi += 100
@@ -2051,10 +2323,8 @@ class TestIpsec4TunProtect(TemplateIpsec,
@tag_fixme_vpp_workers
-class TestIpsec4TunProtectUdp(TemplateIpsec,
- TemplateIpsec4TunProtect,
- IpsecTun4):
- """ IPsec IPv4 Tunnel protect - transport mode"""
+class TestIpsec4TunProtectUdp(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
+ """IPsec IPv4 Tunnel protect - transport mode"""
def setUp(self):
super(TestIpsec4TunProtectUdp, self).setUp()
@@ -2062,8 +2332,7 @@ class TestIpsec4TunProtectUdp(TemplateIpsec,
self.tun_if = self.pg0
p = self.ipv4_params
- p.flags = (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_UDP_ENCAP)
+ p.flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
p.nat_header = UDP(sport=4500, dport=4500)
self.config_network(p)
self.config_sa_tra(p)
@@ -2093,15 +2362,13 @@ class TestIpsec4TunProtectUdp(TemplateIpsec,
self.assertEqual(p.tun_if.get_tx_stats(), 127)
def test_keepalive(self):
- """ IPSEC NAT Keepalive """
+ """IPSEC NAT Keepalive"""
self.verify_keepalive(self.ipv4_params)
@tag_fixme_vpp_workers
-class TestIpsec4TunProtectTun(TemplateIpsec,
- TemplateIpsec4TunProtect,
- IpsecTun4):
- """ IPsec IPv4 Tunnel protect - tunnel mode"""
+class TestIpsec4TunProtectTun(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
+ """IPsec IPv4 Tunnel protect - tunnel mode"""
encryption_type = ESP
tun4_encrypt_node_name = "esp4-encrypt-tun"
@@ -2115,23 +2382,26 @@ class TestIpsec4TunProtectTun(TemplateIpsec,
def tearDown(self):
super(TestIpsec4TunProtectTun, self).tearDown()
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=sw_intf.remote_ip4,
- dst=sw_intf.local_ip4) /
- IP(src=src, dst=dst) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src=src, dst=dst) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=sw_intf.remote_ip4, dst=sw_intf.local_ip4)
+ / IP(src=src, dst=dst)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src=src, dst=dst)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -2160,7 +2430,7 @@ class TestIpsec4TunProtectTun(TemplateIpsec,
raise
def test_tun_44(self):
- """IPSEC tunnel protect """
+ """IPSEC tunnel protect"""
p = self.ipv4_params
@@ -2170,10 +2440,7 @@ class TestIpsec4TunProtectTun(TemplateIpsec,
# also add an output features on the tunnel and physical interface
# so we test they still work
- r_all = AclRule(True,
- src_prefix="0.0.0.0/0",
- dst_prefix="0.0.0.0/0",
- proto=0)
+ r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
a = VppAcl(self, [r_all]).add_vpp_config()
VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
@@ -2186,7 +2453,7 @@ class TestIpsec4TunProtectTun(TemplateIpsec,
# rekey - create new SAs and update the tunnel protection
np = copy.copy(p)
- np.crypt_key = b'X' + p.crypt_key[1:]
+ np.crypt_key = b"X" + p.crypt_key[1:]
np.scapy_tun_spi += 100
np.scapy_tun_sa_id += 1
np.vpp_tun_spi += 100
@@ -2208,10 +2475,8 @@ class TestIpsec4TunProtectTun(TemplateIpsec,
self.unconfig_network(p)
-class TestIpsec4TunProtectTunDrop(TemplateIpsec,
- TemplateIpsec4TunProtect,
- IpsecTun4):
- """ IPsec IPv4 Tunnel protect - tunnel mode - drop"""
+class TestIpsec4TunProtectTunDrop(TemplateIpsec, TemplateIpsec4TunProtect, IpsecTun4):
+ """IPsec IPv4 Tunnel protect - tunnel mode - drop"""
encryption_type = ESP
tun4_encrypt_node_name = "esp4-encrypt-tun"
@@ -2225,18 +2490,20 @@ class TestIpsec4TunProtectTunDrop(TemplateIpsec,
def tearDown(self):
super(TestIpsec4TunProtectTunDrop, self).tearDown()
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=sw_intf.remote_ip4,
- dst="5.5.5.5") /
- IP(src=src, dst=dst) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=sw_intf.remote_ip4, dst="5.5.5.5")
+ / IP(src=src, dst=dst)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
def test_tun_drop_44(self):
- """IPSEC tunnel protect bogus tunnel header """
+ """IPSEC tunnel protect bogus tunnel header"""
p = self.ipv4_params
@@ -2244,10 +2511,14 @@ class TestIpsec4TunProtectTunDrop(TemplateIpsec,
self.config_sa_tun(p)
self.config_protect(p)
- tx = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if,
- src=p.remote_tun_if_host,
- dst=self.pg1.remote_ip4,
- count=63)
+ tx = self.gen_encrypt_pkts(
+ p,
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip4,
+ count=63,
+ )
self.send_and_assert_no_replies(self.tun_if, tx)
# teardown
@@ -2257,10 +2528,8 @@ class TestIpsec4TunProtectTunDrop(TemplateIpsec,
@tag_fixme_vpp_workers
-class TestIpsec6TunProtect(TemplateIpsec,
- TemplateIpsec6TunProtect,
- IpsecTun6):
- """ IPsec IPv6 Tunnel protect - transport mode"""
+class TestIpsec6TunProtect(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
+ """IPsec IPv6 Tunnel protect - transport mode"""
encryption_type = ESP
tun6_encrypt_node_name = "esp6-encrypt-tun"
@@ -2289,7 +2558,7 @@ class TestIpsec6TunProtect(TemplateIpsec,
# rekey - create new SAs and update the tunnel protection
np = copy.copy(p)
- np.crypt_key = b'X' + p.crypt_key[1:]
+ np.crypt_key = b"X" + p.crypt_key[1:]
np.scapy_tun_spi += 100
np.scapy_tun_sa_id += 1
np.vpp_tun_spi += 100
@@ -2308,8 +2577,9 @@ class TestIpsec6TunProtect(TemplateIpsec,
# bounce the interface state
p.tun_if.admin_down()
self.verify_drop_tun_66(np, count=127)
- node = ('/err/ipsec6-tun-input/%s' %
- 'ipsec packets received on disabled interface')
+ node = (
+ "/err/ipsec6-tun-input/%s" % "ipsec packets received on disabled interface"
+ )
self.assertEqual(127, self.statistics.get_err_counter(node))
p.tun_if.admin_up()
self.verify_tun_66(np, count=127)
@@ -2319,7 +2589,7 @@ class TestIpsec6TunProtect(TemplateIpsec,
# 2) swap output SA to [new]
# 3) use only [new] input SA
np3 = copy.copy(np)
- np3.crypt_key = b'Z' + p.crypt_key[1:]
+ np3.crypt_key = b"Z" + p.crypt_key[1:]
np3.scapy_tun_spi += 100
np3.scapy_tun_sa_id += 1
np3.vpp_tun_spi += 100
@@ -2330,25 +2600,22 @@ class TestIpsec6TunProtect(TemplateIpsec,
self.config_sa_tra(np3)
# step 1;
- p.tun_protect.update_vpp_config(np.tun_sa_out,
- [np.tun_sa_in, np3.tun_sa_in])
+ p.tun_protect.update_vpp_config(np.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
self.verify_tun_66(np, np, count=127)
self.verify_tun_66(np3, np, count=127)
# step 2;
- p.tun_protect.update_vpp_config(np3.tun_sa_out,
- [np.tun_sa_in, np3.tun_sa_in])
+ p.tun_protect.update_vpp_config(np3.tun_sa_out, [np.tun_sa_in, np3.tun_sa_in])
self.verify_tun_66(np, np3, count=127)
self.verify_tun_66(np3, np3, count=127)
# step 1;
- p.tun_protect.update_vpp_config(np3.tun_sa_out,
- [np3.tun_sa_in])
+ p.tun_protect.update_vpp_config(np3.tun_sa_out, [np3.tun_sa_in])
self.verify_tun_66(np3, np3, count=127)
self.verify_drop_tun_rx_66(np, count=127)
- self.assertEqual(p.tun_if.get_rx_stats(), 127*9)
- self.assertEqual(p.tun_if.get_tx_stats(), 127*8)
+ self.assertEqual(p.tun_if.get_rx_stats(), 127 * 9)
+ self.assertEqual(p.tun_if.get_tx_stats(), 127 * 8)
self.unconfig_sa(np)
# teardown
@@ -2376,10 +2643,8 @@ class TestIpsec6TunProtect(TemplateIpsec,
@tag_fixme_vpp_workers
-class TestIpsec6TunProtectTun(TemplateIpsec,
- TemplateIpsec6TunProtect,
- IpsecTun6):
- """ IPsec IPv6 Tunnel protect - tunnel mode"""
+class TestIpsec6TunProtectTun(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
+ """IPsec IPv6 Tunnel protect - tunnel mode"""
encryption_type = ESP
tun6_encrypt_node_name = "esp6-encrypt-tun"
@@ -2393,23 +2658,26 @@ class TestIpsec6TunProtectTun(TemplateIpsec,
def tearDown(self):
super(TestIpsec6TunProtectTun, self).tearDown()
- def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IPv6(src=sw_intf.remote_ip6,
- dst=sw_intf.local_ip6) /
- IPv6(src=src, dst=dst) /
- UDP(sport=1166, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts6(self, p, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IPv6(src=src, dst=dst) /
- UDP(sport=1166, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IPv6(src=sw_intf.remote_ip6, dst=sw_intf.local_ip6)
+ / IPv6(src=src, dst=dst)
+ / UDP(sport=1166, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IPv6(src=src, dst=dst)
+ / UDP(sport=1166, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted6(self, p, rxs):
for rx in rxs:
@@ -2438,7 +2706,7 @@ class TestIpsec6TunProtectTun(TemplateIpsec,
raise
def test_tun_66(self):
- """IPSEC tunnel protect """
+ """IPSEC tunnel protect"""
p = self.ipv6_params
@@ -2453,7 +2721,7 @@ class TestIpsec6TunProtectTun(TemplateIpsec,
# rekey - create new SAs and update the tunnel protection
np = copy.copy(p)
- np.crypt_key = b'X' + p.crypt_key[1:]
+ np.crypt_key = b"X" + p.crypt_key[1:]
np.scapy_tun_spi += 100
np.scapy_tun_sa_id += 1
np.vpp_tun_spi += 100
@@ -2475,10 +2743,8 @@ class TestIpsec6TunProtectTun(TemplateIpsec,
self.unconfig_network(p)
-class TestIpsec6TunProtectTunDrop(TemplateIpsec,
- TemplateIpsec6TunProtect,
- IpsecTun6):
- """ IPsec IPv6 Tunnel protect - tunnel mode - drop"""
+class TestIpsec6TunProtectTunDrop(TemplateIpsec, TemplateIpsec6TunProtect, IpsecTun6):
+ """IPsec IPv6 Tunnel protect - tunnel mode - drop"""
encryption_type = ESP
tun6_encrypt_node_name = "esp6-encrypt-tun"
@@ -2492,20 +2758,22 @@ class TestIpsec6TunProtectTunDrop(TemplateIpsec,
def tearDown(self):
super(TestIpsec6TunProtectTunDrop, self).tearDown()
- def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
+ def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
# the IP destination of the revelaed packet does not match
# that assigned to the tunnel
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IPv6(src=sw_intf.remote_ip6,
- dst="5::5") /
- IPv6(src=src, dst=dst) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IPv6(src=sw_intf.remote_ip6, dst="5::5")
+ / IPv6(src=src, dst=dst)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
def test_tun_drop_66(self):
- """IPSEC 6 tunnel protect bogus tunnel header """
+ """IPSEC 6 tunnel protect bogus tunnel header"""
p = self.ipv6_params
@@ -2513,10 +2781,14 @@ class TestIpsec6TunProtectTunDrop(TemplateIpsec,
self.config_sa_tun(p)
self.config_protect(p)
- tx = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if,
- src=p.remote_tun_if_host,
- dst=self.pg1.remote_ip6,
- count=63)
+ tx = self.gen_encrypt_pkts6(
+ p,
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip6,
+ count=63,
+ )
self.send_and_assert_no_replies(self.tun_if, tx)
self.unconfig_protect(p)
@@ -2525,7 +2797,7 @@ class TestIpsec6TunProtectTunDrop(TemplateIpsec,
class TemplateIpsecItf4(object):
- """ IPsec Interface IPv4 """
+ """IPsec Interface IPv4"""
encryption_type = ESP
tun4_encrypt_node_name = "esp4-encrypt-tun"
@@ -2535,30 +2807,41 @@ class TemplateIpsecItf4(object):
def config_sa_tun(self, p, src, dst):
config_tun_params(p, self.encryption_type, None, src, dst)
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- src, dst,
- flags=p.flags)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ src,
+ dst,
+ flags=p.flags,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- dst, src,
- flags=p.flags)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ dst,
+ src,
+ flags=p.flags,
+ )
p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
- def config_network(self, p, instance=0xffffffff):
+ def config_network(self, p, instance=0xFFFFFFFF):
p.tun_if = VppIpsecInterface(self, instance=instance)
p.tun_if.add_vpp_config()
@@ -2566,14 +2849,23 @@ class TemplateIpsecItf4(object):
p.tun_if.config_ip4()
p.tun_if.config_ip6()
- p.route = VppIpRoute(self, p.remote_tun_if_host, 32,
- [VppRoutePath(p.tun_if.remote_ip4,
- 0xffffffff)])
+ p.route = VppIpRoute(
+ self,
+ p.remote_tun_if_host,
+ 32,
+ [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
+ )
p.route.add_vpp_config()
- r = VppIpRoute(self, p.remote_tun_if_host6, 128,
- [VppRoutePath(p.tun_if.remote_ip6,
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)])
+ r = VppIpRoute(
+ self,
+ p.remote_tun_if_host6,
+ 128,
+ [
+ VppRoutePath(
+ p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
+ )
+ ],
+ )
r.add_vpp_config()
def unconfig_network(self, p):
@@ -2589,10 +2881,8 @@ class TemplateIpsecItf4(object):
@tag_fixme_vpp_workers
-class TestIpsecItf4(TemplateIpsec,
- TemplateIpsecItf4,
- IpsecTun4):
- """ IPsec Interface IPv4 """
+class TestIpsecItf4(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
+ """IPsec Interface IPv4"""
def setUp(self):
super(TestIpsecItf4, self).setUp()
@@ -2621,13 +2911,11 @@ class TestIpsecItf4(TemplateIpsec,
p = self.ipv4_params
self.config_network(p)
- config_tun_params(p, self.encryption_type, None,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ config_tun_params(
+ p, self.encryption_type, None, self.pg0.local_ip4, self.pg0.remote_ip4
+ )
self.verify_tun_dropped_44(p, count=n_pkts)
- self.config_sa_tun(p,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
self.config_protect(p)
self.verify_tun_44(p, count=n_pkts)
@@ -2639,15 +2927,15 @@ class TestIpsecItf4(TemplateIpsec,
p.tun_if.admin_up()
self.verify_tun_44(p, count=n_pkts)
- self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
- self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
+ self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
+ self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
# it's a v6 packet when its encrypted
self.tun4_encrypt_node_name = "esp6-encrypt-tun"
self.verify_tun_64(p, count=n_pkts)
- self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
- self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
+ self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
+ self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
self.tun4_encrypt_node_name = "esp4-encrypt-tun"
@@ -2655,7 +2943,7 @@ class TestIpsecItf4(TemplateIpsec,
# rekey - create new SAs and update the tunnel protection
np = copy.copy(p)
- np.crypt_key = b'X' + p.crypt_key[1:]
+ np.crypt_key = b"X" + p.crypt_key[1:]
np.scapy_tun_spi += 100
np.scapy_tun_sa_id += 1
np.vpp_tun_spi += 100
@@ -2663,9 +2951,7 @@ class TestIpsecItf4(TemplateIpsec,
np.tun_if.local_spi = p.vpp_tun_spi
np.tun_if.remote_spi = p.scapy_tun_spi
- self.config_sa_tun(np,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ self.config_sa_tun(np, self.pg0.local_ip4, self.pg0.remote_ip4)
self.config_protect(np)
self.unconfig_sa(p)
@@ -2684,17 +2970,15 @@ class TestIpsecItf4(TemplateIpsec,
n_pkts = 127
p = copy.copy(self.ipv4_params)
- p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
- IPSEC_API_INTEG_ALG_NONE)
- p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
- IPSEC_API_CRYPTO_ALG_NONE)
+ p.auth_algo_vpp_id = VppEnum.vl_api_ipsec_integ_alg_t.IPSEC_API_INTEG_ALG_NONE
+ p.crypt_algo_vpp_id = (
+ VppEnum.vl_api_ipsec_crypto_alg_t.IPSEC_API_CRYPTO_ALG_NONE
+ )
p.crypt_algo = "NULL"
p.auth_algo = "NULL"
self.config_network(p)
- self.config_sa_tun(p,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
self.config_protect(p)
self.logger.info(self.vapi.cli("sh ipsec sa"))
@@ -2711,18 +2995,23 @@ class TestIpsecItf4(TemplateIpsec,
p = self.ipv4_params
self.config_network(p)
- self.config_sa_tun(p,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
self.config_protect(p)
action_tx = PolicerAction(
- VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
- 0)
- policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
- conform_action=action_tx,
- exceed_action=action_tx,
- violate_action=action_tx)
+ VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
+ )
+ policer = VppPolicer(
+ self,
+ "pol1",
+ 80,
+ 0,
+ 1000,
+ 0,
+ conform_action=action_tx,
+ exceed_action=action_tx,
+ violate_action=action_tx,
+ )
policer.add_vpp_config()
# Start policing on tun
@@ -2735,9 +3024,9 @@ class TestIpsecItf4(TemplateIpsec,
stats = policer.get_stats()
# Single rate, 2 colour policer - expect conform, violate but no exceed
- self.assertGreater(stats['conform_packets'], 0)
- self.assertEqual(stats['exceed_packets'], 0)
- self.assertGreater(stats['violate_packets'], 0)
+ self.assertGreater(stats["conform_packets"], 0)
+ self.assertEqual(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
# Stop policing on tun
policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
@@ -2754,10 +3043,8 @@ class TestIpsecItf4(TemplateIpsec,
self.unconfig_network(p)
-class TestIpsecItf4MPLS(TemplateIpsec,
- TemplateIpsecItf4,
- IpsecTun4):
- """ IPsec Interface MPLSoIPv4 """
+class TestIpsecItf4MPLS(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
+ """IPsec Interface MPLSoIPv4"""
tun4_encrypt_node_name = "esp-mpls-encrypt-tun"
@@ -2769,14 +3056,17 @@ class TestIpsecItf4MPLS(TemplateIpsec,
def tearDown(self):
super(TestIpsecItf4MPLS, self).tearDown()
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(MPLS(label=44, ttl=3) /
- IP(src=src, dst=dst) /
- UDP(sport=1166, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ MPLS(label=44, ttl=3)
+ / IP(src=src, dst=dst)
+ / UDP(sport=1166, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
def verify_encrypted(self, p, sa, rxs):
for rx in rxs:
@@ -2807,18 +3097,19 @@ class TestIpsecItf4MPLS(TemplateIpsec,
self.config_network(p)
# deag MPLS routes from the tunnel
- r4 = VppMplsRoute(self, 44, 1,
- [VppRoutePath(
- self.pg1.remote_ip4,
- self.pg1.sw_if_index)]).add_vpp_config()
- p.route.modify([VppRoutePath(p.tun_if.remote_ip4,
- p.tun_if.sw_if_index,
- labels=[VppMplsLabel(44)])])
+ r4 = VppMplsRoute(
+ self, 44, 1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)]
+ ).add_vpp_config()
+ p.route.modify(
+ [
+ VppRoutePath(
+ p.tun_if.remote_ip4, p.tun_if.sw_if_index, labels=[VppMplsLabel(44)]
+ )
+ ]
+ )
p.tun_if.enable_mpls()
- self.config_sa_tun(p,
- self.pg0.local_ip4,
- self.pg0.remote_ip4)
+ self.config_sa_tun(p, self.pg0.local_ip4, self.pg0.remote_ip4)
self.config_protect(p)
self.verify_tun_44(p, count=n_pkts)
@@ -2831,7 +3122,7 @@ class TestIpsecItf4MPLS(TemplateIpsec,
class TemplateIpsecItf6(object):
- """ IPsec Interface IPv6 """
+ """IPsec Interface IPv6"""
encryption_type = ESP
tun6_encrypt_node_name = "esp6-encrypt-tun"
@@ -2841,34 +3132,45 @@ class TemplateIpsecItf6(object):
def config_sa_tun(self, p, src, dst):
config_tun_params(p, self.encryption_type, None, src, dst)
- if not hasattr(p, 'tun_flags'):
+ if not hasattr(p, "tun_flags"):
p.tun_flags = None
- if not hasattr(p, 'hop_limit'):
+ if not hasattr(p, "hop_limit"):
p.hop_limit = 255
- p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- src, dst,
- flags=p.flags,
- tun_flags=p.tun_flags,
- hop_limit=p.hop_limit)
+ p.tun_sa_out = VppIpsecSA(
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ src,
+ dst,
+ flags=p.flags,
+ tun_flags=p.tun_flags,
+ hop_limit=p.hop_limit,
+ )
p.tun_sa_out.add_vpp_config()
- p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
- self.vpp_esp_protocol,
- dst, src,
- flags=p.flags)
+ p.tun_sa_in = VppIpsecSA(
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
+ self.vpp_esp_protocol,
+ dst,
+ src,
+ flags=p.flags,
+ )
p.tun_sa_in.add_vpp_config()
def config_protect(self, p):
- p.tun_protect = VppIpsecTunProtect(self,
- p.tun_if,
- p.tun_sa_out,
- [p.tun_sa_in])
+ p.tun_protect = VppIpsecTunProtect(self, p.tun_if, p.tun_sa_out, [p.tun_sa_in])
p.tun_protect.add_vpp_config()
def config_network(self, p):
@@ -2879,15 +3181,24 @@ class TemplateIpsecItf6(object):
p.tun_if.config_ip4()
p.tun_if.config_ip6()
- r = VppIpRoute(self, p.remote_tun_if_host4, 32,
- [VppRoutePath(p.tun_if.remote_ip4,
- 0xffffffff)])
+ r = VppIpRoute(
+ self,
+ p.remote_tun_if_host4,
+ 32,
+ [VppRoutePath(p.tun_if.remote_ip4, 0xFFFFFFFF)],
+ )
r.add_vpp_config()
- p.route = VppIpRoute(self, p.remote_tun_if_host, 128,
- [VppRoutePath(p.tun_if.remote_ip6,
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)])
+ p.route = VppIpRoute(
+ self,
+ p.remote_tun_if_host,
+ 128,
+ [
+ VppRoutePath(
+ p.tun_if.remote_ip6, 0xFFFFFFFF, proto=DpoProto.DPO_PROTO_IP6
+ )
+ ],
+ )
p.route.add_vpp_config()
def unconfig_network(self, p):
@@ -2903,10 +3214,8 @@ class TemplateIpsecItf6(object):
@tag_fixme_vpp_workers
-class TestIpsecItf6(TemplateIpsec,
- TemplateIpsecItf6,
- IpsecTun6):
- """ IPsec Interface IPv6 """
+class TestIpsecItf6(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
+ """IPsec Interface IPv6"""
def setUp(self):
super(TestIpsecItf6, self).setUp()
@@ -2928,13 +3237,11 @@ class TestIpsecItf6(TemplateIpsec,
p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
self.config_network(p)
- config_tun_params(p, self.encryption_type, None,
- self.pg0.local_ip6,
- self.pg0.remote_ip6)
+ config_tun_params(
+ p, self.encryption_type, None, self.pg0.local_ip6, self.pg0.remote_ip6
+ )
self.verify_drop_tun_66(p, count=n_pkts)
- self.config_sa_tun(p,
- self.pg0.local_ip6,
- self.pg0.remote_ip6)
+ self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
self.config_protect(p)
self.verify_tun_66(p, count=n_pkts)
@@ -2946,15 +3253,15 @@ class TestIpsecItf6(TemplateIpsec,
p.tun_if.admin_up()
self.verify_tun_66(p, count=n_pkts)
- self.assertEqual(p.tun_if.get_rx_stats(), 3*n_pkts)
- self.assertEqual(p.tun_if.get_tx_stats(), 2*n_pkts)
+ self.assertEqual(p.tun_if.get_rx_stats(), 3 * n_pkts)
+ self.assertEqual(p.tun_if.get_tx_stats(), 2 * n_pkts)
# it's a v4 packet when its encrypted
self.tun6_encrypt_node_name = "esp4-encrypt-tun"
self.verify_tun_46(p, count=n_pkts)
- self.assertEqual(p.tun_if.get_rx_stats(), 4*n_pkts)
- self.assertEqual(p.tun_if.get_tx_stats(), 3*n_pkts)
+ self.assertEqual(p.tun_if.get_rx_stats(), 4 * n_pkts)
+ self.assertEqual(p.tun_if.get_tx_stats(), 3 * n_pkts)
self.tun6_encrypt_node_name = "esp6-encrypt-tun"
@@ -2962,7 +3269,7 @@ class TestIpsecItf6(TemplateIpsec,
# rekey - create new SAs and update the tunnel protection
np = copy.copy(p)
- np.crypt_key = b'X' + p.crypt_key[1:]
+ np.crypt_key = b"X" + p.crypt_key[1:]
np.scapy_tun_spi += 100
np.scapy_tun_sa_id += 1
np.vpp_tun_spi += 100
@@ -2971,14 +3278,12 @@ class TestIpsecItf6(TemplateIpsec,
np.tun_if.remote_spi = p.scapy_tun_spi
np.inner_hop_limit = 24
np.outer_hop_limit = 128
- np.inner_flow_label = 0xabcde
- np.outer_flow_label = 0xabcde
+ np.inner_flow_label = 0xABCDE
+ np.outer_flow_label = 0xABCDE
np.hop_limit = 128
np.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_FLOW_LABEL
- self.config_sa_tun(np,
- self.pg0.local_ip6,
- self.pg0.remote_ip6)
+ self.config_sa_tun(np, self.pg0.local_ip6, self.pg0.remote_ip6)
self.config_protect(np)
self.unconfig_sa(p)
@@ -3002,18 +3307,23 @@ class TestIpsecItf6(TemplateIpsec,
p.tun_flags = tf.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_HOP_LIMIT
self.config_network(p)
- self.config_sa_tun(p,
- self.pg0.local_ip6,
- self.pg0.remote_ip6)
+ self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
self.config_protect(p)
action_tx = PolicerAction(
- VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
- 0)
- policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
- conform_action=action_tx,
- exceed_action=action_tx,
- violate_action=action_tx)
+ VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
+ )
+ policer = VppPolicer(
+ self,
+ "pol1",
+ 80,
+ 0,
+ 1000,
+ 0,
+ conform_action=action_tx,
+ exceed_action=action_tx,
+ violate_action=action_tx,
+ )
policer.add_vpp_config()
# Start policing on tun
@@ -3026,9 +3336,9 @@ class TestIpsecItf6(TemplateIpsec,
stats = policer.get_stats()
# Single rate, 2 colour policer - expect conform, violate but no exceed
- self.assertGreater(stats['conform_packets'], 0)
- self.assertEqual(stats['exceed_packets'], 0)
- self.assertGreater(stats['violate_packets'], 0)
+ self.assertGreater(stats["conform_packets"], 0)
+ self.assertEqual(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
# Stop policing on tun
policer.apply_vpp_config(p.tun_if.sw_if_index, Dir.RX, False)
@@ -3046,27 +3356,31 @@ class TestIpsecItf6(TemplateIpsec,
class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
- """ Ipsec P2MP ESP v4 tests """
+ """Ipsec P2MP ESP v4 tests"""
+
tun4_encrypt_node_name = "esp4-encrypt-tun"
tun4_decrypt_node_name = ["esp4-decrypt-tun", "esp4-decrypt-tun-post"]
encryption_type = ESP
- def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(IP(src=self.pg1.local_ip4,
- dst=self.pg1.remote_ip4) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
-
- def gen_pkts(self, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- IP(src="1.1.1.1", dst=dst) /
- UDP(sport=1144, dport=2233) /
- Raw(b'X' * payload_size)
- for i in range(count)]
+ def gen_encrypt_pkts(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ IP(src=self.pg1.local_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
+
+ def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / IP(src="1.1.1.1", dst=dst)
+ / UDP(sport=1144, dport=2233)
+ / Raw(b"X" * payload_size)
+ for i in range(count)
+ ]
def verify_decrypted(self, p, rxs):
for rx in rxs:
@@ -3076,8 +3390,9 @@ class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
def verify_encrypted(self, p, sa, rxs):
for rx in rxs:
try:
- self.assertEqual(rx[IP].tos,
- VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2)
+ self.assertEqual(
+ rx[IP].tos, VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF << 2
+ )
self.assertEqual(rx[IP].ttl, p.hop_limit)
pkt = sa.decrypt(rx[IP])
if not pkt.haslayer(IP):
@@ -3099,9 +3414,9 @@ class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
N_NHS = 16
self.tun_if = self.pg0
p = self.ipv4_params
- p.tun_if = VppIpsecInterface(self,
- mode=(VppEnum.vl_api_tunnel_mode_t.
- TUNNEL_API_MODE_MP))
+ p.tun_if = VppIpsecInterface(
+ self, mode=(VppEnum.vl_api_tunnel_mode_t.TUNNEL_API_MODE_MP)
+ )
p.tun_if.add_vpp_config()
p.tun_if.admin_up()
p.tun_if.config_ip4()
@@ -3111,10 +3426,7 @@ class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
self.pg0.generate_remote_hosts(N_NHS)
self.pg0.configure_ipv4_neighbors()
- r_all = AclRule(True,
- src_prefix="0.0.0.0/0",
- dst_prefix="0.0.0.0/0",
- proto=0)
+ r_all = AclRule(True, src_prefix="0.0.0.0/0", dst_prefix="0.0.0.0/0", proto=0)
a = VppAcl(self, [r_all]).add_vpp_config()
VppAclInterface(self, self.pg0.sw_if_index, [a]).add_vpp_config()
@@ -3136,27 +3448,37 @@ class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
p.scapy_tra_spi = p.scapy_tra_spi + ii
p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
p.vpp_tra_spi = p.vpp_tra_spi + ii
- p.hop_limit = ii+10
+ p.hop_limit = ii + 10
p.tun_sa_out = VppIpsecSA(
- self, p.scapy_tun_sa_id, p.scapy_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
+ self,
+ p.scapy_tun_sa_id,
+ p.scapy_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
self.vpp_esp_protocol,
self.pg0.local_ip4,
self.pg0.remote_hosts[ii].ip4,
dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
- hop_limit=p.hop_limit)
+ hop_limit=p.hop_limit,
+ )
p.tun_sa_out.add_vpp_config()
p.tun_sa_in = VppIpsecSA(
- self, p.vpp_tun_sa_id, p.vpp_tun_spi,
- p.auth_algo_vpp_id, p.auth_key,
- p.crypt_algo_vpp_id, p.crypt_key,
+ self,
+ p.vpp_tun_sa_id,
+ p.vpp_tun_spi,
+ p.auth_algo_vpp_id,
+ p.auth_key,
+ p.crypt_algo_vpp_id,
+ p.crypt_key,
self.vpp_esp_protocol,
self.pg0.remote_hosts[ii].ip4,
self.pg0.local_ip4,
dscp=VppEnum.vl_api_ip_dscp_t.IP_API_DSCP_EF,
- hop_limit=p.hop_limit)
+ hop_limit=p.hop_limit,
+ )
p.tun_sa_in.add_vpp_config()
p.tun_protect = VppIpsecTunProtect(
@@ -3164,17 +3486,24 @@ class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
p.tun_if,
p.tun_sa_out,
[p.tun_sa_in],
- nh=p.tun_if.remote_hosts[ii].ip4)
+ nh=p.tun_if.remote_hosts[ii].ip4,
+ )
p.tun_protect.add_vpp_config()
- config_tun_params(p, self.encryption_type, None,
- self.pg0.local_ip4,
- self.pg0.remote_hosts[ii].ip4)
+ config_tun_params(
+ p,
+ self.encryption_type,
+ None,
+ self.pg0.local_ip4,
+ self.pg0.remote_hosts[ii].ip4,
+ )
self.multi_params.append(p)
p.via_tun_route = VppIpRoute(
- self, p.remote_tun_if_host, 32,
- [VppRoutePath(p.tun_if.remote_hosts[ii].ip4,
- p.tun_if.sw_if_index)]).add_vpp_config()
+ self,
+ p.remote_tun_if_host,
+ 32,
+ [VppRoutePath(p.tun_if.remote_hosts[ii].ip4, p.tun_if.sw_if_index)],
+ ).add_vpp_config()
p.tun_dst = self.pg0.remote_hosts[ii].ip4
@@ -3205,10 +3534,8 @@ class TestIpsecMIfEsp4(TemplateIpsec, IpsecTun4):
self.verify_tun_44(p, count=N_PKTS)
-class TestIpsecItf6MPLS(TemplateIpsec,
- TemplateIpsecItf6,
- IpsecTun6):
- """ IPsec Interface MPLSoIPv6 """
+class TestIpsecItf6MPLS(TemplateIpsec, TemplateIpsecItf6, IpsecTun6):
+ """IPsec Interface MPLSoIPv6"""
tun6_encrypt_node_name = "esp-mpls-encrypt-tun"
@@ -3220,14 +3547,17 @@ class TestIpsecItf6MPLS(TemplateIpsec,
def tearDown(self):
super(TestIpsecItf6MPLS, self).tearDown()
- def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1,
- payload_size=100):
- return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
- sa.encrypt(MPLS(label=66, ttl=3) /
- IPv6(src=src, dst=dst) /
- UDP(sport=1166, dport=2233) /
- Raw(b'X' * payload_size))
- for i in range(count)]
+ def gen_encrypt_pkts6(self, p, sa, sw_intf, src, dst, count=1, payload_size=100):
+ return [
+ Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
+ / sa.encrypt(
+ MPLS(label=66, ttl=3)
+ / IPv6(src=src, dst=dst)
+ / UDP(sport=1166, dport=2233)
+ / Raw(b"X" * payload_size)
+ )
+ for i in range(count)
+ ]
def verify_encrypted6(self, p, sa, rxs):
for rx in rxs:
@@ -3258,19 +3588,23 @@ class TestIpsecItf6MPLS(TemplateIpsec,
self.config_network(p)
# deag MPLS routes from the tunnel
- r6 = VppMplsRoute(self, 66, 1,
- [VppRoutePath(
- self.pg1.remote_ip6,
- self.pg1.sw_if_index)],
- eos_proto=f.FIB_PATH_NH_PROTO_IP6).add_vpp_config()
- p.route.modify([VppRoutePath(p.tun_if.remote_ip6,
- p.tun_if.sw_if_index,
- labels=[VppMplsLabel(66)])])
+ r6 = VppMplsRoute(
+ self,
+ 66,
+ 1,
+ [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)],
+ eos_proto=f.FIB_PATH_NH_PROTO_IP6,
+ ).add_vpp_config()
+ p.route.modify(
+ [
+ VppRoutePath(
+ p.tun_if.remote_ip6, p.tun_if.sw_if_index, labels=[VppMplsLabel(66)]
+ )
+ ]
+ )
p.tun_if.enable_mpls()
- self.config_sa_tun(p,
- self.pg0.local_ip6,
- self.pg0.remote_ip6)
+ self.config_sa_tun(p, self.pg0.local_ip6, self.pg0.remote_ip6)
self.config_protect(p)
self.verify_tun_66(p, count=n_pkts)
@@ -3282,5 +3616,5 @@ class TestIpsecItf6MPLS(TemplateIpsec,
self.unconfig_network(p)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)