summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_spd_flow_cache_input.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ipsec_spd_flow_cache_input.py')
-rw-r--r--test/test_ipsec_spd_flow_cache_input.py371
1 files changed, 277 insertions, 94 deletions
diff --git a/test/test_ipsec_spd_flow_cache_input.py b/test/test_ipsec_spd_flow_cache_input.py
index 2d70d1540b8..02ecb5625ce 100644
--- a/test/test_ipsec_spd_flow_cache_input.py
+++ b/test/test_ipsec_spd_flow_cache_input.py
@@ -12,16 +12,14 @@ class SpdFlowCacheInbound(SpdFlowCacheTemplate):
@classmethod
def setUpConstants(cls):
super(SpdFlowCacheInbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{",
- "ipv4-inbound-spd-flow-cache on",
- "}"])
- cls.logger.info("VPP modified cmdline is %s" % " "
- .join(cls.vpp_cmdline))
+ cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-flow-cache on", "}"])
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(add bypass)"""
+
def test_ipsec_spd_inbound_bypass(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec inbound SPD policy lookup.
@@ -42,16 +40,34 @@ class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
# bypass rule should take precedence over discard rule,
# even though it's lower priority
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=15, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=15,
+ policy_type="discard",
+ )
# create output rule so we can capture forwarded packets
policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache is empty before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -85,6 +101,7 @@ class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(add discard)"""
+
def test_ipsec_spd_inbound_discard(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec inbound SPD policy lookup.
@@ -96,13 +113,25 @@ class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
# create input rule
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="discard",
+ )
# create output rule so we can capture forwarded packets
policy_1 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache is empty before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -124,6 +153,7 @@ class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(remove bypass)"""
+
def test_ipsec_spd_inbound_remove(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec inbound SPD policy lookup.
@@ -147,16 +177,34 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
# bypass rule should take precedence over discard rule,
# even though it's lower priority
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=15, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=15,
+ policy_type="discard",
+ )
# create output rule so we can capture forwarded packets
policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache is empty before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -188,9 +236,15 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
# remove the input bypass rule
self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# verify flow cache counter has been reset by rule removal
self.verify_num_inbound_flow_cache_entries(0)
@@ -213,6 +267,7 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(add, remove, re-add bypass)"""
+
def test_ipsec_spd_inbound_readd(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec inbound SPD policy lookup.
@@ -239,16 +294,34 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
# bypass rule should take precedence over discard rule,
# even though it's lower priority
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=15, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=15,
+ policy_type="discard",
+ )
# create output rule so we can capture forwarded packets
policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache is empty before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -280,9 +353,15 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
# remove the input bypass rule
self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# verify flow cache counter has been reset by rule removal
self.verify_num_inbound_flow_cache_entries(0)
@@ -303,8 +382,14 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
# readd the input bypass rule
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# verify flow cache counter has been reset by rule addition
self.verify_num_inbound_flow_cache_entries(0)
@@ -327,7 +412,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_0)
self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count*2, policy_2)
+ self.verify_policy_match(pkt_count * 2, policy_2)
# by readding the bypass rule, we reset the flow cache
# we only expect the bypass rule to now be in the flow cache
self.verify_num_inbound_flow_cache_entries(1)
@@ -336,6 +421,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(multiple interfaces, multiple rules)"""
+
def test_ipsec_spd_inbound_multiple(self):
# In this test case, packets in IPv4 FWD path are configured to go
# through IPSec outbound SPD policy lookup.
@@ -353,23 +439,47 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
# add input rules on all interfaces
# pg0 -> pg1
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# pg1 -> pg2
policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg2, self.pg1, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# pg2 -> pg0
policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg0, self.pg2, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="discard",
+ )
# create output rules covering the the full ip range
# 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
policy_3 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- all_ips=True)
+ 1,
+ self.pg0,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -396,8 +506,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -416,6 +525,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(overwrite stale entries)"""
+
def test_ipsec_spd_inbound_overwrite(self):
# The operation of the flow cache is setup so that the entire cache
# is invalidated when adding or removing an SPD policy rule.
@@ -436,23 +546,47 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
# add input rules on all interfaces
# pg0 -> pg1
policy_0 = self.spd_add_rem_policy( # inbound
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# pg1 -> pg2
policy_1 = self.spd_add_rem_policy( # inbound
- 1, self.pg2, self.pg1, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# pg2 -> pg0
policy_2 = self.spd_add_rem_policy( # inbound
- 1, self.pg0, self.pg2, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="discard",
+ )
# create output rules covering the the full ip range
# 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
policy_3 = self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- all_ips=True)
+ 1,
+ self.pg0,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -479,8 +613,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -497,22 +630,40 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
# adding an outbound policy should not invalidate output flow cache
self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=1, policy_type="bypass",
- all_ips=True)
+ 1,
+ self.pg0,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=1,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check inbound flow cache counter has not been reset
self.verify_num_inbound_flow_cache_entries(3)
# remove + readd bypass policy - flow cache counter will be reset,
# and there will be 3x stale entries in flow cache
self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# readd policy
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# check counter was reset
self.verify_num_inbound_flow_cache_entries(0)
@@ -532,8 +683,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -543,8 +693,8 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
self.pg0.assert_nothing_captured()
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count*2, policy_1)
- self.verify_policy_match(pkt_count*2, policy_2)
+ self.verify_policy_match(pkt_count * 2, policy_1)
+ self.verify_policy_match(pkt_count * 2, policy_2)
# we are overwriting 3x stale entries - check flow cache counter
# is correct
self.verify_num_inbound_flow_cache_entries(3)
@@ -553,18 +703,23 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(hash collision)"""
+
# Override class setup to restrict hash table size to 16 buckets.
# This forces using only the lower 4 bits of the hash as a key,
# making hash collisions easy to find.
@classmethod
def setUpConstants(cls):
super(SpdFlowCacheInbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{",
- "ipv4-inbound-spd-flow-cache on",
- "ipv4-inbound-spd-hash-buckets 16",
- "}"])
- cls.logger.info("VPP modified cmdline is %s" % " "
- .join(cls.vpp_cmdline))
+ cls.vpp_cmdline.extend(
+ [
+ "ipsec",
+ "{",
+ "ipv4-inbound-spd-flow-cache on",
+ "ipv4-inbound-spd-hash-buckets 16",
+ "}",
+ ]
+ )
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
def test_ipsec_spd_inbound_collision(self):
# The flow cache operation is setup to overwrite an entry
@@ -588,20 +743,38 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
# create output rules covering the the full ip range
# 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
policy_0 = self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- all_ips=True)
+ 1,
+ self.pg0,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
capture_intfs = []
if self.crc32_supported(): # create crc32 collision on last 4 bits
hashed_with_crc32 = True
# add matching rules
policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg3, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg3,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# we expect to get captures on pg1 + pg3
capture_intfs.append(self.pg1)
@@ -623,11 +796,23 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
hashed_with_crc32 = False
# add matching rules
policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg2, self.pg3, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg3,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
capture_intfs.append(self.pg1)
capture_intfs.append(self.pg2)
@@ -655,15 +840,13 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
if_caps.append(pg.get_capture())
for packet in if_caps[-1]:
try:
- self.logger.debug(ppp(
- "SPD Add - Got packet:", packet))
+ self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(ppp(
- "Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rule
- if(hashed_with_crc32):
+ if hashed_with_crc32:
self.verify_capture(self.pg2, self.pg1, if_caps[0])
self.verify_capture(self.pg0, self.pg3, if_caps[1])
else: # hashed with xxhash
@@ -673,11 +856,11 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_1)
self.verify_policy_match(pkt_count, policy_2)
- self.verify_policy_match(pkt_count*2, policy_0) # output policy
+ self.verify_policy_match(pkt_count * 2, policy_0) # output policy
# we have matched 2 policies, but due to the hash collision
# one active entry is expected
self.verify_num_inbound_flow_cache_entries(1)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)