aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/template_ipsec.py21
-rw-r--r--test/test_ipsec_spd_fp_input.py152
2 files changed, 166 insertions, 7 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 4e68d44013f..ab5aa9390da 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -16,6 +16,8 @@ from scapy.layers.inet6 import (
IPv6ExtHdrDestOpt,
)
+from scapy.layers.isakmp import ISAKMP
+
from framework import VppTestCase
from asfframework import VppTestRunner
@@ -3246,11 +3248,22 @@ class IPSecIPv6Fwd(VppTestCase):
payload = self.info_to_payload(info)
# create the packet itself
p = (
- Ether(dst=src_if.local_mac, src=src_if.remote_mac)
- / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
- / UDP(sport=src_prt, dport=dst_prt)
- / Raw(payload)
+ (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / ISAKMP()
+ / Raw(payload)
+ )
+ if (src_prt == 500 or src_prt == 4500)
+ else (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
)
+
# store a copy of the packet in the packet info
info.data = p.copy()
# append the packet to the list
diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py
index 1953bbe5eaf..ed38a51abdb 100644
--- a/test/test_ipsec_spd_fp_input.py
+++ b/test/test_ipsec_spd_fp_input.py
@@ -835,9 +835,6 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
self.verify_policy_match(0, policy_22)
-@unittest.skipIf(
- "ping" in config.excluded_plugins, "Exclude tests requiring Ping plugin"
-)
class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
""" IPSec/IPv6 inbound: Policy mode test case with fast path \
(add protect)"""
@@ -889,6 +886,155 @@ class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+class IPSec6SpdTestCaseBypass(SpdFastPathIPv6Inbound):
+ """ IPSec/IPv6 inbound: Policy mode test case with fast path \
+ (add bypass)"""
+
+ def test_ipsec_spd_inbound_bypass(self):
+ # In this test case, packets in IPv6 FWD path are configured
+ # to go through IPSec inbound SPD policy lookup.
+ #
+ # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
+ # - High priority rule action is set to DISCARD.
+ # - Low priority rule action is set to BYPASS.
+ #
+ # Since BYPASS rules take precedence over DISCARD
+ # (the order being PROTECT, BYPASS, DISCARD) we expect the
+ # BYPASS rule to match and traffic to be correctly forwarded.
+ self.create_interfaces(2)
+ pkt_count = 5
+
+ self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
+
+ # create input rules
+ # bypass rule should take precedence over discard rule,
+ # even though it's lower priority, because for input policies
+ # matching PROTECT policies precedes matching BYPASS policies
+ # which preceeds matching for DISCARD policies.
+ # Any hit stops the process.
+ policy_0 = self.spd_add_rem_policy( # inbound, priority 10
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ ip_range=True,
+ local_ip_start=self.pg1.remote_ip6,
+ local_ip_stop=self.pg1.remote_ip6,
+ remote_ip_start=self.pg0.remote_ip6,
+ remote_ip_stop=self.pg0.remote_ip6,
+ )
+ policy_1 = self.spd_add_rem_policy( # inbound, priority 15
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=15,
+ policy_type="discard",
+ ip_range=True,
+ local_ip_start=self.pg1.remote_ip6,
+ local_ip_stop=self.pg1.remote_ip6,
+ remote_ip_start=self.pg0.remote_ip6,
+ remote_ip_stop=self.pg0.remote_ip6,
+ )
+
+ # create output rule so we can capture forwarded packets
+ policy_2 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
+
+ # create the packet stream
+ packets = self.create_stream(
+ self.pg0, self.pg1, pkt_count, src_prt=500, dst_prt=500
+ )
+ # add the stream to the source interface
+ self.pg0.add_stream(packets)
+ self.pg1.enable_capture()
+ self.pg_start()
+
+ # check capture on pg1
+ capture = self.pg1.get_capture()
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD Add - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # verify captured packets
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+ self.verify_policy_match(pkt_count, policy_2)
+
+
+class IPSec6SpdTestCaseDiscard(SpdFastPathIPv6Inbound):
+ """ IPSec/IPv6 inbound: Policy mode test case with fast path \
+ (add discard)"""
+
+ def test_ipsec_spd_inbound_discard(self):
+ # In this test case, packets in IPv6 FWD path are configured
+ # to go through IPSec inbound SPD policy lookup.
+ #
+ # Rule action is set to DISCARD.
+
+ self.create_interfaces(2)
+ pkt_count = 5
+
+ self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
+
+ # create input rules
+ # bypass rule should take precedence over discard rule,
+ # even though it's lower priority
+ policy_0 = self.spd_add_rem_policy( # inbound, priority 10
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="discard",
+ )
+
+ # create output rule so we can capture forwarded packets
+ policy_1 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
+
+ # create the packet stream
+ packets = self.create_stream(
+ self.pg0, self.pg1, pkt_count, src_prt=500, dst_prt=500
+ )
+ # add the stream to the source interface
+ self.pg0.add_stream(packets)
+ self.pg1.enable_capture()
+ self.pg_start()
+
+ # check capture on pg1
+ capture = self.pg1.assert_nothing_captured()
+
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+
+
class IPSec6SpdTestCaseTunProtect(SpdFastPathIPv6InboundProtect):
"""IPSec/IPv6 inbound: Policy mode test case with fast path"""