From d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 Mon Sep 17 00:00:00 2001 From: Klement Sekera Date: Tue, 26 Apr 2022 19:02:15 +0200 Subject: tests: replace pycodestyle with black Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera Signed-off-by: Dave Wallace --- test/test_flowprobe.py | 307 ++++++++++++++++++++++++++----------------------- 1 file changed, 165 insertions(+), 142 deletions(-) (limited to 'test/test_flowprobe.py') diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py index 5bafd39eceb..6b271790f76 100644 --- a/test/test_flowprobe.py +++ b/test/test_flowprobe.py @@ -29,17 +29,26 @@ from vpp_papi import VppEnum class VppCFLOW(VppObject): """CFLOW object for IPFIX exporter and Flowprobe feature""" - def __init__(self, test, intf='pg2', active=0, passive=0, timeout=100, - mtu=1024, datapath='l2', layer='l2 l3 l4'): + def __init__( + self, + test, + intf="pg2", + active=0, + passive=0, + timeout=100, + mtu=1024, + datapath="l2", + layer="l2 l3 l4", + ): self._test = test self._intf = intf self._active = active if passive == 0 or passive < active: - self._passive = active+1 + self._passive = active + 1 else: self._passive = passive - self._datapath = datapath # l2 ip4 ip6 - self._collect = layer # l2 l3 l4 + self._datapath = datapath # l2 ip4 ip6 + self._collect = layer # l2 l3 l4 self._timeout = timeout self._mtu = mtu self._configured = False @@ -49,18 +58,17 @@ class VppCFLOW(VppObject): l2_flag = 0 l3_flag = 0 l4_flag = 0 - if 'l2' in self._collect.lower(): - l2_flag = (VppEnum.vl_api_flowprobe_record_flags_t. - FLOWPROBE_RECORD_FLAG_L2) - if 'l3' in self._collect.lower(): - l3_flag = (VppEnum.vl_api_flowprobe_record_flags_t. - FLOWPROBE_RECORD_FLAG_L3) - if 'l4' in self._collect.lower(): - l4_flag = (VppEnum.vl_api_flowprobe_record_flags_t. - FLOWPROBE_RECORD_FLAG_L4) + if "l2" in self._collect.lower(): + l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2 + if "l3" in self._collect.lower(): + l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3 + if "l4" in self._collect.lower(): + l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4 self._test.vapi.flowprobe_params( record_flags=(l2_flag | l3_flag | l4_flag), - active_timer=self._active, passive_timer=self._passive) + active_timer=self._active, + passive_timer=self._passive, + ) self.enable_flowprobe_feature() self._test.vapi.cli("ipfix flush") self._configured = True @@ -76,18 +84,21 @@ class VppCFLOW(VppObject): collector_address=self._test.pg0.remote_ip4, src_address=self._test.pg0.local_ip4, path_mtu=self._mtu, - template_interval=self._timeout) + template_interval=self._timeout, + ) def enable_flowprobe_feature(self): - self._test.vapi.ppcli("flowprobe feature add-del %s %s" % - (self._intf, self._datapath)) + self._test.vapi.ppcli( + "flowprobe feature add-del %s %s" % (self._intf, self._datapath) + ) def disable_exporter(self): self._test.vapi.cli("set ipfix exporter collector 0.0.0.0") def disable_flowprobe_feature(self): - self._test.vapi.cli("flowprobe feature add-del %s %s disable" % - (self._intf, self._datapath)) + self._test.vapi.cli( + "flowprobe feature add-del %s %s disable" % (self._intf, self._datapath) + ) def object_id(self): return "ipfix-collector-%s-%s" % (self._src, self.dst) @@ -99,8 +110,7 @@ class VppCFLOW(VppObject): templates = [] self._test.assertIn(count, (1, 2, 3)) for _ in range(count): - p = self._test.wait_for_cflow_packet(self._test.collector, 2, - timeout) + p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout) self._test.assertTrue(p.haslayer(IPFIX)) if decoder is not None and p.haslayer(Template): templates.append(p[Template].templateID) @@ -109,7 +119,7 @@ class VppCFLOW(VppObject): class MethodHolder(VppTestCase): - """ Flow-per-packet plugin: test L2, IP4, IP6 reporting """ + """Flow-per-packet plugin: test L2, IP4, IP6 reporting""" # Test variables debug_print = False @@ -135,9 +145,11 @@ class MethodHolder(VppTestCase): # and put interfaces to this BD cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1) cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1) + rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1 + ) cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1) + rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1 + ) # Set up all interfaces for i in cls.pg_interfaces: @@ -173,8 +185,9 @@ class MethodHolder(VppTestCase): def tearDownClass(cls): super(MethodHolder, cls).tearDownClass() - def create_stream(self, src_if=None, dst_if=None, packets=None, - size=None, ip_ver='v4'): + def create_stream( + self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4" + ): """Create a packet stream to tickle the plugin :param VppInterface src_if: Source interface for packet stream @@ -194,7 +207,7 @@ class MethodHolder(VppTestCase): info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = Ether(src=src_if.remote_mac, dst=src_if.local_mac) - if ip_ver == 'v4': + if ip_ver == "v4": p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) else: p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) @@ -226,16 +239,16 @@ class MethodHolder(VppTestCase): self.pg_start() return dst_if.get_capture(len(self.pkts)) - def verify_cflow_data_detail(self, decoder, capture, cflow, - data_set={1: 'octets', 2: 'packets'}, - ip_ver='v4'): + def verify_cflow_data_detail( + self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4" + ): if self.debug_print: print(capture[0].show()) if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) if self.debug_print: print(data) - if ip_ver == 'v4': + if ip_ver == "v4": ip_layer = capture[0][IP] else: ip_layer = capture[0][IPv6] @@ -251,35 +264,31 @@ class MethodHolder(VppTestCase): if field not in record.keys(): continue value = data_set[field] - if value == 'octets': + if value == "octets": value = ip_layer.len - if ip_ver == 'v6': - value += 40 # ??? is this correct - elif value == 'packets': + if ip_ver == "v6": + value += 40 # ??? is this correct + elif value == "packets": value = 1 - elif value == 'src_ip': - if ip_ver == 'v4': - ip = socket.inet_pton(socket.AF_INET, - ip_layer.src) + elif value == "src_ip": + if ip_ver == "v4": + ip = socket.inet_pton(socket.AF_INET, ip_layer.src) else: - ip = socket.inet_pton(socket.AF_INET6, - ip_layer.src) + ip = socket.inet_pton(socket.AF_INET6, ip_layer.src) value = int(binascii.hexlify(ip), 16) - elif value == 'dst_ip': - if ip_ver == 'v4': - ip = socket.inet_pton(socket.AF_INET, - ip_layer.dst) + elif value == "dst_ip": + if ip_ver == "v4": + ip = socket.inet_pton(socket.AF_INET, ip_layer.dst) else: - ip = socket.inet_pton(socket.AF_INET6, - ip_layer.dst) + ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst) value = int(binascii.hexlify(ip), 16) - elif value == 'sport': + elif value == "sport": value = int(capture[0][UDP].sport) - elif value == 'dport': + elif value == "dport": value = int(capture[0][UDP].dport) - self.assertEqual(int(binascii.hexlify( - record[field]), 16), - value) + self.assertEqual( + int(binascii.hexlify(record[field]), 16), value + ) def verify_cflow_data_notimer(self, decoder, capture, cflows): idx = 0 @@ -292,14 +301,12 @@ class MethodHolder(VppTestCase): for rec in data: p = capture[idx] idx += 1 - self.assertEqual(p[IP].len, int( - binascii.hexlify(rec[1]), 16)) - self.assertEqual(1, int( - binascii.hexlify(rec[2]), 16)) + self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16)) + self.assertEqual(1, int(binascii.hexlify(rec[2]), 16)) self.assertEqual(len(capture), idx) def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1): - """ wait for CFLOW packet and verify its correctness + """wait for CFLOW packet and verify its correctness :param timeout: how long to wait @@ -327,7 +334,7 @@ class Flowprobe(MethodHolder): super(Flowprobe, cls).tearDownClass() def test_0001(self): - """ timer less than template timeout""" + """timer less than template timeout""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -351,7 +358,7 @@ class Flowprobe(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): - """ timer greater than template timeout""" + """timer greater than template timeout""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -384,26 +391,33 @@ class Flowprobe(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg8', datapath="ip4", - layer='l2 l3 l4', active=2) + ipfix = VppCFLOW( + test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2 + ) ipfix.add_vpp_config() - route_9001 = VppIpRoute(self, "9.0.0.0", 24, - [VppRoutePath(self.pg8._remote_hosts[0].ip4, - self.pg8.sw_if_index)]) + route_9001 = VppIpRoute( + self, + "9.0.0.0", + 24, + [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)], + ) route_9001.add_vpp_config() ipfix_decoder = IPFIXDecoder() templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.pkts = [(Ether(dst=self.pg7.local_mac, - src=self.pg7.remote_mac) / - IP(src=self.pg7.remote_ip4, dst="9.0.0.100") / - TCP(sport=1234, dport=4321, flags=80) / - Raw(b'\xa5' * 100))] + self.pkts = [ + ( + Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac) + / IP(src=self.pg7.remote_ip4, dst="9.0.0.100") + / TCP(sport=1234, dport=4321, flags=80) + / Raw(b"\xa5" * 100) + ) + ] nowUTC = int(time.time()) - nowUNIX = nowUTC+2208988800 + nowUNIX = nowUTC + 2208988800 self.send_packets(src_if=self.pg7, dst_if=self.pg8) cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10) @@ -435,11 +449,9 @@ class Flowprobe(MethodHolder): # ethernet type self.assertEqual(int(binascii.hexlify(record[256]), 16), 8) # src ip - self.assertEqual(inet_ntop(socket.AF_INET, record[8]), - self.pg7.remote_ip4) + self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4) # dst ip - self.assertEqual(inet_ntop(socket.AF_INET, record[12]), - "9.0.0.100") + self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100") # protocol (TCP) self.assertEqual(int(binascii.hexlify(record[4]), 16), 6) # src port @@ -466,11 +478,11 @@ class Datapath(MethodHolder): super(Datapath, cls).tearDownClass() def test_templatesL2(self): - """ verify template on L2 datapath""" + """verify template on L2 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, layer='l2') + ipfix = VppCFLOW(test=self, layer="l2") ipfix.add_vpp_config() # template packet should arrive immediately @@ -482,12 +494,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onL2(self): - """ L2 data on L2 datapath""" + """L2 data on L2 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l2') + ipfix = VppCFLOW(test=self, layer="l2") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -500,20 +512,21 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 8}) + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 256: 8} + ) self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_L3onL2(self): - """ L3 data on L2 datapath""" + """L3 data on L2 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l3') + ipfix = VppCFLOW(test=self, layer="l3") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -526,9 +539,12 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 4: 17, - 8: 'src_ip', 12: 'dst_ip'}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 4: 17, 8: "src_ip", 12: "dst_ip"}, + ) self.collector.get_capture(3) @@ -536,12 +552,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onL2(self): - """ L4 data on L2 datapath""" + """L4 data on L2 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l4') + ipfix = VppCFLOW(test=self, layer="l4") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -554,8 +570,9 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}) + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"} + ) self.collector.get_capture(3) @@ -563,12 +580,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_templatesIp4(self): - """ verify templates on IP4 datapath""" + """verify templates on IP4 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, datapath='ip4') + ipfix = VppCFLOW(test=self, datapath="ip4") ipfix.add_vpp_config() # template packet should arrive immediately @@ -581,12 +598,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onIP4(self): - """ L2 data on IP4 datapath""" + """L2 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l2', datapath='ip4') + ipfix = VppCFLOW(test=self, intf="pg4", layer="l2", datapath="ip4") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -599,8 +616,9 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 8}) + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 256: 8} + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -609,12 +627,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_L3onIP4(self): - """ L3 data on IP4 datapath""" + """L3 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l3', datapath='ip4') + ipfix = VppCFLOW(test=self, intf="pg4", layer="l3", datapath="ip4") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -627,9 +645,12 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {1: 'octets', 2: 'packets', - 8: 'src_ip', 12: 'dst_ip'}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {1: "octets", 2: "packets", 8: "src_ip", 12: "dst_ip"}, + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -638,12 +659,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onIP4(self): - """ L4 data on IP4 datapath""" + """L4 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l4', datapath='ip4') + ipfix = VppCFLOW(test=self, intf="pg4", layer="l4", datapath="ip4") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -656,8 +677,9 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}) + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"} + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -666,11 +688,11 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_templatesIP6(self): - """ verify templates on IP6 datapath""" + """verify templates on IP6 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, datapath='ip6') + ipfix = VppCFLOW(test=self, datapath="ip6") ipfix.add_vpp_config() # template packet should arrive immediately @@ -682,28 +704,27 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onIP6(self): - """ L2 data on IP6 datapath""" + """L2 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l2', datapath='ip6') + ipfix = VppCFLOW(test=self, intf="pg6", layer="l2", datapath="ip6") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 56710}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 256: 56710}, ip_ver="v6" + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -712,29 +733,31 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_L3onIP6(self): - """ L3 data on IP6 datapath""" + """L3 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l3', datapath='ip6') + ipfix = VppCFLOW(test=self, intf="pg6", layer="l3", datapath="ip6") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', - 27: 'src_ip', 28: 'dst_ip'}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 27: "src_ip", 28: "dst_ip"}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -743,28 +766,31 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onIP6(self): - """ L4 data on IP6 datapath""" + """L4 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l4', datapath='ip6') + ipfix = VppCFLOW(test=self, intf="pg6", layer="l4", datapath="ip6") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 7: "sport", 11: "dport"}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -773,7 +799,7 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_0001(self): - """ no timers, one CFLOW packet, 9 Flows inside""" + """no timers, one CFLOW packet, 9 Flows inside""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -798,7 +824,7 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): - """ no timers, two CFLOW packets (mtu=256), 3 Flows in each""" + """no timers, two CFLOW packets (mtu=256), 3 Flows in each""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -817,10 +843,8 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up cflows = [] self.vapi.ipfix_flush() - cflows.append(self.wait_for_cflow_packet(self.collector, - templates[1])) - cflows.append(self.wait_for_cflow_packet(self.collector, - templates[1])) + cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) + cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows) self.collector.get_capture(5) @@ -841,7 +865,7 @@ class DisableIPFIX(MethodHolder): super(DisableIPFIX, cls).tearDownClass() def test_0001(self): - """ disable IPFIX after first packets""" + """disable IPFIX after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -889,8 +913,7 @@ class ReenableIPFIX(MethodHolder): super(ReenableIPFIX, cls).tearDownClass() def test_0011(self): - """ disable IPFIX after first packets and re-enable after few packets - """ + """disable IPFIX after first packets and re-enable after few packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -957,7 +980,7 @@ class DisableFP(MethodHolder): super(DisableFP, cls).tearDownClass() def test_0001(self): - """ disable flowprobe feature after first packets""" + """disable flowprobe feature after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -1004,8 +1027,8 @@ class ReenableFP(MethodHolder): super(ReenableFP, cls).tearDownClass() def test_0001(self): - """ disable flowprobe feature after first packets and re-enable - after few packets """ + """disable flowprobe feature after first packets and re-enable + after few packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -1054,5 +1077,5 @@ class ReenableFP(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg