diff options
author | vinay Tripathi <vinayx.tripathi@intel.com> | 2023-10-20 05:20:47 +0000 |
---|---|---|
committer | Fan Zhang <fanzhang.oss@gmail.com> | 2023-10-31 10:33:13 +0000 |
commit | bc5f5305997e3b8f624b64bcc2d68687f31d515a (patch) | |
tree | d7fe0ff7b8881d26936568696353a9ff98faecfc /test/template_ipsec.py | |
parent | 75069cee95039172e8d2a3af58e4a8ed0a411d3a (diff) |
ipsec: modify IPsec related tests to send and verify UDP-encapsulated ESP traffics
In this patch, IPsec related test files have been modified to send UDP-encapsulated
ESP packets,and validate against Inbound and Outbound policies that are configured
with Bypass, Discard and Protect action.
Type: test
Change-Id: I4b8da18270fd177868223bfe1389dc9c50e86cc5
Signed-off-by: vinay Tripathi <vinayx.tripathi@intel.com>
Diffstat (limited to 'test/template_ipsec.py')
-rw-r--r-- | test/template_ipsec.py | 164 |
1 files changed, 157 insertions, 7 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py index c438fbb7680..4d4e4c60997 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -2806,20 +2806,48 @@ class IPSecIPv4Fwd(VppTestCase): self.logger.info(self.vapi.ppcli("show ipsec all")) return spdEntry - def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678): + def create_stream( + self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP" + ): packets = [] + # create SA + sa = SecurityAssociation( + ESP, + spi=1000, + crypt_algo="AES-CBC", + crypt_key=b"JPjyOWBeVEQiMe7h", + auth_algo="HMAC-SHA1-96", + auth_key=b"C91KUR9GYMm5GfkEvNjX", + tunnel_header=IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4), + nat_t_header=UDP(sport=src_prt, dport=dst_prt), + ) for i in range(pkt_count): # create packet info stored in the test case instance info = self.create_packet_info(src_if, dst_if) # convert the info into packet payload payload = self.info_to_payload(info) # create the packet itself - p = ( - Ether(dst=src_if.local_mac, src=src_if.remote_mac) - / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) - / UDP(sport=src_prt, dport=dst_prt) - / Raw(payload) - ) + p = [] + if proto == "UDP-ESP": + p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / sa.encrypt( + IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + elif proto == "UDP": + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) + elif proto == "TCP": + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) + / TCP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list @@ -2873,6 +2901,50 @@ class IPSecIPv4Fwd(VppTestCase): self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts) self.assert_equal(pkt_count, matched_pkts) + # Method verify_l3_l4_capture() will verify network and transport layer + # fields of the packet sa.encrypt() gives interface number garbadge. + # thus interface validation get failed (scapy bug?). However our intent + # is to verify IP layer and above and that is covered. + + def verify_l3_l4_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + for packet in capture: + try: + self.assert_packet_checksums_valid(packet) + self.assert_equal( + packet[IP].src, + src_if.remote_ip4, + "decrypted packet source address", + ) + self.assert_equal( + packet[IP].dst, + dst_if.remote_ip4, + "decrypted packet destination address", + ) + if packet.haslayer(TCP): + self.assertFalse( + packet.haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + elif packet.haslayer(UDP): + if packet[UDP].payload: + self.assertFalse( + packet[UDP][1].haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + else: + self.assertFalse( + packet.haslayer(UDP), + "unexpected UDP header in decrypted packet", + ) + self.assert_equal( + packet[ICMP].id, self.icmp_id_in, "decrypted packet ICMP ID" + ) + except Exception: + self.logger.error(ppp("Unexpected or invalid plain packet:", packet)) + raise + class SpdFlowCacheTemplate(IPSecIPv4Fwd): @classmethod @@ -2948,6 +3020,84 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd): self.logger.info("\ncrc32 NOT supported:\n" + cpu_info) return False + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(SpdFlowCacheTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(SpdFlowCacheTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + + +class SpdFastPathTemplate(IPSecIPv4Fwd): + @classmethod + def setUpConstants(cls): + super(SpdFastPathTemplate, cls).setUpConstants() + # Override this method with required cmdline parameters e.g. + # cls.vpp_cmdline.extend(["ipsec", "{", + # "ipv4-outbound-spd-flow-cache on", + # "}"]) + # cls.logger.info("VPP modified cmdline is %s" % " " + # .join(cls.vpp_cmdline)) + + def setUp(self): + super(SpdFastPathTemplate, self).setUp() + + def tearDown(self): + super(SpdFastPathTemplate, self).tearDown() + + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(SpdFastPathTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(SpdFastPathTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + + +class IpsecDefaultTemplate(IPSecIPv4Fwd): + @classmethod + def setUpConstants(cls): + super(IpsecDefaultTemplate, cls).setUpConstants() + + def setUp(self): + super(IpsecDefaultTemplate, self).setUp() + + def tearDown(self): + super(IpsecDefaultTemplate, self).tearDown() + + def create_stream( + cls, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678, proto="UDP-ESP" + ): + packets = [] + packets = super(IpsecDefaultTemplate, cls).create_stream( + src_if, dst_if, pkt_count, src_prt, dst_prt, proto + ) + return packets + + def verify_capture( + self, src_if, dst_if, capture, tcp_port_in=1234, udp_port_in=5678 + ): + super(IpsecDefaultTemplate, self).verify_l3_l4_capture( + src_if, dst_if, capture, tcp_port_in, udp_port_in + ) + class IPSecIPv6Fwd(VppTestCase): """Test IPSec by capturing and verifying IPv6 forwarded pkts""" |