summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_spd_flow_cache_output.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ipsec_spd_flow_cache_output.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ipsec_spd_flow_cache_output.py')
-rw-r--r--test/test_ipsec_spd_flow_cache_output.py319
1 files changed, 240 insertions, 79 deletions
diff --git a/test/test_ipsec_spd_flow_cache_output.py b/test/test_ipsec_spd_flow_cache_output.py
index 54571c6741a..9852b375a82 100644
--- a/test/test_ipsec_spd_flow_cache_output.py
+++ b/test/test_ipsec_spd_flow_cache_output.py
@@ -11,16 +11,14 @@ class SpdFlowCacheOutbound(SpdFlowCacheTemplate):
@classmethod
def setUpConstants(cls):
super(SpdFlowCacheOutbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{",
- "ipv4-outbound-spd-flow-cache on",
- "}"])
- cls.logger.info("VPP modified cmdline is %s" % " "
- .join(cls.vpp_cmdline))
+ cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-flow-cache on", "}"])
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(add rule)"""
+
def test_ipsec_spd_outbound_add(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec outbound SPD policy lookup.
@@ -33,11 +31,23 @@ class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
pkt_count = 5
self.spd_create_and_intf_add(1, [self.pg1])
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
# check flow cache is empty before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -75,6 +85,7 @@ class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(remove rule)"""
+
def test_ipsec_spd_outbound_remove(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec outbound SPD policy lookup.
@@ -88,11 +99,23 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
pkt_count = 5
self.spd_create_and_intf_add(1, [self.pg1])
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
# check flow cache is empty before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -128,9 +151,15 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
# now remove the bypass rule
self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# verify flow cache counter has been reset by rule removal
self.verify_num_outbound_flow_cache_entries(0)
@@ -154,6 +183,7 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(add, remove, re-add)"""
+
def test_ipsec_spd_outbound_readd(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec outbound SPD policy lookup.
@@ -172,11 +202,23 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
pkt_count = 5
self.spd_create_and_intf_add(1, [self.pg1])
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
# check flow cache is empty before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -212,9 +254,15 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
# now remove the bypass rule, leaving only the discard rule
self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# verify flow cache counter has been reset by rule removal
self.verify_num_outbound_flow_cache_entries(0)
@@ -236,8 +284,14 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
# now readd the bypass rule
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# verify flow cache counter has been reset by rule addition
self.verify_num_outbound_flow_cache_entries(0)
@@ -271,6 +325,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(multiple interfaces, multiple rules)"""
+
def test_ipsec_spd_outbound_multiple(self):
# In this test case, packets in IPv4 FWD path are configured to go
# through IPSec outbound SPD policy lookup.
@@ -286,32 +341,75 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
self.spd_create_and_intf_add(1, self.pg_interfaces)
# add rules on all interfaces
policy_01 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_02 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
policy_11 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_12 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="discard",
+ )
policy_21 = self.spd_add_rem_policy( # outbound, priority 5
- 1, self.pg2, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=5, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=5,
+ policy_type="bypass",
+ )
policy_22 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg2, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="discard")
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="discard",
+ )
# interfaces bound to an SPD, will by default drop inbound
# traffic with no matching policies. add catch-all inbound
# bypass rule to SPD:
self.spd_add_rem_policy( # inbound, all interfaces
- 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
- policy_type="bypass", all_ips=True)
+ 1,
+ None,
+ None,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -338,8 +436,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
try:
self.logger.debug(ppp("SPD - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
@@ -366,6 +463,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(overwrite stale entries)"""
+
def test_ipsec_spd_outbound_overwrite(self):
# The operation of the flow cache is setup so that the entire cache
# is invalidated when adding or removing an SPD policy rule.
@@ -386,23 +484,48 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
# add output rules on all interfaces
# pg0 -> pg1
policy_0 = self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# pg1 -> pg2
policy_1 = self.spd_add_rem_policy( # outbound
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# pg2 -> pg0
policy_2 = self.spd_add_rem_policy( # outbound
- 1, self.pg2, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="discard")
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="discard",
+ )
# interfaces bound to an SPD, will by default drop inbound
# traffic with no matching policies. add catch-all inbound
# bypass rule to SPD:
self.spd_add_rem_policy( # inbound, all interfaces
- 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
- policy_type="bypass", all_ips=True)
+ 1,
+ None,
+ None,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -429,8 +552,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -447,21 +569,39 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
# adding an inbound policy should not invalidate output flow cache
self.spd_add_rem_policy( # inbound
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache counter has not been reset
self.verify_num_outbound_flow_cache_entries(3)
# remove a bypass policy - flow cache counter will be reset, and
# there will be 3x stale entries in flow cache
self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# readd policy
policy_0 = self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check counter was reset with flow cache invalidation
self.verify_num_outbound_flow_cache_entries(0)
@@ -481,8 +621,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -492,8 +631,8 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
self.pg0.assert_nothing_captured()
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count*2, policy_1)
- self.verify_policy_match(pkt_count*2, policy_2)
+ self.verify_policy_match(pkt_count * 2, policy_1)
+ self.verify_policy_match(pkt_count * 2, policy_2)
# we are overwriting 3x stale entries - check flow cache counter
# is correct
self.verify_num_outbound_flow_cache_entries(3)
@@ -502,18 +641,23 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound):
class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
""" IPSec/IPv4 outbound: Policy mode test case with flow cache \
(hash collision)"""
+
# Override class setup to restrict vector size to 16 elements.
# This forces using only the lower 4 bits of the hash as a key,
# making hash collisions easy to find.
@classmethod
def setUpConstants(cls):
super(SpdFlowCacheOutbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{",
- "ipv4-outbound-spd-flow-cache on",
- "ipv4-outbound-spd-hash-buckets 16",
- "}"])
- cls.logger.info("VPP modified cmdline is %s" % " "
- .join(cls.vpp_cmdline))
+ cls.vpp_cmdline.extend(
+ [
+ "ipsec",
+ "{",
+ "ipv4-outbound-spd-flow-cache on",
+ "ipv4-outbound-spd-hash-buckets 16",
+ "}",
+ ]
+ )
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
def test_ipsec_spd_outbound_collision(self):
# The flow cache operation is setup to overwrite an entry
@@ -535,18 +679,37 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
self.spd_create_and_intf_add(1, self.pg_interfaces)
# add rules
policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg2, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# interfaces bound to an SPD, will by default drop inbound
# traffic with no matching policies. add catch-all inbound
# bypass rule to SPD:
self.spd_add_rem_policy( # inbound, all interfaces
- 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10,
- policy_type="bypass", all_ips=True)
+ 1,
+ None,
+ None,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_outbound_flow_cache_entries(0)
@@ -580,11 +743,9 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
if_caps.append(pg.get_capture())
for packet in if_caps[-1]:
try:
- self.logger.debug(ppp(
- "SPD - Got packet:", packet))
+ self.logger.debug(ppp("SPD - Got packet:", packet))
except Exception:
- self.logger.error(ppp(
- "Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
@@ -600,5 +761,5 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound):
self.verify_num_outbound_flow_cache_entries(1)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)