summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_spd_flow_cache_output.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ipsec_spd_flow_cache_output.py')
-rw-r--r--test/test_ipsec_spd_flow_cache_output.py319
1 files changed, 240 insertions, 79 deletions
diff --git a/test/test_ipsec_spd_flow_cache_output.py b/test/test_ipsec_spd_flow_cache_output.py
index 54571c6741a..9852b375a82 100644
--- a/test/test_ipsec_spd_flow_cache_output.py
+++ b/test/test_ipsec_spd_flow_cache_output.py
@@ -11,16 +11,14 @@ class SpdFlowCacheOutbound(SpdFlowCacheTemplate):
@classmethod
def setUpConstants(cls):
super(SpdFlowCacheOutbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{",
- "ipv4-outbound-spd-flow-cache on",
- "}"])
- cls.logger.info("VPP modified cmdline is %s" % " "
- .join(cls.vpp_cmdline))
+ cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-flow-cache on", "}"])
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(add rule)"""
+
def test_ipsec_spd_outbound_add(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec outbound SPD policy lookup.
@@ -33,11 +31,23 @@ class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
pkt_count = 5
self.spd_create_and_intf_add(1, [self.pg1])
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
# check flow cache is empty before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -75,6 +85,7 @@ class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(remove rule)"""
+
def test_ipsec_spd_outbound_remove(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec outbound SPD policy lookup.
@@ -88,11 +99,23 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
pkt_count = 5
self.spd_create_and_intf_add(1, [self.pg1])
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
# check flow cache is empty before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -128,9 +151,15 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
# now remove the bypass rule
self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# verify flow cache counter has been reset by rule removal
self.verify_num_outbound_flow_cache_entries(0)
@@ -154,6 +183,7 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(add, remove, re-add)"""
+
def test_ipsec_spd_outbound_readd(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec outbound SPD policy lookup.
@@ -172,11 +202,23 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
pkt_count = 5
self.spd_create_and_intf_add(1, [self.pg1])
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
# check flow cache is empty before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -212,9 +254,15 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
# now remove the bypass rule, leaving only the discard rule
self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# verify flow cache counter has been reset by rule removal
self.verify_num_outbound_flow_cache_entries(0)
@@ -236,8 +284,14 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
# now readd the bypass rule
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# verify flow cache counter has been reset by rule addition
self.verify_num_outbound_flow_cache_entries(0)
@@ -271,6 +325,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(multiple interfaces, multiple rules)"""
+
def test_ipsec_spd_outbound_multiple(self):
# In this test case, packets in IPv4 FWD path are configured to go
# through IPSec outbound SPD policy lookup.
@@ -286,32 +341,75 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
self.spd_create_and_intf_add(1, self.pg_interfaces)
# add rules on all interfaces
policy_01 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_02 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
policy_11 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_12 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
policy_21 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg2, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="bypass",
+ )
policy_22 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg2, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="discard")
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="discard",
+ )
# interfaces bound to an SPD, will by default drop inbound
# traffic with no matching policies. add catch-all inbound
# bypass rule to SPD:
self.spd_add_rem_policy( # inbound, all interfaces
- 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
- policy_type="bypass", all_ips=True)
+ 1,
+ None,
+ None,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -338,8 +436,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
try:
self.logger.debug(ppp("SPD - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
@@ -366,6 +463,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(overwrite stale entries)"""
+
def test_ipsec_spd_outbound_overwrite(self):
# The operation of the flow cache is setup so that the entire cache
# is invalidated when adding or removing an SPD policy rule.
@@ -386,23 +484,48 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
# add output rules on all interfaces
# pg0 -> pg1
policy_0 = self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# pg1 -> pg2
policy_1 = self.spd_add_rem_policy( # outbound
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# pg2 -> pg0
policy_2 = self.spd_add_rem_policy( # outbound
- 1, self.pg2, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="discard")
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="discard",
+ )
# interfaces bound to an SPD, will by default drop inbound
# traffic with no matching policies. add catch-all inbound
# bypass rule to SPD:
self.spd_add_rem_policy( # inbound, all interfaces
- 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
- policy_type="bypass", all_ips=True)
+ 1,
+ None,
+ None,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -429,8 +552,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -447,21 +569,39 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
# adding an inbound policy should not invalidate output flow cache
self.spd_add_rem_policy( # inbound
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache counter has not been reset
self.verify_num_outbound_flow_cache_entries(3)
# remove a bypass policy - flow cache counter will be reset, and
# there will be 3x stale entries in flow cache
self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# readd policy
policy_0 = self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check counter was reset with flow cache invalidation
self.verify_num_outbound_flow_cache_entries(0)
@@ -481,8 +621,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -492,8 +631,8 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
self.pg0.assert_nothing_captured()
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count*2, policy_1)
- self.verify_policy_match(pkt_count*2, policy_2)
+ self.verify_policy_match(pkt_count * 2, policy_1)
+ self.verify_policy_match(pkt_count * 2, policy_2)
# we are overwriting 3x stale entries - check flow cache counter
# is correct
self.verify_num_outbound_flow_cache_entries(3)
@@ -502,18 +641,23 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(hash collision)"""
+
# Override class setup to restrict vector size to 16 elements.
# This forces using only the lower 4 bits of the hash as a key,
# making hash collisions easy to find.
@classmethod
def setUpConstants(cls):
super(SpdFlowCacheOutbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{",
- "ipv4-outbound-spd-flow-cache on",
- "ipv4-outbound-spd-hash-buckets 16",
- "}"])
- cls.logger.info("VPP modified cmdline is %s" % " "
- .join(cls.vpp_cmdline))
+ cls.vpp_cmdline.extend(
+ [
+ "ipsec",
+ "{",
+ "ipv4-outbound-spd-flow-cache on",
+ "ipv4-outbound-spd-hash-buckets 16",
+ "}",
+ ]
+ )
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
def test_ipsec_spd_outbound_collision(self):
# The flow cache operation is setup to overwrite an entry
@@ -535,18 +679,37 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
self.spd_create_and_intf_add(1, self.pg_interfaces)
# add rules
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg2, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# interfaces bound to an SPD, will by default drop inbound
# traffic with no matching policies. add catch-all inbound
# bypass rule to SPD:
self.spd_add_rem_policy( # inbound, all interfaces
- 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
- policy_type="bypass", all_ips=True)
+ 1,
+ None,
+ None,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -580,11 +743,9 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
if_caps.append(pg.get_capture())
for packet in if_caps[-1]:
try:
- self.logger.debug(ppp(
- "SPD - Got packet:", packet))
+ self.logger.debug(ppp("SPD - Got packet:", packet))
except Exception:
- self.logger.error(ppp(
- "Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
@@ -600,5 +761,5 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
self.verify_num_outbound_flow_cache_entries(1)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)