summaryrefslogtreecommitdiffstats
path: root/test/test_ipsec_spd_flow_cache_input.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ipsec_spd_flow_cache_input.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ipsec_spd_flow_cache_input.py')
-rw-r--r--test/test_ipsec_spd_flow_cache_input.py371
1 files changed, 277 insertions, 94 deletions
diff --git a/test/test_ipsec_spd_flow_cache_input.py b/test/test_ipsec_spd_flow_cache_input.py
index 2d70d1540b8..02ecb5625ce 100644
--- a/test/test_ipsec_spd_flow_cache_input.py
+++ b/test/test_ipsec_spd_flow_cache_input.py
@@ -12,16 +12,14 @@ class SpdFlowCacheInbound(SpdFlowCacheTemplate):
@classmethod
def setUpConstants(cls):
super(SpdFlowCacheInbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{",
- "ipv4-inbound-spd-flow-cache on",
- "}"])
- cls.logger.info("VPP modified cmdline is %s" % " "
- .join(cls.vpp_cmdline))
+ cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-flow-cache on", "}"])
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(add bypass)"""
+
def test_ipsec_spd_inbound_bypass(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec inbound SPD policy lookup.
@@ -42,16 +40,34 @@ class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
# bypass rule should take precedence over discard rule,
# even though it's lower priority
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=15, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=15,
+ policy_type="discard",
+ )
# create output rule so we can capture forwarded packets
policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache is empty before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -85,6 +101,7 @@ class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(add discard)"""
+
def test_ipsec_spd_inbound_discard(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec inbound SPD policy lookup.
@@ -96,13 +113,25 @@ class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
# create input rule
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="discard",
+ )
# create output rule so we can capture forwarded packets
policy_1 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache is empty before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -124,6 +153,7 @@ class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(remove bypass)"""
+
def test_ipsec_spd_inbound_remove(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec inbound SPD policy lookup.
@@ -147,16 +177,34 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
# bypass rule should take precedence over discard rule,
# even though it's lower priority
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=15, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=15,
+ policy_type="discard",
+ )
# create output rule so we can capture forwarded packets
policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache is empty before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -188,9 +236,15 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
# remove the input bypass rule
self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# verify flow cache counter has been reset by rule removal
self.verify_num_inbound_flow_cache_entries(0)
@@ -213,6 +267,7 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound):
class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(add, remove, re-add bypass)"""
+
def test_ipsec_spd_inbound_readd(self):
# In this test case, packets in IPv4 FWD path are configured
# to go through IPSec inbound SPD policy lookup.
@@ -239,16 +294,34 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
# bypass rule should take precedence over discard rule,
# even though it's lower priority
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=15, policy_type="discard")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=15,
+ policy_type="discard",
+ )
# create output rule so we can capture forwarded packets
policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg1, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass")
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
# check flow cache is empty before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -280,9 +353,15 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
# remove the input bypass rule
self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# verify flow cache counter has been reset by rule removal
self.verify_num_inbound_flow_cache_entries(0)
@@ -303,8 +382,14 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
# readd the input bypass rule
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# verify flow cache counter has been reset by rule addition
self.verify_num_inbound_flow_cache_entries(0)
@@ -327,7 +412,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_0)
self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count*2, policy_2)
+ self.verify_policy_match(pkt_count * 2, policy_2)
# by readding the bypass rule, we reset the flow cache
# we only expect the bypass rule to now be in the flow cache
self.verify_num_inbound_flow_cache_entries(1)
@@ -336,6 +421,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound):
class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(multiple interfaces, multiple rules)"""
+
def test_ipsec_spd_inbound_multiple(self):
# In this test case, packets in IPv4 FWD path are configured to go
# through IPSec outbound SPD policy lookup.
@@ -353,23 +439,47 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
# add input rules on all interfaces
# pg0 -> pg1
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# pg1 -> pg2
policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg2, self.pg1, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# pg2 -> pg0
policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg0, self.pg2, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="discard",
+ )
# create output rules covering the the full ip range
# 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
policy_3 = self.spd_add_rem_policy( # outbound, priority 10
- 1, self.pg0, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- all_ips=True)
+ 1,
+ self.pg0,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -396,8 +506,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -416,6 +525,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound):
class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(overwrite stale entries)"""
+
def test_ipsec_spd_inbound_overwrite(self):
# The operation of the flow cache is setup so that the entire cache
# is invalidated when adding or removing an SPD policy rule.
@@ -436,23 +546,47 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
# add input rules on all interfaces
# pg0 -> pg1
policy_0 = self.spd_add_rem_policy( # inbound
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# pg1 -> pg2
policy_1 = self.spd_add_rem_policy( # inbound
- 1, self.pg2, self.pg1, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# pg2 -> pg0
policy_2 = self.spd_add_rem_policy( # inbound
- 1, self.pg0, self.pg2, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="discard")
+ 1,
+ self.pg0,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="discard",
+ )
# create output rules covering the the full ip range
# 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
policy_3 = self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- all_ips=True)
+ 1,
+ self.pg0,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check flow cache is empty (0 active elements) before sending traffic
self.verify_num_inbound_flow_cache_entries(0)
@@ -479,8 +613,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -497,22 +630,40 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
# adding an outbound policy should not invalidate output flow cache
self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=1, policy_type="bypass",
- all_ips=True)
+ 1,
+ self.pg0,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=1,
+ policy_type="bypass",
+ all_ips=True,
+ )
# check inbound flow cache counter has not been reset
self.verify_num_inbound_flow_cache_entries(3)
# remove + readd bypass policy - flow cache counter will be reset,
# and there will be 3x stale entries in flow cache
self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass",
- remove=True)
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ remove=True,
+ )
# readd policy
policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# check counter was reset
self.verify_num_inbound_flow_cache_entries(0)
@@ -532,8 +683,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
try:
self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(
- ppp("Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rules
@@ -543,8 +693,8 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
self.pg0.assert_nothing_captured()
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count*2, policy_1)
- self.verify_policy_match(pkt_count*2, policy_2)
+ self.verify_policy_match(pkt_count * 2, policy_1)
+ self.verify_policy_match(pkt_count * 2, policy_2)
# we are overwriting 3x stale entries - check flow cache counter
# is correct
self.verify_num_inbound_flow_cache_entries(3)
@@ -553,18 +703,23 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound):
class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
""" IPSec/IPv4 inbound: Policy mode test case with flow cache \
(hash collision)"""
+
# Override class setup to restrict hash table size to 16 buckets.
# This forces using only the lower 4 bits of the hash as a key,
# making hash collisions easy to find.
@classmethod
def setUpConstants(cls):
super(SpdFlowCacheInbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{",
- "ipv4-inbound-spd-flow-cache on",
- "ipv4-inbound-spd-hash-buckets 16",
- "}"])
- cls.logger.info("VPP modified cmdline is %s" % " "
- .join(cls.vpp_cmdline))
+ cls.vpp_cmdline.extend(
+ [
+ "ipsec",
+ "{",
+ "ipv4-inbound-spd-flow-cache on",
+ "ipv4-inbound-spd-hash-buckets 16",
+ "}",
+ ]
+ )
+ cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
def test_ipsec_spd_inbound_collision(self):
# The flow cache operation is setup to overwrite an entry
@@ -588,20 +743,38 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
# create output rules covering the the full ip range
# 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
policy_0 = self.spd_add_rem_policy( # outbound
- 1, self.pg0, self.pg0, socket.IPPROTO_UDP,
- is_out=1, priority=10, policy_type="bypass",
- all_ips=True)
+ 1,
+ self.pg0,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ all_ips=True,
+ )
capture_intfs = []
if self.crc32_supported(): # create crc32 collision on last 4 bits
hashed_with_crc32 = True
# add matching rules
policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg3, self.pg0, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg3,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
# we expect to get captures on pg1 + pg3
capture_intfs.append(self.pg1)
@@ -623,11 +796,23 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
hashed_with_crc32 = False
# add matching rules
policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg1, self.pg2, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg1,
+ self.pg2,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1, self.pg2, self.pg3, socket.IPPROTO_UDP,
- is_out=0, priority=10, policy_type="bypass")
+ 1,
+ self.pg2,
+ self.pg3,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ )
capture_intfs.append(self.pg1)
capture_intfs.append(self.pg2)
@@ -655,15 +840,13 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
if_caps.append(pg.get_capture())
for packet in if_caps[-1]:
try:
- self.logger.debug(ppp(
- "SPD Add - Got packet:", packet))
+ self.logger.debug(ppp("SPD Add - Got packet:", packet))
except Exception:
- self.logger.error(ppp(
- "Unexpected or invalid packet:", packet))
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
# verify captures that matched BYPASS rule
- if(hashed_with_crc32):
+ if hashed_with_crc32:
self.verify_capture(self.pg2, self.pg1, if_caps[0])
self.verify_capture(self.pg0, self.pg3, if_caps[1])
else: # hashed with xxhash
@@ -673,11 +856,11 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound):
# verify all policies matched the expected number of times
self.verify_policy_match(pkt_count, policy_1)
self.verify_policy_match(pkt_count, policy_2)
- self.verify_policy_match(pkt_count*2, policy_0) # output policy
+ self.verify_policy_match(pkt_count * 2, policy_0) # output policy
# we have matched 2 policies, but due to the hash collision
# one active entry is expected
self.verify_num_inbound_flow_cache_entries(1)
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)