diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_flowprobe.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_flowprobe.py')
-rw-r--r-- | test/test_flowprobe.py | 307 |
1 files changed, 165 insertions, 142 deletions
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py index 5bafd39eceb..6b271790f76 100644 --- a/test/test_flowprobe.py +++ b/test/test_flowprobe.py @@ -29,17 +29,26 @@ from vpp_papi import VppEnum class VppCFLOW(VppObject): """CFLOW object for IPFIX exporter and Flowprobe feature""" - def __init__(self, test, intf='pg2', active=0, passive=0, timeout=100, - mtu=1024, datapath='l2', layer='l2 l3 l4'): + def __init__( + self, + test, + intf="pg2", + active=0, + passive=0, + timeout=100, + mtu=1024, + datapath="l2", + layer="l2 l3 l4", + ): self._test = test self._intf = intf self._active = active if passive == 0 or passive < active: - self._passive = active+1 + self._passive = active + 1 else: self._passive = passive - self._datapath = datapath # l2 ip4 ip6 - self._collect = layer # l2 l3 l4 + self._datapath = datapath # l2 ip4 ip6 + self._collect = layer # l2 l3 l4 self._timeout = timeout self._mtu = mtu self._configured = False @@ -49,18 +58,17 @@ class VppCFLOW(VppObject): l2_flag = 0 l3_flag = 0 l4_flag = 0 - if 'l2' in self._collect.lower(): - l2_flag = (VppEnum.vl_api_flowprobe_record_flags_t. - FLOWPROBE_RECORD_FLAG_L2) - if 'l3' in self._collect.lower(): - l3_flag = (VppEnum.vl_api_flowprobe_record_flags_t. - FLOWPROBE_RECORD_FLAG_L3) - if 'l4' in self._collect.lower(): - l4_flag = (VppEnum.vl_api_flowprobe_record_flags_t. - FLOWPROBE_RECORD_FLAG_L4) + if "l2" in self._collect.lower(): + l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2 + if "l3" in self._collect.lower(): + l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3 + if "l4" in self._collect.lower(): + l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4 self._test.vapi.flowprobe_params( record_flags=(l2_flag | l3_flag | l4_flag), - active_timer=self._active, passive_timer=self._passive) + active_timer=self._active, + passive_timer=self._passive, + ) self.enable_flowprobe_feature() self._test.vapi.cli("ipfix flush") self._configured = True @@ -76,18 +84,21 @@ class VppCFLOW(VppObject): collector_address=self._test.pg0.remote_ip4, src_address=self._test.pg0.local_ip4, path_mtu=self._mtu, - template_interval=self._timeout) + template_interval=self._timeout, + ) def enable_flowprobe_feature(self): - self._test.vapi.ppcli("flowprobe feature add-del %s %s" % - (self._intf, self._datapath)) + self._test.vapi.ppcli( + "flowprobe feature add-del %s %s" % (self._intf, self._datapath) + ) def disable_exporter(self): self._test.vapi.cli("set ipfix exporter collector 0.0.0.0") def disable_flowprobe_feature(self): - self._test.vapi.cli("flowprobe feature add-del %s %s disable" % - (self._intf, self._datapath)) + self._test.vapi.cli( + "flowprobe feature add-del %s %s disable" % (self._intf, self._datapath) + ) def object_id(self): return "ipfix-collector-%s-%s" % (self._src, self.dst) @@ -99,8 +110,7 @@ class VppCFLOW(VppObject): templates = [] self._test.assertIn(count, (1, 2, 3)) for _ in range(count): - p = self._test.wait_for_cflow_packet(self._test.collector, 2, - timeout) + p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout) self._test.assertTrue(p.haslayer(IPFIX)) if decoder is not None and p.haslayer(Template): templates.append(p[Template].templateID) @@ -109,7 +119,7 @@ class VppCFLOW(VppObject): class MethodHolder(VppTestCase): - """ Flow-per-packet plugin: test L2, IP4, IP6 reporting """ + """Flow-per-packet plugin: test L2, IP4, IP6 reporting""" # Test variables debug_print = False @@ -135,9 +145,11 @@ class MethodHolder(VppTestCase): # and put interfaces to this BD cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1) cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1) + rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1 + ) cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1) + rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1 + ) # Set up all interfaces for i in cls.pg_interfaces: @@ -173,8 +185,9 @@ class MethodHolder(VppTestCase): def tearDownClass(cls): super(MethodHolder, cls).tearDownClass() - def create_stream(self, src_if=None, dst_if=None, packets=None, - size=None, ip_ver='v4'): + def create_stream( + self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4" + ): """Create a packet stream to tickle the plugin :param VppInterface src_if: Source interface for packet stream @@ -194,7 +207,7 @@ class MethodHolder(VppTestCase): info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) p = Ether(src=src_if.remote_mac, dst=src_if.local_mac) - if ip_ver == 'v4': + if ip_ver == "v4": p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) else: p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) @@ -226,16 +239,16 @@ class MethodHolder(VppTestCase): self.pg_start() return dst_if.get_capture(len(self.pkts)) - def verify_cflow_data_detail(self, decoder, capture, cflow, - data_set={1: 'octets', 2: 'packets'}, - ip_ver='v4'): + def verify_cflow_data_detail( + self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4" + ): if self.debug_print: print(capture[0].show()) if cflow.haslayer(Data): data = decoder.decode_data_set(cflow.getlayer(Set)) if self.debug_print: print(data) - if ip_ver == 'v4': + if ip_ver == "v4": ip_layer = capture[0][IP] else: ip_layer = capture[0][IPv6] @@ -251,35 +264,31 @@ class MethodHolder(VppTestCase): if field not in record.keys(): continue value = data_set[field] - if value == 'octets': + if value == "octets": value = ip_layer.len - if ip_ver == 'v6': - value += 40 # ??? is this correct - elif value == 'packets': + if ip_ver == "v6": + value += 40 # ??? is this correct + elif value == "packets": value = 1 - elif value == 'src_ip': - if ip_ver == 'v4': - ip = socket.inet_pton(socket.AF_INET, - ip_layer.src) + elif value == "src_ip": + if ip_ver == "v4": + ip = socket.inet_pton(socket.AF_INET, ip_layer.src) else: - ip = socket.inet_pton(socket.AF_INET6, - ip_layer.src) + ip = socket.inet_pton(socket.AF_INET6, ip_layer.src) value = int(binascii.hexlify(ip), 16) - elif value == 'dst_ip': - if ip_ver == 'v4': - ip = socket.inet_pton(socket.AF_INET, - ip_layer.dst) + elif value == "dst_ip": + if ip_ver == "v4": + ip = socket.inet_pton(socket.AF_INET, ip_layer.dst) else: - ip = socket.inet_pton(socket.AF_INET6, - ip_layer.dst) + ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst) value = int(binascii.hexlify(ip), 16) - elif value == 'sport': + elif value == "sport": value = int(capture[0][UDP].sport) - elif value == 'dport': + elif value == "dport": value = int(capture[0][UDP].dport) - self.assertEqual(int(binascii.hexlify( - record[field]), 16), - value) + self.assertEqual( + int(binascii.hexlify(record[field]), 16), value + ) def verify_cflow_data_notimer(self, decoder, capture, cflows): idx = 0 @@ -292,14 +301,12 @@ class MethodHolder(VppTestCase): for rec in data: p = capture[idx] idx += 1 - self.assertEqual(p[IP].len, int( - binascii.hexlify(rec[1]), 16)) - self.assertEqual(1, int( - binascii.hexlify(rec[2]), 16)) + self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16)) + self.assertEqual(1, int(binascii.hexlify(rec[2]), 16)) self.assertEqual(len(capture), idx) def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1): - """ wait for CFLOW packet and verify its correctness + """wait for CFLOW packet and verify its correctness :param timeout: how long to wait @@ -327,7 +334,7 @@ class Flowprobe(MethodHolder): super(Flowprobe, cls).tearDownClass() def test_0001(self): - """ timer less than template timeout""" + """timer less than template timeout""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -351,7 +358,7 @@ class Flowprobe(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): - """ timer greater than template timeout""" + """timer greater than template timeout""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -384,26 +391,33 @@ class Flowprobe(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg8', datapath="ip4", - layer='l2 l3 l4', active=2) + ipfix = VppCFLOW( + test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2 + ) ipfix.add_vpp_config() - route_9001 = VppIpRoute(self, "9.0.0.0", 24, - [VppRoutePath(self.pg8._remote_hosts[0].ip4, - self.pg8.sw_if_index)]) + route_9001 = VppIpRoute( + self, + "9.0.0.0", + 24, + [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)], + ) route_9001.add_vpp_config() ipfix_decoder = IPFIXDecoder() templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.pkts = [(Ether(dst=self.pg7.local_mac, - src=self.pg7.remote_mac) / - IP(src=self.pg7.remote_ip4, dst="9.0.0.100") / - TCP(sport=1234, dport=4321, flags=80) / - Raw(b'\xa5' * 100))] + self.pkts = [ + ( + Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac) + / IP(src=self.pg7.remote_ip4, dst="9.0.0.100") + / TCP(sport=1234, dport=4321, flags=80) + / Raw(b"\xa5" * 100) + ) + ] nowUTC = int(time.time()) - nowUNIX = nowUTC+2208988800 + nowUNIX = nowUTC + 2208988800 self.send_packets(src_if=self.pg7, dst_if=self.pg8) cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10) @@ -435,11 +449,9 @@ class Flowprobe(MethodHolder): # ethernet type self.assertEqual(int(binascii.hexlify(record[256]), 16), 8) # src ip - self.assertEqual(inet_ntop(socket.AF_INET, record[8]), - self.pg7.remote_ip4) + self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4) # dst ip - self.assertEqual(inet_ntop(socket.AF_INET, record[12]), - "9.0.0.100") + self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100") # protocol (TCP) self.assertEqual(int(binascii.hexlify(record[4]), 16), 6) # src port @@ -466,11 +478,11 @@ class Datapath(MethodHolder): super(Datapath, cls).tearDownClass() def test_templatesL2(self): - """ verify template on L2 datapath""" + """verify template on L2 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, layer='l2') + ipfix = VppCFLOW(test=self, layer="l2") ipfix.add_vpp_config() # template packet should arrive immediately @@ -482,12 +494,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onL2(self): - """ L2 data on L2 datapath""" + """L2 data on L2 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l2') + ipfix = VppCFLOW(test=self, layer="l2") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -500,20 +512,21 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 8}) + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 256: 8} + ) self.collector.get_capture(2) ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0001") def test_L3onL2(self): - """ L3 data on L2 datapath""" + """L3 data on L2 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l3') + ipfix = VppCFLOW(test=self, layer="l3") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -526,9 +539,12 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 4: 17, - 8: 'src_ip', 12: 'dst_ip'}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 4: 17, 8: "src_ip", 12: "dst_ip"}, + ) self.collector.get_capture(3) @@ -536,12 +552,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onL2(self): - """ L4 data on L2 datapath""" + """L4 data on L2 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, layer='l4') + ipfix = VppCFLOW(test=self, layer="l4") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -554,8 +570,9 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}) + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"} + ) self.collector.get_capture(3) @@ -563,12 +580,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_templatesIp4(self): - """ verify templates on IP4 datapath""" + """verify templates on IP4 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, datapath='ip4') + ipfix = VppCFLOW(test=self, datapath="ip4") ipfix.add_vpp_config() # template packet should arrive immediately @@ -581,12 +598,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onIP4(self): - """ L2 data on IP4 datapath""" + """L2 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l2', datapath='ip4') + ipfix = VppCFLOW(test=self, intf="pg4", layer="l2", datapath="ip4") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -599,8 +616,9 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 8}) + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 256: 8} + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -609,12 +627,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_L3onIP4(self): - """ L3 data on IP4 datapath""" + """L3 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l3', datapath='ip4') + ipfix = VppCFLOW(test=self, intf="pg4", layer="l3", datapath="ip4") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -627,9 +645,12 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {1: 'octets', 2: 'packets', - 8: 'src_ip', 12: 'dst_ip'}) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {1: "octets", 2: "packets", 8: "src_ip", 12: "dst_ip"}, + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -638,12 +659,12 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onIP4(self): - """ L4 data on IP4 datapath""" + """L4 data on IP4 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg4', layer='l4', datapath='ip4') + ipfix = VppCFLOW(test=self, intf="pg4", layer="l4", datapath="ip4") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() @@ -656,8 +677,9 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}) + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"} + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -666,11 +688,11 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_templatesIP6(self): - """ verify templates on IP6 datapath""" + """verify templates on IP6 datapath""" self.logger.info("FFP_TEST_START_0000") self.pg_enable_capture(self.pg_interfaces) - ipfix = VppCFLOW(test=self, datapath='ip6') + ipfix = VppCFLOW(test=self, datapath="ip6") ipfix.add_vpp_config() # template packet should arrive immediately @@ -682,28 +704,27 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0000") def test_L2onIP6(self): - """ L2 data on IP6 datapath""" + """L2 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l2', datapath='ip6') + ipfix = VppCFLOW(test=self, intf="pg6", layer="l2", datapath="ip6") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 256: 56710}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, capture, cflow, {2: "packets", 256: 56710}, ip_ver="v6" + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -712,29 +733,31 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_L3onIP6(self): - """ L3 data on IP6 datapath""" + """L3 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l3', datapath='ip6') + ipfix = VppCFLOW(test=self, intf="pg6", layer="l3", datapath="ip6") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', - 27: 'src_ip', 28: 'dst_ip'}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 27: "src_ip", 28: "dst_ip"}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -743,28 +766,31 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0002") def test_L4onIP6(self): - """ L4 data on IP6 datapath""" + """L4 data on IP6 datapath""" self.logger.info("FFP_TEST_START_0003") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] - ipfix = VppCFLOW(test=self, intf='pg6', layer='l4', datapath='ip6') + ipfix = VppCFLOW(test=self, intf="pg6", layer="l4", datapath="ip6") ipfix.add_vpp_config() ipfix_decoder = IPFIXDecoder() # template packet should arrive immediately templates = ipfix.verify_templates(ipfix_decoder, count=1) - self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, - ip_ver='IPv6') + self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6") capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6) # make sure the one packet we expect actually showed up self.vapi.ipfix_flush() cflow = self.wait_for_cflow_packet(self.collector, templates[0]) - self.verify_cflow_data_detail(ipfix_decoder, capture, cflow, - {2: 'packets', 7: 'sport', 11: 'dport'}, - ip_ver='v6') + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 7: "sport", 11: "dport"}, + ip_ver="v6", + ) # expected two templates and one cflow packet self.collector.get_capture(2) @@ -773,7 +799,7 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0003") def test_0001(self): - """ no timers, one CFLOW packet, 9 Flows inside""" + """no timers, one CFLOW packet, 9 Flows inside""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -798,7 +824,7 @@ class Datapath(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") def test_0002(self): - """ no timers, two CFLOW packets (mtu=256), 3 Flows in each""" + """no timers, two CFLOW packets (mtu=256), 3 Flows in each""" self.logger.info("FFP_TEST_START_0002") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -817,10 +843,8 @@ class Datapath(MethodHolder): # make sure the one packet we expect actually showed up cflows = [] self.vapi.ipfix_flush() - cflows.append(self.wait_for_cflow_packet(self.collector, - templates[1])) - cflows.append(self.wait_for_cflow_packet(self.collector, - templates[1])) + cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) + cflows.append(self.wait_for_cflow_packet(self.collector, templates[1])) self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows) self.collector.get_capture(5) @@ -841,7 +865,7 @@ class DisableIPFIX(MethodHolder): super(DisableIPFIX, cls).tearDownClass() def test_0001(self): - """ disable IPFIX after first packets""" + """disable IPFIX after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -889,8 +913,7 @@ class ReenableIPFIX(MethodHolder): super(ReenableIPFIX, cls).tearDownClass() def test_0011(self): - """ disable IPFIX after first packets and re-enable after few packets - """ + """disable IPFIX after first packets and re-enable after few packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -957,7 +980,7 @@ class DisableFP(MethodHolder): super(DisableFP, cls).tearDownClass() def test_0001(self): - """ disable flowprobe feature after first packets""" + """disable flowprobe feature after first packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -1004,8 +1027,8 @@ class ReenableFP(MethodHolder): super(ReenableFP, cls).tearDownClass() def test_0001(self): - """ disable flowprobe feature after first packets and re-enable - after few packets """ + """disable flowprobe feature after first packets and re-enable + after few packets""" self.logger.info("FFP_TEST_START_0001") self.pg_enable_capture(self.pg_interfaces) self.pkts = [] @@ -1054,5 +1077,5 @@ class ReenableFP(MethodHolder): self.logger.info("FFP_TEST_FINISH_0001") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |