summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_nat.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ipsec_nat.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ipsec_nat.py')
-rw-r--r--test/test_ipsec_nat.py334
1 files changed, 204 insertions, 130 deletions
diff --git a/test/test_ipsec_nat.py b/test/test_ipsec_nat.py
index b7ccb2befdc..64a2725dda6 100644
--- a/test/test_ipsec_nat.py
+++ b/test/test_ipsec_nat.py
@@ -9,15 +9,14 @@ from scapy.layers.ipsec import SecurityAssociation, ESP
from util import ppp, ppc
from template_ipsec import TemplateIpsec
-from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
- VppIpsecSpdItfBinding
+from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSpdItfBinding
from vpp_ip_route import VppIpRoute, VppRoutePath
from vpp_ip import DpoProto
from vpp_papi import VppEnum
class IPSecNATTestCase(TemplateIpsec):
- """ IPSec/NAT
+ """IPSec/NAT
TUNNEL MODE::
@@ -53,18 +52,19 @@ class IPSecNATTestCase(TemplateIpsec):
self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
self.tun_spd.add_vpp_config()
- VppIpsecSpdItfBinding(self, self.tun_spd,
- self.tun_if).add_vpp_config()
+ VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if).add_vpp_config()
p = self.ipv4_params
self.config_esp_tun(p)
self.logger.info(self.vapi.ppcli("show ipsec all"))
d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
- VppIpRoute(self, p.remote_tun_if_host, p.addr_len,
- [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
- 0xffffffff,
- proto=d)]).add_vpp_config()
+ VppIpRoute(
+ self,
+ p.remote_tun_if_host,
+ p.addr_len,
+ [VppRoutePath(self.tun_if.remote_addr[p.addr_type], 0xFFFFFFFF, proto=d)],
+ ).add_vpp_config()
def tearDown(self):
super(IPSecNATTestCase, self).tearDown()
@@ -72,65 +72,84 @@ class IPSecNATTestCase(TemplateIpsec):
def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip):
return [
# TCP
- Ether(src=src_mac, dst=dst_mac) /
- IP(src=src_ip, dst=dst_ip) /
- TCP(sport=self.tcp_port_in, dport=20),
+ Ether(src=src_mac, dst=dst_mac)
+ / IP(src=src_ip, dst=dst_ip)
+ / TCP(sport=self.tcp_port_in, dport=20),
# UDP
- Ether(src=src_mac, dst=dst_mac) /
- IP(src=src_ip, dst=dst_ip) /
- UDP(sport=self.udp_port_in, dport=20),
+ Ether(src=src_mac, dst=dst_mac)
+ / IP(src=src_ip, dst=dst_ip)
+ / UDP(sport=self.udp_port_in, dport=20),
# ICMP
- Ether(src=src_mac, dst=dst_mac) /
- IP(src=src_ip, dst=dst_ip) /
- ICMP(id=self.icmp_id_in, type='echo-request')
+ Ether(src=src_mac, dst=dst_mac)
+ / IP(src=src_ip, dst=dst_ip)
+ / ICMP(id=self.icmp_id_in, type="echo-request"),
]
def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa):
return [
# TCP
- Ether(src=src_mac, dst=dst_mac) /
- sa.encrypt(IP(src=src_ip, dst=dst_ip) /
- TCP(dport=self.tcp_port_out, sport=20)),
+ Ether(src=src_mac, dst=dst_mac)
+ / sa.encrypt(
+ IP(src=src_ip, dst=dst_ip) / TCP(dport=self.tcp_port_out, sport=20)
+ ),
# UDP
- Ether(src=src_mac, dst=dst_mac) /
- sa.encrypt(IP(src=src_ip, dst=dst_ip) /
- UDP(dport=self.udp_port_out, sport=20)),
+ Ether(src=src_mac, dst=dst_mac)
+ / sa.encrypt(
+ IP(src=src_ip, dst=dst_ip) / UDP(dport=self.udp_port_out, sport=20)
+ ),
# ICMP
- Ether(src=src_mac, dst=dst_mac) /
- sa.encrypt(IP(src=src_ip, dst=dst_ip) /
- ICMP(id=self.icmp_id_out, type='echo-request'))
+ Ether(src=src_mac, dst=dst_mac)
+ / sa.encrypt(
+ IP(src=src_ip, dst=dst_ip)
+ / ICMP(id=self.icmp_id_out, type="echo-request")
+ ),
]
def verify_capture_plain(self, capture):
for packet in capture:
try:
self.assert_packet_checksums_valid(packet)
- self.assert_equal(packet[IP].src, self.tun_if.remote_ip4,
- "decrypted packet source address")
- self.assert_equal(packet[IP].dst, self.pg1.remote_ip4,
- "decrypted packet destination address")
+ self.assert_equal(
+ packet[IP].src,
+ self.tun_if.remote_ip4,
+ "decrypted packet source address",
+ )
+ self.assert_equal(
+ packet[IP].dst,
+ self.pg1.remote_ip4,
+ "decrypted packet destination address",
+ )
if packet.haslayer(TCP):
self.assertFalse(
packet.haslayer(UDP),
- "unexpected UDP header in decrypted packet")
- self.assert_equal(packet[TCP].dport, self.tcp_port_in,
- "decrypted packet TCP destination port")
+ "unexpected UDP header in decrypted packet",
+ )
+ self.assert_equal(
+ packet[TCP].dport,
+ self.tcp_port_in,
+ "decrypted packet TCP destination port",
+ )
elif packet.haslayer(UDP):
if packet[UDP].payload:
self.assertFalse(
packet[UDP][1].haslayer(UDP),
- "unexpected UDP header in decrypted packet")
- self.assert_equal(packet[UDP].dport, self.udp_port_in,
- "decrypted packet UDP destination port")
+ "unexpected UDP header in decrypted packet",
+ )
+ self.assert_equal(
+ packet[UDP].dport,
+ self.udp_port_in,
+ "decrypted packet UDP destination port",
+ )
else:
self.assertFalse(
packet.haslayer(UDP),
- "unexpected UDP header in decrypted packet")
- self.assert_equal(packet[ICMP].id, self.icmp_id_in,
- "decrypted packet ICMP ID")
+ "unexpected UDP header in decrypted packet",
+ )
+ self.assert_equal(
+ packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+ )
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid plain packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
raise
def verify_capture_encrypted(self, capture, sa):
@@ -139,19 +158,25 @@ class IPSecNATTestCase(TemplateIpsec):
copy = packet.__class__(scapy.compat.raw(packet))
del copy[UDP].len
copy = packet.__class__(scapy.compat.raw(copy))
- self.assert_equal(packet[UDP].len, copy[UDP].len,
- "UDP header length")
+ self.assert_equal(packet[UDP].len, copy[UDP].len, "UDP header length")
self.assert_packet_checksums_valid(packet)
self.assertIn(ESP, packet[IP])
decrypt_pkt = sa.decrypt(packet[IP])
self.assert_packet_checksums_valid(decrypt_pkt)
- self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4,
- "encrypted packet source address")
- self.assert_equal(decrypt_pkt[IP].dst, self.tun_if.remote_ip4,
- "encrypted packet destination address")
+ self.assert_equal(
+ decrypt_pkt[IP].src,
+ self.pg1.remote_ip4,
+ "encrypted packet source address",
+ )
+ self.assert_equal(
+ decrypt_pkt[IP].dst,
+ self.tun_if.remote_ip4,
+ "encrypted packet destination address",
+ )
except Exception:
self.logger.error(
- ppp("Unexpected or invalid encrypted packet:", packet))
+ ppp("Unexpected or invalid encrypted packet:", packet)
+ )
raise
def config_esp_tun(self, params):
@@ -166,104 +191,153 @@ class IPSecNATTestCase(TemplateIpsec):
crypt_key = params.crypt_key
addr_any = params.addr_any
addr_bcast = params.addr_bcast
- flags = (VppEnum.vl_api_ipsec_sad_flags_t.
- IPSEC_API_SAD_FLAG_UDP_ENCAP)
+ flags = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_UDP_ENCAP
e = VppEnum.vl_api_ipsec_spd_action_t
- VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
- auth_algo_vpp_id, auth_key,
- crypt_algo_vpp_id, crypt_key,
- self.vpp_esp_protocol,
- self.pg1.remote_addr[addr_type],
- self.tun_if.remote_addr[addr_type],
- flags=flags).add_vpp_config()
- VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
- auth_algo_vpp_id, auth_key,
- crypt_algo_vpp_id, crypt_key,
- self.vpp_esp_protocol,
- self.tun_if.remote_addr[addr_type],
- self.pg1.remote_addr[addr_type],
- flags=flags).add_vpp_config()
+ VppIpsecSA(
+ self,
+ scapy_tun_sa_id,
+ scapy_tun_spi,
+ auth_algo_vpp_id,
+ auth_key,
+ crypt_algo_vpp_id,
+ crypt_key,
+ self.vpp_esp_protocol,
+ self.pg1.remote_addr[addr_type],
+ self.tun_if.remote_addr[addr_type],
+ flags=flags,
+ ).add_vpp_config()
+ VppIpsecSA(
+ self,
+ vpp_tun_sa_id,
+ vpp_tun_spi,
+ auth_algo_vpp_id,
+ auth_key,
+ crypt_algo_vpp_id,
+ crypt_key,
+ self.vpp_esp_protocol,
+ self.tun_if.remote_addr[addr_type],
+ self.pg1.remote_addr[addr_type],
+ flags=flags,
+ ).add_vpp_config()
- VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
- addr_any, addr_bcast,
- addr_any, addr_bcast,
- socket.IPPROTO_ESP).add_vpp_config()
- VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
- addr_any, addr_bcast,
- addr_any, addr_bcast,
- socket.IPPROTO_ESP,
- is_outbound=0).add_vpp_config()
- VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
- addr_any, addr_bcast,
- addr_any, addr_bcast,
- socket.IPPROTO_UDP,
- remote_port_start=4500,
- remote_port_stop=4500).add_vpp_config()
- VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
- addr_any, addr_bcast,
- addr_any, addr_bcast,
- socket.IPPROTO_UDP,
- remote_port_start=4500,
- remote_port_stop=4500,
- is_outbound=0).add_vpp_config()
- VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
- self.tun_if.remote_addr[addr_type],
- self.tun_if.remote_addr[addr_type],
- self.pg1.remote_addr[addr_type],
- self.pg1.remote_addr[addr_type],
- 0, priority=10,
- policy=e.IPSEC_API_SPD_ACTION_PROTECT,
- is_outbound=0).add_vpp_config()
- VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
- self.pg1.remote_addr[addr_type],
- self.pg1.remote_addr[addr_type],
- self.tun_if.remote_addr[addr_type],
- self.tun_if.remote_addr[addr_type],
- 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
- priority=10).add_vpp_config()
+ VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ scapy_tun_sa_id,
+ addr_any,
+ addr_bcast,
+ addr_any,
+ addr_bcast,
+ socket.IPPROTO_ESP,
+ ).add_vpp_config()
+ VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ scapy_tun_sa_id,
+ addr_any,
+ addr_bcast,
+ addr_any,
+ addr_bcast,
+ socket.IPPROTO_ESP,
+ is_outbound=0,
+ ).add_vpp_config()
+ VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ scapy_tun_sa_id,
+ addr_any,
+ addr_bcast,
+ addr_any,
+ addr_bcast,
+ socket.IPPROTO_UDP,
+ remote_port_start=4500,
+ remote_port_stop=4500,
+ ).add_vpp_config()
+ VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ scapy_tun_sa_id,
+ addr_any,
+ addr_bcast,
+ addr_any,
+ addr_bcast,
+ socket.IPPROTO_UDP,
+ remote_port_start=4500,
+ remote_port_stop=4500,
+ is_outbound=0,
+ ).add_vpp_config()
+ VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ vpp_tun_sa_id,
+ self.tun_if.remote_addr[addr_type],
+ self.tun_if.remote_addr[addr_type],
+ self.pg1.remote_addr[addr_type],
+ self.pg1.remote_addr[addr_type],
+ 0,
+ priority=10,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ is_outbound=0,
+ ).add_vpp_config()
+ VppIpsecSpdEntry(
+ self,
+ self.tun_spd,
+ scapy_tun_sa_id,
+ self.pg1.remote_addr[addr_type],
+ self.pg1.remote_addr[addr_type],
+ self.tun_if.remote_addr[addr_type],
+ self.tun_if.remote_addr[addr_type],
+ 0,
+ policy=e.IPSEC_API_SPD_ACTION_PROTECT,
+ priority=10,
+ ).add_vpp_config()
def test_ipsec_nat_tun(self):
- """ IPSec/NAT tunnel test case """
+ """IPSec/NAT tunnel test case"""
p = self.ipv4_params
- scapy_tun_sa = SecurityAssociation(ESP, spi=p.scapy_tun_spi,
- crypt_algo=p.crypt_algo,
- crypt_key=p.crypt_key,
- auth_algo=p.auth_algo,
- auth_key=p.auth_key,
- tunnel_header=IP(
- src=self.pg1.remote_ip4,
- dst=self.tun_if.remote_ip4),
- nat_t_header=UDP(
- sport=4500,
- dport=4500))
+ scapy_tun_sa = SecurityAssociation(
+ ESP,
+ spi=p.scapy_tun_spi,
+ crypt_algo=p.crypt_algo,
+ crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
+ tunnel_header=IP(src=self.pg1.remote_ip4, dst=self.tun_if.remote_ip4),
+ nat_t_header=UDP(sport=4500, dport=4500),
+ )
# in2out - from private network to public
pkts = self.create_stream_plain(
- self.pg1.remote_mac, self.pg1.local_mac,
- self.pg1.remote_ip4, self.tun_if.remote_ip4)
+ self.pg1.remote_mac,
+ self.pg1.local_mac,
+ self.pg1.remote_ip4,
+ self.tun_if.remote_ip4,
+ )
self.pg1.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.tun_if.get_capture(len(pkts))
self.verify_capture_encrypted(capture, scapy_tun_sa)
- vpp_tun_sa = SecurityAssociation(ESP,
- spi=p.vpp_tun_spi,
- crypt_algo=p.crypt_algo,
- crypt_key=p.crypt_key,
- auth_algo=p.auth_algo,
- auth_key=p.auth_key,
- tunnel_header=IP(
- src=self.tun_if.remote_ip4,
- dst=self.pg1.remote_ip4),
- nat_t_header=UDP(
- sport=4500,
- dport=4500))
+ vpp_tun_sa = SecurityAssociation(
+ ESP,
+ spi=p.vpp_tun_spi,
+ crypt_algo=p.crypt_algo,
+ crypt_key=p.crypt_key,
+ auth_algo=p.auth_algo,
+ auth_key=p.auth_key,
+ tunnel_header=IP(src=self.tun_if.remote_ip4, dst=self.pg1.remote_ip4),
+ nat_t_header=UDP(sport=4500, dport=4500),
+ )
# out2in - from public network to private
pkts = self.create_stream_encrypted(
- self.tun_if.remote_mac, self.tun_if.local_mac,
- self.tun_if.remote_ip4, self.pg1.remote_ip4, vpp_tun_sa)
+ self.tun_if.remote_mac,
+ self.tun_if.local_mac,
+ self.tun_if.remote_ip4,
+ self.pg1.remote_ip4,
+ vpp_tun_sa,
+ )
self.logger.info(ppc("Sending packets:", pkts))
self.tun_if.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)