From d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 Mon Sep 17 00:00:00 2001 From: Klement Sekera Date: Tue, 26 Apr 2022 19:02:15 +0200 Subject: tests: replace pycodestyle with black Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera Signed-off-by: Dave Wallace --- test/test_ipsec_spd_flow_cache_input.py | 371 ++++++++++++++++++++++++-------- 1 file changed, 277 insertions(+), 94 deletions(-) (limited to 'test/test_ipsec_spd_flow_cache_input.py') diff --git a/test/test_ipsec_spd_flow_cache_input.py b/test/test_ipsec_spd_flow_cache_input.py index 2d70d1540b8..02ecb5625ce 100644 --- a/test/test_ipsec_spd_flow_cache_input.py +++ b/test/test_ipsec_spd_flow_cache_input.py @@ -12,16 +12,14 @@ class SpdFlowCacheInbound(SpdFlowCacheTemplate): @classmethod def setUpConstants(cls): super(SpdFlowCacheInbound, cls).setUpConstants() - cls.vpp_cmdline.extend(["ipsec", "{", - "ipv4-inbound-spd-flow-cache on", - "}"]) - cls.logger.info("VPP modified cmdline is %s" % " " - .join(cls.vpp_cmdline)) + cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-flow-cache on", "}"]) + cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound): """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ (add bypass)""" + def test_ipsec_spd_inbound_bypass(self): # In this test case, packets in IPv4 FWD path are configured # to go through IPSec inbound SPD policy lookup. @@ -42,16 +40,34 @@ class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound): # bypass rule should take precedence over discard rule, # even though it's lower priority policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) policy_1 = self.spd_add_rem_policy( # inbound, priority 15 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=15, policy_type="discard") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=15, + policy_type="discard", + ) # create output rule so we can capture forwarded packets policy_2 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # check flow cache is empty before sending traffic self.verify_num_inbound_flow_cache_entries(0) @@ -85,6 +101,7 @@ class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound): class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound): """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ (add discard)""" + def test_ipsec_spd_inbound_discard(self): # In this test case, packets in IPv4 FWD path are configured # to go through IPSec inbound SPD policy lookup. @@ -96,13 +113,25 @@ class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound): # create input rule policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="discard") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="discard", + ) # create output rule so we can capture forwarded packets policy_1 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # check flow cache is empty before sending traffic self.verify_num_inbound_flow_cache_entries(0) @@ -124,6 +153,7 @@ class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound): class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound): """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ (remove bypass)""" + def test_ipsec_spd_inbound_remove(self): # In this test case, packets in IPv4 FWD path are configured # to go through IPSec inbound SPD policy lookup. @@ -147,16 +177,34 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound): # bypass rule should take precedence over discard rule, # even though it's lower priority policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) policy_1 = self.spd_add_rem_policy( # inbound, priority 15 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=15, policy_type="discard") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=15, + policy_type="discard", + ) # create output rule so we can capture forwarded packets policy_2 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # check flow cache is empty before sending traffic self.verify_num_inbound_flow_cache_entries(0) @@ -188,9 +236,15 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound): # remove the input bypass rule self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass", - remove=True) + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + remove=True, + ) # verify flow cache counter has been reset by rule removal self.verify_num_inbound_flow_cache_entries(0) @@ -213,6 +267,7 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheInbound): class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound): """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ (add, remove, re-add bypass)""" + def test_ipsec_spd_inbound_readd(self): # In this test case, packets in IPv4 FWD path are configured # to go through IPSec inbound SPD policy lookup. @@ -239,16 +294,34 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound): # bypass rule should take precedence over discard rule, # even though it's lower priority policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) policy_1 = self.spd_add_rem_policy( # inbound, priority 15 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=15, policy_type="discard") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=15, + policy_type="discard", + ) # create output rule so we can capture forwarded packets policy_2 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # check flow cache is empty before sending traffic self.verify_num_inbound_flow_cache_entries(0) @@ -280,9 +353,15 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound): # remove the input bypass rule self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass", - remove=True) + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + remove=True, + ) # verify flow cache counter has been reset by rule removal self.verify_num_inbound_flow_cache_entries(0) @@ -303,8 +382,14 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound): # readd the input bypass rule policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) # verify flow cache counter has been reset by rule addition self.verify_num_inbound_flow_cache_entries(0) @@ -327,7 +412,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound): # verify all policies matched the expected number of times self.verify_policy_match(pkt_count, policy_0) self.verify_policy_match(pkt_count, policy_1) - self.verify_policy_match(pkt_count*2, policy_2) + self.verify_policy_match(pkt_count * 2, policy_2) # by readding the bypass rule, we reset the flow cache # we only expect the bypass rule to now be in the flow cache self.verify_num_inbound_flow_cache_entries(1) @@ -336,6 +421,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheInbound): class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound): """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ (multiple interfaces, multiple rules)""" + def test_ipsec_spd_inbound_multiple(self): # In this test case, packets in IPv4 FWD path are configured to go # through IPSec outbound SPD policy lookup. @@ -353,23 +439,47 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound): # add input rules on all interfaces # pg0 -> pg1 policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) # pg1 -> pg2 policy_1 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg2, self.pg1, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg2, + self.pg1, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) # pg2 -> pg0 policy_2 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg0, self.pg2, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="discard") + 1, + self.pg0, + self.pg2, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="discard", + ) # create output rules covering the the full ip range # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets policy_3 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg0, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass", - all_ips=True) + 1, + self.pg0, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + all_ips=True, + ) # check flow cache is empty (0 active elements) before sending traffic self.verify_num_inbound_flow_cache_entries(0) @@ -396,8 +506,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound): try: self.logger.debug(ppp("SPD Add - Got packet:", packet)) except Exception: - self.logger.error( - ppp("Unexpected or invalid packet:", packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # verify captures that matched BYPASS rules @@ -416,6 +525,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheInbound): class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound): """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ (overwrite stale entries)""" + def test_ipsec_spd_inbound_overwrite(self): # The operation of the flow cache is setup so that the entire cache # is invalidated when adding or removing an SPD policy rule. @@ -436,23 +546,47 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound): # add input rules on all interfaces # pg0 -> pg1 policy_0 = self.spd_add_rem_policy( # inbound - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) # pg1 -> pg2 policy_1 = self.spd_add_rem_policy( # inbound - 1, self.pg2, self.pg1, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg2, + self.pg1, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) # pg2 -> pg0 policy_2 = self.spd_add_rem_policy( # inbound - 1, self.pg0, self.pg2, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="discard") + 1, + self.pg0, + self.pg2, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="discard", + ) # create output rules covering the the full ip range # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets policy_3 = self.spd_add_rem_policy( # outbound - 1, self.pg0, self.pg0, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass", - all_ips=True) + 1, + self.pg0, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + all_ips=True, + ) # check flow cache is empty (0 active elements) before sending traffic self.verify_num_inbound_flow_cache_entries(0) @@ -479,8 +613,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound): try: self.logger.debug(ppp("SPD Add - Got packet:", packet)) except Exception: - self.logger.error( - ppp("Unexpected or invalid packet:", packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # verify captures that matched BYPASS rules @@ -497,22 +630,40 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound): # adding an outbound policy should not invalidate output flow cache self.spd_add_rem_policy( # outbound - 1, self.pg0, self.pg0, socket.IPPROTO_UDP, - is_out=1, priority=1, policy_type="bypass", - all_ips=True) + 1, + self.pg0, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=1, + policy_type="bypass", + all_ips=True, + ) # check inbound flow cache counter has not been reset self.verify_num_inbound_flow_cache_entries(3) # remove + readd bypass policy - flow cache counter will be reset, # and there will be 3x stale entries in flow cache self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass", - remove=True) + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + remove=True, + ) # readd policy policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) # check counter was reset self.verify_num_inbound_flow_cache_entries(0) @@ -532,8 +683,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound): try: self.logger.debug(ppp("SPD Add - Got packet:", packet)) except Exception: - self.logger.error( - ppp("Unexpected or invalid packet:", packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # verify captures that matched BYPASS rules @@ -543,8 +693,8 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound): self.pg0.assert_nothing_captured() # verify all policies matched the expected number of times self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count*2, policy_1) - self.verify_policy_match(pkt_count*2, policy_2) + self.verify_policy_match(pkt_count * 2, policy_1) + self.verify_policy_match(pkt_count * 2, policy_2) # we are overwriting 3x stale entries - check flow cache counter # is correct self.verify_num_inbound_flow_cache_entries(3) @@ -553,18 +703,23 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheInbound): class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound): """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ (hash collision)""" + # Override class setup to restrict hash table size to 16 buckets. # This forces using only the lower 4 bits of the hash as a key, # making hash collisions easy to find. @classmethod def setUpConstants(cls): super(SpdFlowCacheInbound, cls).setUpConstants() - cls.vpp_cmdline.extend(["ipsec", "{", - "ipv4-inbound-spd-flow-cache on", - "ipv4-inbound-spd-hash-buckets 16", - "}"]) - cls.logger.info("VPP modified cmdline is %s" % " " - .join(cls.vpp_cmdline)) + cls.vpp_cmdline.extend( + [ + "ipsec", + "{", + "ipv4-inbound-spd-flow-cache on", + "ipv4-inbound-spd-hash-buckets 16", + "}", + ] + ) + cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) def test_ipsec_spd_inbound_collision(self): # The flow cache operation is setup to overwrite an entry @@ -588,20 +743,38 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound): # create output rules covering the the full ip range # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets policy_0 = self.spd_add_rem_policy( # outbound - 1, self.pg0, self.pg0, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass", - all_ips=True) + 1, + self.pg0, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + all_ips=True, + ) capture_intfs = [] if self.crc32_supported(): # create crc32 collision on last 4 bits hashed_with_crc32 = True # add matching rules policy_1 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg2, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg2, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) policy_2 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg3, self.pg0, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg3, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) # we expect to get captures on pg1 + pg3 capture_intfs.append(self.pg1) @@ -623,11 +796,23 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound): hashed_with_crc32 = False # add matching rules policy_1 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg1, self.pg2, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg2, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) policy_2 = self.spd_add_rem_policy( # inbound, priority 10 - 1, self.pg2, self.pg3, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg2, + self.pg3, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) capture_intfs.append(self.pg1) capture_intfs.append(self.pg2) @@ -655,15 +840,13 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound): if_caps.append(pg.get_capture()) for packet in if_caps[-1]: try: - self.logger.debug(ppp( - "SPD Add - Got packet:", packet)) + self.logger.debug(ppp("SPD Add - Got packet:", packet)) except Exception: - self.logger.error(ppp( - "Unexpected or invalid packet:", packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # verify captures that matched BYPASS rule - if(hashed_with_crc32): + if hashed_with_crc32: self.verify_capture(self.pg2, self.pg1, if_caps[0]) self.verify_capture(self.pg0, self.pg3, if_caps[1]) else: # hashed with xxhash @@ -673,11 +856,11 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheInbound): # verify all policies matched the expected number of times self.verify_policy_match(pkt_count, policy_1) self.verify_policy_match(pkt_count, policy_2) - self.verify_policy_match(pkt_count*2, policy_0) # output policy + self.verify_policy_match(pkt_count * 2, policy_0) # output policy # we have matched 2 policies, but due to the hash collision # one active entry is expected self.verify_num_inbound_flow_cache_entries(1) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg