aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--test/template_ipsec.py195
-rw-r--r--test/test_ipsec_spd_fp_output.py668
2 files changed, 857 insertions, 6 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 1b1c9aaa25d..2295b7579c5 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -2088,5 +2088,200 @@ class SpdFlowCacheTemplate(IPSecIPv4Fwd):
return False
+class IPSecIPv6Fwd(VppTestCase):
+ """Test IPSec by capturing and verifying IPv6 forwarded pkts"""
+
+ @classmethod
+ def setUpConstants(cls):
+ super(IPSecIPv6Fwd, cls).setUpConstants()
+
+ def setUp(self):
+ super(IPSecIPv6Fwd, self).setUp()
+ # store SPD objects so we can remove configs on tear down
+ self.spd_objs = []
+ self.spd_policies = []
+
+ def tearDown(self):
+ # remove SPD policies
+ for obj in self.spd_policies:
+ obj.remove_vpp_config()
+ self.spd_policies = []
+ # remove SPD items (interface bindings first, then SPD)
+ for obj in reversed(self.spd_objs):
+ obj.remove_vpp_config()
+ self.spd_objs = []
+ # close down pg intfs
+ for pg in self.pg_interfaces:
+ pg.unconfig_ip6()
+ pg.admin_down()
+ super(IPSecIPv6Fwd, self).tearDown()
+
+ def create_interfaces(self, num_ifs=2):
+ # create interfaces pg0 ... pg<num_ifs>
+ self.create_pg_interfaces(range(num_ifs))
+ for pg in self.pg_interfaces:
+ # put the interface up
+ pg.admin_up()
+ # configure IPv6 address on the interface
+ pg.config_ip6()
+ pg.resolve_ndp()
+ self.logger.info(self.vapi.ppcli("show int addr"))
+
+ def spd_create_and_intf_add(self, spd_id, pg_list):
+ spd = VppIpsecSpd(self, spd_id)
+ spd.add_vpp_config()
+ self.spd_objs.append(spd)
+ for pg in pg_list:
+ spdItf = VppIpsecSpdItfBinding(self, spd, pg)
+ spdItf.add_vpp_config()
+ self.spd_objs.append(spdItf)
+
+ def get_policy(self, policy_type):
+ e = VppEnum.vl_api_ipsec_spd_action_t
+ if policy_type == "protect":
+ return e.IPSEC_API_SPD_ACTION_PROTECT
+ elif policy_type == "bypass":
+ return e.IPSEC_API_SPD_ACTION_BYPASS
+ elif policy_type == "discard":
+ return e.IPSEC_API_SPD_ACTION_DISCARD
+ else:
+ raise Exception("Invalid policy type: %s", policy_type)
+
+ def spd_add_rem_policy(
+ self,
+ spd_id,
+ src_if,
+ dst_if,
+ proto,
+ is_out,
+ priority,
+ policy_type,
+ remove=False,
+ all_ips=False,
+ ip_range=False,
+ local_ip_start=ip_address("0::0"),
+ local_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
+ remote_ip_start=ip_address("0::0"),
+ remote_ip_stop=ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"),
+ remote_port_start=0,
+ remote_port_stop=65535,
+ local_port_start=0,
+ local_port_stop=65535,
+ ):
+ spd = VppIpsecSpd(self, spd_id)
+
+ if all_ips:
+ src_range_low = ip_address("0::0")
+ src_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
+ dst_range_low = ip_address("0::0")
+ dst_range_high = ip_address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff")
+
+ elif ip_range:
+ src_range_low = local_ip_start
+ src_range_high = local_ip_stop
+ dst_range_low = remote_ip_start
+ dst_range_high = remote_ip_stop
+
+ else:
+ src_range_low = src_if.remote_ip6
+ src_range_high = src_if.remote_ip6
+ dst_range_low = dst_if.remote_ip6
+ dst_range_high = dst_if.remote_ip6
+
+ spdEntry = VppIpsecSpdEntry(
+ self,
+ spd,
+ 0,
+ src_range_low,
+ src_range_high,
+ dst_range_low,
+ dst_range_high,
+ proto,
+ priority=priority,
+ policy=self.get_policy(policy_type),
+ is_outbound=is_out,
+ remote_port_start=remote_port_start,
+ remote_port_stop=remote_port_stop,
+ local_port_start=local_port_start,
+ local_port_stop=local_port_stop,
+ )
+
+ if remove is False:
+ spdEntry.add_vpp_config()
+ self.spd_policies.append(spdEntry)
+ else:
+ spdEntry.remove_vpp_config()
+ self.spd_policies.remove(spdEntry)
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+ return spdEntry
+
+ def create_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678):
+ packets = []
+ for i in range(pkt_count):
+ # create packet info stored in the test case instance
+ info = self.create_packet_info(src_if, dst_if)
+ # convert the info into packet payload
+ payload = self.info_to_payload(info)
+ # create the packet itself
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
+ # store a copy of the packet in the packet info
+ info.data = p.copy()
+ # append the packet to the list
+ packets.append(p)
+ # return the created packet list
+ return packets
+
+ def verify_capture(self, src_if, dst_if, capture):
+ packet_info = None
+ for packet in capture:
+ try:
+ ip = packet[IPv6]
+ udp = packet[UDP]
+ # convert the payload to packet info object
+ payload_info = self.payload_to_info(packet)
+ # make sure the indexes match
+ self.assert_equal(
+ payload_info.src, src_if.sw_if_index, "source sw_if_index"
+ )
+ self.assert_equal(
+ payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+ )
+ packet_info = self.get_next_packet_info_for_interface2(
+ src_if.sw_if_index, dst_if.sw_if_index, packet_info
+ )
+ # make sure we didn't run out of saved packets
+ self.assertIsNotNone(packet_info)
+ self.assert_equal(
+ payload_info.index, packet_info.index, "packet info index"
+ )
+ saved_packet = packet_info.data # fetch the saved packet
+ # assert the values match
+ self.assert_equal(ip.src, saved_packet[IPv6].src, "IP source address")
+ # ... more assertions here
+ self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
+ except Exception as e:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ remaining_packet = self.get_next_packet_info_for_interface2(
+ src_if.sw_if_index, dst_if.sw_if_index, packet_info
+ )
+ self.assertIsNone(
+ remaining_packet,
+ "Interface %s: Packet expected from interface "
+ "%s didn't arrive" % (dst_if.name, src_if.name),
+ )
+
+ def verify_policy_match(self, pkt_count, spdEntry):
+ self.logger.info("XXXX %s %s", str(spdEntry), str(spdEntry.get_stats()))
+ matched_pkts = spdEntry.get_stats().get("packets")
+ self.logger.info("Policy %s matched: %d pkts", str(spdEntry), matched_pkts)
+ self.assert_equal(pkt_count, matched_pkts)
+
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_ipsec_spd_fp_output.py b/test/test_ipsec_spd_fp_output.py
index 645939c98a9..242d30dcbab 100644
--- a/test/test_ipsec_spd_fp_output.py
+++ b/test/test_ipsec_spd_fp_output.py
@@ -5,6 +5,7 @@ import ipaddress
from util import ppp
from framework import VppTestRunner
from template_ipsec import IPSecIPv4Fwd
+from template_ipsec import IPSecIPv6Fwd
class SpdFastPathOutbound(IPSecIPv4Fwd):
@@ -16,6 +17,15 @@ class SpdFastPathOutbound(IPSecIPv4Fwd):
cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
+class SpdFastPathIPv6Outbound(IPSecIPv6Fwd):
+ # Override setUpConstants to enable outbound fast path in config
+ @classmethod
+ def setUpConstants(cls):
+ super(SpdFastPathIPv6Outbound, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-outbound-spd-fast-path on", "}"])
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
+
+
class IPSec4SpdTestCaseAdd(SpdFastPathOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with fast path \
(add rule)"""
@@ -467,8 +477,6 @@ class IPSec4SpdTestCaseRemove(SpdFastPathOutbound):
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_0)
self.verify_policy_match(0, policy_1)
- # check policy in SPD has been cached after traffic
- # matched BYPASS rule in SPD
# now remove the bypass rule
self.spd_add_rem_policy( # outbound, priority 10
1,
@@ -560,10 +568,7 @@ class IPSec4SpdTestCaseReadd(SpdFastPathOutbound):
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_0)
self.verify_policy_match(0, policy_1)
- # check policy in SPD has been cached after traffic
- # matched BYPASS rule in SPD
-
- # now remove the bypass rule, leaving only the discard rule
+ # remove the bypass rule, leaving only the discard rule
self.spd_add_rem_policy( # outbound, priority 10
1,
self.pg0,
@@ -758,5 +763,656 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound):
self.verify_policy_match(pkt_count, policy_22)
+class IPSec6SpdTestCaseAdd(SpdFastPathIPv6Outbound):
+ """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+ (add rule)"""
+
+ def test_ipsec_spd_outbound_add(self):
+ # In this test case, packets in IPv4 FWD path are configured
+ # to go through IPSec outbound SPD policy lookup.
+ # 2 SPD rules (1 HIGH and 1 LOW) are added.
+ # High priority rule action is set to BYPASS.
+ # Low priority rule action is set to DISCARD.
+ # Traffic sent on pg0 interface should match high priority
+ # rule and should be sent out on pg1 interface.
+ self.create_interfaces(2)
+ pkt_count = 5
+ s_port_s = 1111
+ s_port_e = 1111
+ d_port_s = 2222
+ d_port_e = 2222
+ self.spd_create_and_intf_add(1, [self.pg1])
+ policy_0 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ local_port_start=s_port_s,
+ local_port_stop=s_port_e,
+ remote_port_start=d_port_s,
+ remote_port_stop=d_port_e,
+ )
+ policy_1 = self.spd_add_rem_policy( # outbound, priority 5
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ local_port_start=s_port_s,
+ local_port_stop=s_port_e,
+ remote_port_start=d_port_s,
+ remote_port_stop=d_port_e,
+ )
+
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
+ # add the stream to the source interface + enable capture
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture
+ capture = self.pg1.get_capture()
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # assert nothing captured on pg0
+ self.pg0.assert_nothing_captured()
+ # verify captured packets
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+
+
+class IPSec6SpdTestCaseAddAll(SpdFastPathIPv6Outbound):
+ """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+ (add all ips ports rule)"""
+
+ def test_ipsec_spd_outbound_add(self):
+ # In this test case, packets in IPv4 FWD path are configured
+ # to go through IPSec outbound SPD policy lookup.
+ # 2 SPD rules (1 HIGH and 1 LOW) are added.
+ # Low priority rule action is set to BYPASS all ips.
+ # High priority rule action is set to DISCARD all ips.
+ # Traffic sent on pg0 interface when LOW priority rule is added,
+ # expect the packet is being sent out to pg1. Then HIGH priority
+ # rule is added and send the same traffic to pg0, this time expect
+ # the traffic is dropped.
+ self.create_interfaces(2)
+ pkt_count = 5
+ self.spd_create_and_intf_add(1, [self.pg1])
+ policy_0 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
+
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+ # add the stream to the source interface + enable capture
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture
+ capture = self.pg1.get_capture()
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # assert nothing captured on pg0
+ self.pg0.assert_nothing_captured()
+ # verify captured packets
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+
+ policy_1 = self.spd_add_rem_policy( # outbound, priority 20
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=20,
+ policy_type="discard",
+ all_ips=True,
+ )
+
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+ # add the stream to the source interface + enable capture
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # assert nothing captured on pg0 and pg1
+ self.pg0.assert_nothing_captured()
+ self.pg1.assert_nothing_captured()
+
+
+class IPSec6SpdTestCaseAddPortRange(SpdFastPathIPv6Outbound):
+ """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+ (add all ips port range rule)"""
+
+ def test_ipsec_spd_outbound_add(self):
+ # In this test case, packets in IPv4 FWD path are configured
+ # to go through IPSec outbound SPD policy lookup.
+ # 2 SPD rules (1 HIGH and 1 LOW) are added.
+ # High priority rule action is set to BYPASS.
+ # Low priority rule action is set to DISCARD.
+ # Traffic sent on pg0 interface should match high priority
+ # rule and should be sent out on pg1 interface.
+ self.create_interfaces(2)
+ pkt_count = 5
+ s_port_s = 1000
+ s_port_e = 2023
+ d_port_s = 5000
+ d_port_e = 6023
+ self.spd_create_and_intf_add(1, [self.pg1])
+ policy_0 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ local_port_start=s_port_s,
+ local_port_stop=s_port_e,
+ remote_port_start=d_port_s,
+ remote_port_stop=d_port_e,
+ )
+ policy_1 = self.spd_add_rem_policy( # outbound, priority 5
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ all_ips=True,
+ local_port_start=s_port_s,
+ local_port_stop=s_port_e,
+ remote_port_start=d_port_s,
+ remote_port_stop=d_port_e,
+ )
+
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
+ # add the stream to the source interface + enable capture
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture
+ capture = self.pg1.get_capture()
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # assert nothing captured on pg0
+ self.pg0.assert_nothing_captured()
+ # verify captured packets
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+
+
+class IPSec6SpdTestCaseAddIPRange(SpdFastPathIPv6Outbound):
+ """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+ (add ips range with any port rule)"""
+
+ def test_ipsec_spd_outbound_add(self):
+ # In this test case, packets in IPv4 FWD path are configured
+ # to go through IPSec outbound SPD policy lookup.
+ # 2 SPD rules (1 HIGH and 1 LOW) are added.
+ # High priority rule action is set to BYPASS.
+ # Low priority rule action is set to DISCARD.
+ # Traffic sent on pg0 interface should match high priority
+ # rule and should be sent out on pg1 interface.
+ self.create_interfaces(2)
+ pkt_count = 5
+ s_ip_s = ipaddress.ip_address(self.pg0.remote_ip6)
+ s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
+ d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
+ d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
+ self.spd_create_and_intf_add(1, [self.pg1])
+ policy_0 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ ip_range=True,
+ local_ip_start=s_ip_s,
+ local_ip_stop=s_ip_e,
+ remote_ip_start=d_ip_s,
+ remote_ip_stop=d_ip_e,
+ )
+ policy_1 = self.spd_add_rem_policy( # outbound, priority 5
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ ip_range=True,
+ local_ip_start=s_ip_s,
+ local_ip_stop=s_ip_e,
+ remote_ip_start=d_ip_s,
+ remote_ip_stop=d_ip_e,
+ )
+
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+ # add the stream to the source interface + enable capture
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture
+ capture = self.pg1.get_capture()
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # assert nothing captured on pg0
+ self.pg0.assert_nothing_captured()
+ # verify captured packets
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+
+
+class IPSec6SpdTestCaseAddIPAndPortRange(SpdFastPathIPv6Outbound):
+ """ IPSec/IPvr6 outbound: Policy mode test case with fast path \
+ (add all ips range rule)"""
+
+ def test_ipsec_spd_outbound_add(self):
+ # In this test case, packets in IPv4 FWD path are configured
+ # to go through IPSec outbound SPD policy lookup.
+ # 2 SPD rules (1 HIGH and 1 LOW) are added.
+ # High priority rule action is set to BYPASS.
+ # Low priority rule action is set to DISCARD.
+ # Traffic sent on pg0 interface should match high priority
+ # rule and should be sent out on pg1 interface.
+ # in this test we define ranges of ports and ip addresses.
+ self.create_interfaces(2)
+ pkt_count = 5
+ s_port_s = 1000
+ s_port_e = 1000 + 1023
+ d_port_s = 5000
+ d_port_e = 5000 + 1023
+
+ s_ip_s = ipaddress.ip_address(
+ int(ipaddress.ip_address(self.pg0.remote_ip6)) - 24
+ )
+ s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
+ d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
+ d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
+ self.spd_create_and_intf_add(1, [self.pg1])
+ policy_0 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ ip_range=True,
+ local_ip_start=s_ip_s,
+ local_ip_stop=s_ip_e,
+ remote_ip_start=d_ip_s,
+ remote_ip_stop=d_ip_e,
+ local_port_start=s_port_s,
+ local_port_stop=s_port_e,
+ remote_port_start=d_port_s,
+ remote_port_stop=d_port_e,
+ )
+ policy_1 = self.spd_add_rem_policy( # outbound, priority 5
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ ip_range=True,
+ local_ip_start=s_ip_s,
+ local_ip_stop=s_ip_e,
+ remote_ip_start=d_ip_s,
+ remote_ip_stop=d_ip_e,
+ local_port_start=s_port_s,
+ local_port_stop=s_port_e,
+ remote_port_start=d_port_s,
+ remote_port_stop=d_port_e,
+ )
+
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+ # add the stream to the source interface + enable capture
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture
+ capture = self.pg1.get_capture()
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # assert nothing captured on pg0
+ self.pg0.assert_nothing_captured()
+ # verify captured packets
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+
+
+class IPSec6SpdTestCaseReadd(SpdFastPathIPv6Outbound):
+ """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+ (add, remove, re-add)"""
+
+ def test_ipsec_spd_outbound_readd(self):
+ # In this test case, packets in IPv4 FWD path are configured
+ # to go through IPSec outbound SPD policy lookup.
+ # 2 SPD rules (1 HIGH and 1 LOW) are added.
+ # High priority rule action is set to BYPASS.
+ # Low priority rule action is set to DISCARD.
+ # Traffic sent on pg0 interface should match high priority
+ # rule and should be sent out on pg1 interface.
+ # High priority rule is then removed.
+ # Traffic sent on pg0 interface should match low priority
+ # rule and should be discarded after SPD lookup.
+ # Readd high priority rule.
+ # Traffic sent on pg0 interface should match high priority
+ # rule and should be sent out on pg1 interface.
+ self.create_interfaces(2)
+ pkt_count = 5
+ self.spd_create_and_intf_add(1, [self.pg1])
+ policy_0 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
+ policy_1 = self.spd_add_rem_policy( # outbound, priority 5
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
+
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, pkt_count)
+ # add the stream to the source interface + enable capture
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture
+ capture = self.pg1.get_capture()
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # assert nothing captured on pg0
+ self.pg0.assert_nothing_captured()
+ # verify capture on pg1
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+ # remove the bypass rule, leaving only the discard rule
+ self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
+
+ # resend the same packets
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture() # flush the old captures
+ self.pg1.enable_capture()
+ self.pg_start()
+
+ # assert nothing captured on pg0
+ self.pg0.assert_nothing_captured()
+ # all packets will be dropped by SPD rule
+ self.pg1.assert_nothing_captured()
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(pkt_count, policy_1)
+
+ # now readd the bypass rule
+ policy_0 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
+
+ # resend the same packets
+ self.pg0.add_stream(packets)
+ self.pg0.enable_capture() # flush the old captures
+ self.pg1.enable_capture()
+ self.pg_start()
+
+ # get capture
+ capture = self.pg1.get_capture(pkt_count)
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # assert nothing captured on pg0
+ self.pg0.assert_nothing_captured()
+ # verify captured packets
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(pkt_count, policy_1)
+
+
+class IPSec6SpdTestCaseMultiple(SpdFastPathIPv6Outbound):
+ """ IPSec/IPv6 outbound: Policy mode test case with fast path \
+ (multiple interfaces, multiple rules)"""
+
+ def test_ipsec_spd_outbound_multiple(self):
+ # In this test case, packets in IPv4 FWD path are configured to go
+ # through IPSec outbound SPD policy lookup.
+ # Multiples rules on multiple interfaces are tested at the same time.
+ # 3x interfaces are configured, binding the same SPD to each.
+ # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
+ # On pg0 & pg1, the BYPASS rule is HIGH priority
+ # On pg2, the DISCARD rule is HIGH priority
+ # Traffic should be received on pg0 & pg1 and dropped on pg2.
+ self.create_interfaces(3)
+ pkt_count = 5
+ # bind SPD to all interfaces
+ self.spd_create_and_intf_add(1, self.pg_interfaces)
+ # add rules on all interfaces
+ policy_01 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
+ policy_02 = self.spd_add_rem_policy( # outbound, priority 5
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
+
+ policy_11 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
+ policy_12 = self.spd_add_rem_policy( # outbound, priority 5
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
+
+ policy_21 = self.spd_add_rem_policy( # outbound, priority 5
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="bypass",
+ )
+ policy_22 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="discard",
+ )
+
+ # interfaces bound to an SPD, will by default drop inbound
+ # traffic with no matching policies. add catch-all inbound
+ # bypass rule to SPD:
+ self.spd_add_rem_policy( # inbound, all interfaces
+ 1,
+ None,
+ None,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
+
+ # create the packet streams
+ packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
+ packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
+ packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
+ # add the streams to the source interfaces
+ self.pg0.add_stream(packets0)
+ self.pg1.add_stream(packets1)
+ self.pg2.add_stream(packets2)
+ # enable capture on all interfaces
+ for pg in self.pg_interfaces:
+ pg.enable_capture()
+ # start the packet generator
+ self.pg_start()
+
+ # get captures
+ if_caps = []
+ for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
+ if_caps.append(pg.get_capture())
+ for packet in if_caps[-1]:
+ try:
+ self.logger.debug(ppp("SPD - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
+ self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
+
+ # verify captures that matched BYPASS rule
+ self.verify_capture(self.pg0, self.pg1, if_caps[0])
+ self.verify_capture(self.pg1, self.pg2, if_caps[1])
+ # verify that traffic to pg0 matched DISCARD rule and was dropped
+ self.pg0.assert_nothing_captured()
+ # verify all packets that were expected to match rules, matched
+ # pg0 -> pg1
+ self.verify_policy_match(pkt_count, policy_01)
+ self.verify_policy_match(0, policy_02)
+ # pg1 -> pg2
+ self.verify_policy_match(pkt_count, policy_11)
+ self.verify_policy_match(0, policy_12)
+ # pg2 -> pg0
+ self.verify_policy_match(0, policy_21)
+ self.verify_policy_match(pkt_count, policy_22)
+
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)