From d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 Mon Sep 17 00:00:00 2001 From: Klement Sekera Date: Tue, 26 Apr 2022 19:02:15 +0200 Subject: tests: replace pycodestyle with black Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera Signed-off-by: Dave Wallace --- test/test_ipsec_spd_flow_cache_output.py | 319 +++++++++++++++++++++++-------- 1 file changed, 240 insertions(+), 79 deletions(-) (limited to 'test/test_ipsec_spd_flow_cache_output.py') diff --git a/test/test_ipsec_spd_flow_cache_output.py b/test/test_ipsec_spd_flow_cache_output.py index 54571c6741a..9852b375a82 100644 --- a/test/test_ipsec_spd_flow_cache_output.py +++ b/test/test_ipsec_spd_flow_cache_output.py @@ -11,16 +11,14 @@ class SpdFlowCacheOutbound(SpdFlowCacheTemplate): @classmethod def setUpConstants(cls): super(SpdFlowCacheOutbound, cls).setUpConstants() - cls.vpp_cmdline.extend(["ipsec", "{", - "ipv4-outbound-spd-flow-cache on", - "}"]) - cls.logger.info("VPP modified cmdline is %s" % " " - .join(cls.vpp_cmdline)) + cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-flow-cache on", "}"]) + cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound): """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ (add rule)""" + def test_ipsec_spd_outbound_add(self): # In this test case, packets in IPv4 FWD path are configured # to go through IPSec outbound SPD policy lookup. @@ -33,11 +31,23 @@ class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound): pkt_count = 5 self.spd_create_and_intf_add(1, [self.pg1]) policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=5, policy_type="discard") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=5, + policy_type="discard", + ) # check flow cache is empty before sending traffic self.verify_num_outbound_flow_cache_entries(0) @@ -75,6 +85,7 @@ class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound): class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound): """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ (remove rule)""" + def test_ipsec_spd_outbound_remove(self): # In this test case, packets in IPv4 FWD path are configured # to go through IPSec outbound SPD policy lookup. @@ -88,11 +99,23 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound): pkt_count = 5 self.spd_create_and_intf_add(1, [self.pg1]) policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=5, policy_type="discard") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=5, + policy_type="discard", + ) # check flow cache is empty before sending traffic self.verify_num_outbound_flow_cache_entries(0) @@ -128,9 +151,15 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound): # now remove the bypass rule self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass", - remove=True) + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + remove=True, + ) # verify flow cache counter has been reset by rule removal self.verify_num_outbound_flow_cache_entries(0) @@ -154,6 +183,7 @@ class IPSec4SpdTestCaseRemove(SpdFlowCacheOutbound): class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound): """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ (add, remove, re-add)""" + def test_ipsec_spd_outbound_readd(self): # In this test case, packets in IPv4 FWD path are configured # to go through IPSec outbound SPD policy lookup. @@ -172,11 +202,23 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound): pkt_count = 5 self.spd_create_and_intf_add(1, [self.pg1]) policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=5, policy_type="discard") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=5, + policy_type="discard", + ) # check flow cache is empty before sending traffic self.verify_num_outbound_flow_cache_entries(0) @@ -212,9 +254,15 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound): # now remove the bypass rule, leaving only the discard rule self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass", - remove=True) + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + remove=True, + ) # verify flow cache counter has been reset by rule removal self.verify_num_outbound_flow_cache_entries(0) @@ -236,8 +284,14 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound): # now readd the bypass rule policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # verify flow cache counter has been reset by rule addition self.verify_num_outbound_flow_cache_entries(0) @@ -271,6 +325,7 @@ class IPSec4SpdTestCaseReadd(SpdFlowCacheOutbound): class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound): """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ (multiple interfaces, multiple rules)""" + def test_ipsec_spd_outbound_multiple(self): # In this test case, packets in IPv4 FWD path are configured to go # through IPSec outbound SPD policy lookup. @@ -286,32 +341,75 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound): self.spd_create_and_intf_add(1, self.pg_interfaces) # add rules on all interfaces policy_01 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) policy_02 = self.spd_add_rem_policy( # outbound, priority 5 - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=5, policy_type="discard") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=5, + policy_type="discard", + ) policy_11 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg1, self.pg2, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg2, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) policy_12 = self.spd_add_rem_policy( # outbound, priority 5 - 1, self.pg1, self.pg2, socket.IPPROTO_UDP, - is_out=1, priority=5, policy_type="discard") + 1, + self.pg1, + self.pg2, + socket.IPPROTO_UDP, + is_out=1, + priority=5, + policy_type="discard", + ) policy_21 = self.spd_add_rem_policy( # outbound, priority 5 - 1, self.pg2, self.pg0, socket.IPPROTO_UDP, - is_out=1, priority=5, policy_type="bypass") + 1, + self.pg2, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=5, + policy_type="bypass", + ) policy_22 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg2, self.pg0, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="discard") + 1, + self.pg2, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="discard", + ) # interfaces bound to an SPD, will by default drop inbound # traffic with no matching policies. add catch-all inbound # bypass rule to SPD: self.spd_add_rem_policy( # inbound, all interfaces - 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10, - policy_type="bypass", all_ips=True) + 1, + None, + None, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + all_ips=True, + ) # check flow cache is empty (0 active elements) before sending traffic self.verify_num_outbound_flow_cache_entries(0) @@ -338,8 +436,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound): try: self.logger.debug(ppp("SPD - Got packet:", packet)) except Exception: - self.logger.error( - ppp("Unexpected or invalid packet:", packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res)) self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res)) @@ -366,6 +463,7 @@ class IPSec4SpdTestCaseMultiple(SpdFlowCacheOutbound): class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound): """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ (overwrite stale entries)""" + def test_ipsec_spd_outbound_overwrite(self): # The operation of the flow cache is setup so that the entire cache # is invalidated when adding or removing an SPD policy rule. @@ -386,23 +484,48 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound): # add output rules on all interfaces # pg0 -> pg1 policy_0 = self.spd_add_rem_policy( # outbound - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # pg1 -> pg2 policy_1 = self.spd_add_rem_policy( # outbound - 1, self.pg1, self.pg2, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg2, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # pg2 -> pg0 policy_2 = self.spd_add_rem_policy( # outbound - 1, self.pg2, self.pg0, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="discard") + 1, + self.pg2, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="discard", + ) # interfaces bound to an SPD, will by default drop inbound # traffic with no matching policies. add catch-all inbound # bypass rule to SPD: self.spd_add_rem_policy( # inbound, all interfaces - 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10, - policy_type="bypass", all_ips=True) + 1, + None, + None, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + all_ips=True, + ) # check flow cache is empty (0 active elements) before sending traffic self.verify_num_outbound_flow_cache_entries(0) @@ -429,8 +552,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound): try: self.logger.debug(ppp("SPD Add - Got packet:", packet)) except Exception: - self.logger.error( - ppp("Unexpected or invalid packet:", packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # verify captures that matched BYPASS rules @@ -447,21 +569,39 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound): # adding an inbound policy should not invalidate output flow cache self.spd_add_rem_policy( # inbound - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=0, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ) # check flow cache counter has not been reset self.verify_num_outbound_flow_cache_entries(3) # remove a bypass policy - flow cache counter will be reset, and # there will be 3x stale entries in flow cache self.spd_add_rem_policy( # outbound - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass", - remove=True) + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + remove=True, + ) # readd policy policy_0 = self.spd_add_rem_policy( # outbound - 1, self.pg0, self.pg1, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # check counter was reset with flow cache invalidation self.verify_num_outbound_flow_cache_entries(0) @@ -481,8 +621,7 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound): try: self.logger.debug(ppp("SPD Add - Got packet:", packet)) except Exception: - self.logger.error( - ppp("Unexpected or invalid packet:", packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # verify captures that matched BYPASS rules @@ -492,8 +631,8 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound): self.pg0.assert_nothing_captured() # verify all policies matched the expected number of times self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count*2, policy_1) - self.verify_policy_match(pkt_count*2, policy_2) + self.verify_policy_match(pkt_count * 2, policy_1) + self.verify_policy_match(pkt_count * 2, policy_2) # we are overwriting 3x stale entries - check flow cache counter # is correct self.verify_num_outbound_flow_cache_entries(3) @@ -502,18 +641,23 @@ class IPSec4SpdTestCaseOverwriteStale(SpdFlowCacheOutbound): class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound): """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ (hash collision)""" + # Override class setup to restrict vector size to 16 elements. # This forces using only the lower 4 bits of the hash as a key, # making hash collisions easy to find. @classmethod def setUpConstants(cls): super(SpdFlowCacheOutbound, cls).setUpConstants() - cls.vpp_cmdline.extend(["ipsec", "{", - "ipv4-outbound-spd-flow-cache on", - "ipv4-outbound-spd-hash-buckets 16", - "}"]) - cls.logger.info("VPP modified cmdline is %s" % " " - .join(cls.vpp_cmdline)) + cls.vpp_cmdline.extend( + [ + "ipsec", + "{", + "ipv4-outbound-spd-flow-cache on", + "ipv4-outbound-spd-hash-buckets 16", + "}", + ] + ) + cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) def test_ipsec_spd_outbound_collision(self): # The flow cache operation is setup to overwrite an entry @@ -535,18 +679,37 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound): self.spd_create_and_intf_add(1, self.pg_interfaces) # add rules policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg1, self.pg2, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg1, + self.pg2, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) policy_1 = self.spd_add_rem_policy( # outbound, priority 10 - 1, self.pg2, self.pg0, socket.IPPROTO_UDP, - is_out=1, priority=10, policy_type="bypass") + 1, + self.pg2, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) # interfaces bound to an SPD, will by default drop inbound # traffic with no matching policies. add catch-all inbound # bypass rule to SPD: self.spd_add_rem_policy( # inbound, all interfaces - 1, None, None, socket.IPPROTO_UDP, is_out=0, priority=10, - policy_type="bypass", all_ips=True) + 1, + None, + None, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + all_ips=True, + ) # check flow cache is empty (0 active elements) before sending traffic self.verify_num_outbound_flow_cache_entries(0) @@ -580,11 +743,9 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound): if_caps.append(pg.get_capture()) for packet in if_caps[-1]: try: - self.logger.debug(ppp( - "SPD - Got packet:", packet)) + self.logger.debug(ppp("SPD - Got packet:", packet)) except Exception: - self.logger.error(ppp( - "Unexpected or invalid packet:", packet)) + self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res)) self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res)) @@ -600,5 +761,5 @@ class IPSec4SpdTestCaseCollision(SpdFlowCacheOutbound): self.verify_num_outbound_flow_cache_entries(1) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg