summaryrefslogtreecommitdiffstats
path: root/test/template_ipsec.py
diff options
context:
space:
mode:
authorvinay Tripathi <vinayx.tripathi@intel.com>2023-10-20 05:20:47 +0000
committerFan Zhang <fanzhang.oss@gmail.com>2023-10-31 10:33:13 +0000
commitbc5f5305997e3b8f624b64bcc2d68687f31d515a (patch)
treed7fe0ff7b8881d26936568696353a9ff98faecfc /test/template_ipsec.py
parent75069cee95039172e8d2a3af58e4a8ed0a411d3a (diff)
ipsec: modify IPsec related tests to send and verify UDP-encapsulated ESP traffics
In this patch, IPsec related test files have been modified to send UDP-encapsulated ESP packets,and validate against Inbound and Outbound policies that are configured with Bypass, Discard and Protect action. Type: test Change-Id: I4b8da18270fd177868223bfe1389dc9c50e86cc5 Signed-off-by: vinay Tripathi <vinayx.tripathi@intel.com>
Diffstat (limited to 'test/template_ipsec.py')
-rw-r--r--test/template_ipsec.py164
1 files changed, 157 insertions, 7 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index c438fbb7680..4d4e4c60997 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -2806,20 +2806,48 @@ class IPSecIPv4Fwd(VppTestCase):
self.logger.info(self.vapi.ppcli("show ipsec all"))
return spdEntry
- def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
+ def create_stream(
+ self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
+ ):
packets = []
+ # create SA
+ sa = SecurityAssociation(
+ ESP,
+ spi=1000,
+ crypt_algo="AES-CBC",
+ crypt_key=b"JPjyOWBeVEQiMe7h",
+ auth_algo="HMAC-SHA1-96",
+ auth_key=b"C91KUR9GYMm5GfkEvNjX",
+ tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+ nat_t_header=UDP(sport=src_prt, dport=dst_prt),
+ )
for i in range(pkt_count):
# create packet info stored in the test case instance
info = self.create_packet_info(src_if, dst_if)
# convert the info into packet payload
payload = self.info_to_payload(info)
# create the packet itself
- p = (
- Ether(dst=src_if.local_mac, src=src_if.remote_mac)
- / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
- / UDP(sport=src_prt, dport=dst_prt)
- / Raw(payload)
- )
+ p = []
+ if proto == "UDP-ESP":
+ p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
+ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
+ elif proto == "UDP":
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
+ elif proto == "TCP":
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / TCP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
# store a copy of the packet in the packet info
info.data = p.copy()
# append the packet to the list
@@ -2873,6 +2901,50 @@ class IPSecIPv4Fwd(VppTestCase):
self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
self.assert_equal(pkt_count, matched_pkts)
+ # Method verify_l3_l4_capture() will verify network and transport layer
+ # fields of the packet sa.encrypt() gives interface number garbadge.
+ # thus interface validation get failed (scapy bug?). However our intent
+ # is to verify IP layer and above and that is covered.
+
+ def verify_l3_l4_capture(
+ self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+ ):
+ for packet in capture:
+ try:
+ self.assert_packet_checksums_valid(packet)
+ self.assert_equal(
+ packet[IP].src,
+ src_if.remote_ip4,
+ "decrypted packet source address",
+ )
+ self.assert_equal(
+ packet[IP].dst,
+ dst_if.remote_ip4,
+ "decrypted packet destination address",
+ )
+ if packet.haslayer(TCP):
+ self.assertFalse(
+ packet.haslayer(UDP),
+ "unexpected UDP header in decrypted packet",
+ )
+ elif packet.haslayer(UDP):
+ if packet[UDP].payload:
+ self.assertFalse(
+ packet[UDP][1].haslayer(UDP),
+ "unexpected UDP header in decrypted packet",
+ )
+ else:
+ self.assertFalse(
+ packet.haslayer(UDP),
+ "unexpected UDP header in decrypted packet",
+ )
+ self.assert_equal(
+ packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+ )
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
+ raise
+
class SpdFlowCacheTemplate(IPSecIPv4Fwd):
@classmethod
@@ -2948,6 +3020,84 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
return False
+ def create_stream(
+ cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+ ):
+ packets = []
+ packets = super(SpdFlowCacheTemplate, cls).create_stream(
+ src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+ )
+ return packets
+
+ def verify_capture(
+ self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+ ):
+ super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
+ src_if, dst_if, capture, tcp_port_in, udp_port_in
+ )
+
+
+class SpdFastPathTemplate(IPSecIPv4Fwd):
+ @classmethod
+ def setUpConstants(cls):
+ super(SpdFastPathTemplate, cls).setUpConstants()
+ # Override this method with required cmdline parameters e.g.
+ # cls.vpp_cmdline.extend(["ipsec", "{",
+ # "ipv4-outbound-spd-flow-cache on",
+ # "}"])
+ # cls.logger.info("VPP modified cmdline is %s" % " "
+ # .join(cls.vpp_cmdline))
+
+ def setUp(self):
+ super(SpdFastPathTemplate, self).setUp()
+
+ def tearDown(self):
+ super(SpdFastPathTemplate, self).tearDown()
+
+ def create_stream(
+ cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+ ):
+ packets = []
+ packets = super(SpdFastPathTemplate, cls).create_stream(
+ src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+ )
+ return packets
+
+ def verify_capture(
+ self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+ ):
+ super(SpdFastPathTemplate, self).verify_l3_l4_capture(
+ src_if, dst_if, capture, tcp_port_in, udp_port_in
+ )
+
+
+class IpsecDefaultTemplate(IPSecIPv4Fwd):
+ @classmethod
+ def setUpConstants(cls):
+ super(IpsecDefaultTemplate, cls).setUpConstants()
+
+ def setUp(self):
+ super(IpsecDefaultTemplate, self).setUp()
+
+ def tearDown(self):
+ super(IpsecDefaultTemplate, self).tearDown()
+
+ def create_stream(
+ cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+ ):
+ packets = []
+ packets = super(IpsecDefaultTemplate, cls).create_stream(
+ src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+ )
+ return packets
+
+ def verify_capture(
+ self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+ ):
+ super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
+ src_if, dst_if, capture, tcp_port_in, udp_port_in
+ )
+
class IPSecIPv6Fwd(VppTestCase):
"""Test IPSec by capturing and verifying IPv6 forwarded pkts"""