summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvinay Tripathi <vinayx.tripathi@intel.com>2023-10-20 05:20:47 +0000
committerFan Zhang <fanzhang.oss@gmail.com>2023-10-31 10:33:13 +0000
commitbc5f5305997e3b8f624b64bcc2d68687f31d515a (patch)
treed7fe0ff7b8881d26936568696353a9ff98faecfc
parent75069cee95039172e8d2a3af58e4a8ed0a411d3a (diff)
ipsec: modify IPsec related tests to send and verify UDP-encapsulated ESP traffics
In this patch, IPsec related test files have been modified to send UDP-encapsulated ESP packets,and validate against Inbound and Outbound policies that are configured with Bypass, Discard and Protect action. Type: test Change-Id: I4b8da18270fd177868223bfe1389dc9c50e86cc5 Signed-off-by: vinay Tripathi <vinayx.tripathi@intel.com>
-rw-r--r--test/asf/test_ipsec_default.py6
-rw-r--r--test/template_ipsec.py164
-rw-r--r--test/test_ipsec_spd_fp_input.py4
3 files changed, 162 insertions, 12 deletions
diff --git a/test/asf/test_ipsec_default.py b/test/asf/test_ipsec_default.py
index e97e2ef6b50..2fefb771c48 100644
--- a/test/asf/test_ipsec_default.py
+++ b/test/asf/test_ipsec_default.py
@@ -3,7 +3,7 @@ import unittest
from util import ppp
from asfframework import VppTestRunner
-from template_ipsec import IPSecIPv4Fwd
+from template_ipsec import IpsecDefaultTemplate
"""
When an IPSec SPD is configured on an interface, any inbound packets
@@ -32,7 +32,7 @@ packets are dropped as expected.
"""
-class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
+class IPSecInboundDefaultDrop(IpsecDefaultTemplate):
"""IPSec: inbound packets drop by default with no matching rule"""
def test_ipsec_inbound_default_drop(self):
@@ -114,7 +114,7 @@ class IPSecInboundDefaultDrop(IPSecIPv4Fwd):
self.verify_policy_match(pkt_count, inbound_policy)
-class IPSecOutboundDefaultDrop(IPSecIPv4Fwd):
+class IPSecOutboundDefaultDrop(IpsecDefaultTemplate):
"""IPSec: outbound packets drop by default with no matching rule"""
def test_ipsec_inbound_default_drop(self):
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index c438fbb7680..4d4e4c60997 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -2806,20 +2806,48 @@ class IPSecIPv4Fwd(VppTestCase):
self.logger.info(self.vapi.ppcli("show ipsec all"))
return spdEntry
- def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
+ def create_stream(
+ self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP"
+ ):
packets = []
+ # create SA
+ sa = SecurityAssociation(
+ ESP,
+ spi=1000,
+ crypt_algo="AES-CBC",
+ crypt_key=b"JPjyOWBeVEQiMe7h",
+ auth_algo="HMAC-SHA1-96",
+ auth_key=b"C91KUR9GYMm5GfkEvNjX",
+ tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4),
+ nat_t_header=UDP(sport=src_prt, dport=dst_prt),
+ )
for i in range(pkt_count):
# create packet info stored in the test case instance
info = self.create_packet_info(src_if, dst_if)
# convert the info into packet payload
payload = self.info_to_payload(info)
# create the packet itself
- p = (
- Ether(dst=src_if.local_mac, src=src_if.remote_mac)
- / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
- / UDP(sport=src_prt, dport=dst_prt)
- / Raw(payload)
- )
+ p = []
+ if proto == "UDP-ESP":
+ p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt(
+ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
+ elif proto == "UDP":
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
+ elif proto == "TCP":
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / TCP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
# store a copy of the packet in the packet info
info.data = p.copy()
# append the packet to the list
@@ -2873,6 +2901,50 @@ class IPSecIPv4Fwd(VppTestCase):
self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
self.assert_equal(pkt_count, matched_pkts)
+ # Method verify_l3_l4_capture() will verify network and transport layer
+ # fields of the packet sa.encrypt() gives interface number garbadge.
+ # thus interface validation get failed (scapy bug?). However our intent
+ # is to verify IP layer and above and that is covered.
+
+ def verify_l3_l4_capture(
+ self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+ ):
+ for packet in capture:
+ try:
+ self.assert_packet_checksums_valid(packet)
+ self.assert_equal(
+ packet[IP].src,
+ src_if.remote_ip4,
+ "decrypted packet source address",
+ )
+ self.assert_equal(
+ packet[IP].dst,
+ dst_if.remote_ip4,
+ "decrypted packet destination address",
+ )
+ if packet.haslayer(TCP):
+ self.assertFalse(
+ packet.haslayer(UDP),
+ "unexpected UDP header in decrypted packet",
+ )
+ elif packet.haslayer(UDP):
+ if packet[UDP].payload:
+ self.assertFalse(
+ packet[UDP][1].haslayer(UDP),
+ "unexpected UDP header in decrypted packet",
+ )
+ else:
+ self.assertFalse(
+ packet.haslayer(UDP),
+ "unexpected UDP header in decrypted packet",
+ )
+ self.assert_equal(
+ packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID"
+ )
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid plain packet:", packet))
+ raise
+
class SpdFlowCacheTemplate(IPSecIPv4Fwd):
@classmethod
@@ -2948,6 +3020,84 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
self.logger.info("\ncrc32 NOT supported:\n" + cpu_info)
return False
+ def create_stream(
+ cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+ ):
+ packets = []
+ packets = super(SpdFlowCacheTemplate, cls).create_stream(
+ src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+ )
+ return packets
+
+ def verify_capture(
+ self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+ ):
+ super(SpdFlowCacheTemplate, self).verify_l3_l4_capture(
+ src_if, dst_if, capture, tcp_port_in, udp_port_in
+ )
+
+
+class SpdFastPathTemplate(IPSecIPv4Fwd):
+ @classmethod
+ def setUpConstants(cls):
+ super(SpdFastPathTemplate, cls).setUpConstants()
+ # Override this method with required cmdline parameters e.g.
+ # cls.vpp_cmdline.extend(["ipsec", "{",
+ # "ipv4-outbound-spd-flow-cache on",
+ # "}"])
+ # cls.logger.info("VPP modified cmdline is %s" % " "
+ # .join(cls.vpp_cmdline))
+
+ def setUp(self):
+ super(SpdFastPathTemplate, self).setUp()
+
+ def tearDown(self):
+ super(SpdFastPathTemplate, self).tearDown()
+
+ def create_stream(
+ cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+ ):
+ packets = []
+ packets = super(SpdFastPathTemplate, cls).create_stream(
+ src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+ )
+ return packets
+
+ def verify_capture(
+ self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+ ):
+ super(SpdFastPathTemplate, self).verify_l3_l4_capture(
+ src_if, dst_if, capture, tcp_port_in, udp_port_in
+ )
+
+
+class IpsecDefaultTemplate(IPSecIPv4Fwd):
+ @classmethod
+ def setUpConstants(cls):
+ super(IpsecDefaultTemplate, cls).setUpConstants()
+
+ def setUp(self):
+ super(IpsecDefaultTemplate, self).setUp()
+
+ def tearDown(self):
+ super(IpsecDefaultTemplate, self).tearDown()
+
+ def create_stream(
+ cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP"
+ ):
+ packets = []
+ packets = super(IpsecDefaultTemplate, cls).create_stream(
+ src_if, dst_if, pkt_count, src_prt, dst_prt, proto
+ )
+ return packets
+
+ def verify_capture(
+ self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678
+ ):
+ super(IpsecDefaultTemplate, self).verify_l3_l4_capture(
+ src_if, dst_if, capture, tcp_port_in, udp_port_in
+ )
+
class IPSecIPv6Fwd(VppTestCase):
"""Test IPSec by capturing and verifying IPv6 forwarded pkts"""
diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py
index d70623ef2ea..03800325f05 100644
--- a/test/test_ipsec_spd_fp_input.py
+++ b/test/test_ipsec_spd_fp_input.py
@@ -4,9 +4,9 @@ import ipaddress
from util import ppp
from framework import VppTestRunner
-from template_ipsec import IPSecIPv4Fwd
from template_ipsec import IPSecIPv6Fwd
from test_ipsec_esp import TemplateIpsecEsp
+from template_ipsec import SpdFastPathTemplate
def debug_signal_handler(signal, frame):
@@ -20,7 +20,7 @@ import signal
signal.signal(signal.SIGINT, debug_signal_handler)
-class SpdFastPathInbound(IPSecIPv4Fwd):
+class SpdFastPathInbound(SpdFastPathTemplate):
# In test cases derived from this class, packets in IPv4 FWD path
# are configured to go through IPSec inbound SPD policy lookup.
# Note that order in which the rules are applied is