summaryrefslogtreecommitdiffstats
path: root/test/test_flowprobe.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_flowprobe.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_flowprobe.py')
-rw-r--r--test/test_flowprobe.py307
1 files changed, 165 insertions, 142 deletions
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py
index 5bafd39eceb..6b271790f76 100644
--- a/test/test_flowprobe.py
+++ b/test/test_flowprobe.py
@@ -29,17 +29,26 @@ from vpp_papi import VppEnum
class VppCFLOW(VppObject):
"""CFLOW object for IPFIX exporter and Flowprobe feature"""
- def __init__(self, test, intf='pg2', active=0, passive=0, timeout=100,
- mtu=1024, datapath='l2', layer='l2 l3 l4'):
+ def __init__(
+ self,
+ test,
+ intf="pg2",
+ active=0,
+ passive=0,
+ timeout=100,
+ mtu=1024,
+ datapath="l2",
+ layer="l2 l3 l4",
+ ):
self._test = test
self._intf = intf
self._active = active
if passive == 0 or passive < active:
- self._passive = active+1
+ self._passive = active + 1
else:
self._passive = passive
- self._datapath = datapath # l2 ip4 ip6
- self._collect = layer # l2 l3 l4
+ self._datapath = datapath # l2 ip4 ip6
+ self._collect = layer # l2 l3 l4
self._timeout = timeout
self._mtu = mtu
self._configured = False
@@ -49,18 +58,17 @@ class VppCFLOW(VppObject):
l2_flag = 0
l3_flag = 0
l4_flag = 0
- if 'l2' in self._collect.lower():
- l2_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
- FLOWPROBE_RECORD_FLAG_L2)
- if 'l3' in self._collect.lower():
- l3_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
- FLOWPROBE_RECORD_FLAG_L3)
- if 'l4' in self._collect.lower():
- l4_flag = (VppEnum.vl_api_flowprobe_record_flags_t.
- FLOWPROBE_RECORD_FLAG_L4)
+ if "l2" in self._collect.lower():
+ l2_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L2
+ if "l3" in self._collect.lower():
+ l3_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L3
+ if "l4" in self._collect.lower():
+ l4_flag = VppEnum.vl_api_flowprobe_record_flags_t.FLOWPROBE_RECORD_FLAG_L4
self._test.vapi.flowprobe_params(
record_flags=(l2_flag | l3_flag | l4_flag),
- active_timer=self._active, passive_timer=self._passive)
+ active_timer=self._active,
+ passive_timer=self._passive,
+ )
self.enable_flowprobe_feature()
self._test.vapi.cli("ipfix flush")
self._configured = True
@@ -76,18 +84,21 @@ class VppCFLOW(VppObject):
collector_address=self._test.pg0.remote_ip4,
src_address=self._test.pg0.local_ip4,
path_mtu=self._mtu,
- template_interval=self._timeout)
+ template_interval=self._timeout,
+ )
def enable_flowprobe_feature(self):
- self._test.vapi.ppcli("flowprobe feature add-del %s %s" %
- (self._intf, self._datapath))
+ self._test.vapi.ppcli(
+ "flowprobe feature add-del %s %s" % (self._intf, self._datapath)
+ )
def disable_exporter(self):
self._test.vapi.cli("set ipfix exporter collector 0.0.0.0")
def disable_flowprobe_feature(self):
- self._test.vapi.cli("flowprobe feature add-del %s %s disable" %
- (self._intf, self._datapath))
+ self._test.vapi.cli(
+ "flowprobe feature add-del %s %s disable" % (self._intf, self._datapath)
+ )
def object_id(self):
return "ipfix-collector-%s-%s" % (self._src, self.dst)
@@ -99,8 +110,7 @@ class VppCFLOW(VppObject):
templates = []
self._test.assertIn(count, (1, 2, 3))
for _ in range(count):
- p = self._test.wait_for_cflow_packet(self._test.collector, 2,
- timeout)
+ p = self._test.wait_for_cflow_packet(self._test.collector, 2, timeout)
self._test.assertTrue(p.haslayer(IPFIX))
if decoder is not None and p.haslayer(Template):
templates.append(p[Template].templateID)
@@ -109,7 +119,7 @@ class VppCFLOW(VppObject):
class MethodHolder(VppTestCase):
- """ Flow-per-packet plugin: test L2, IP4, IP6 reporting """
+ """Flow-per-packet plugin: test L2, IP4, IP6 reporting"""
# Test variables
debug_print = False
@@ -135,9 +145,11 @@ class MethodHolder(VppTestCase):
# and put interfaces to this BD
cls.vapi.bridge_domain_add_del(bd_id=1, uu_flood=1, learn=1)
cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1)
+ rx_sw_if_index=cls.pg1._sw_if_index, bd_id=1
+ )
cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1)
+ rx_sw_if_index=cls.pg2._sw_if_index, bd_id=1
+ )
# Set up all interfaces
for i in cls.pg_interfaces:
@@ -173,8 +185,9 @@ class MethodHolder(VppTestCase):
def tearDownClass(cls):
super(MethodHolder, cls).tearDownClass()
- def create_stream(self, src_if=None, dst_if=None, packets=None,
- size=None, ip_ver='v4'):
+ def create_stream(
+ self, src_if=None, dst_if=None, packets=None, size=None, ip_ver="v4"
+ ):
"""Create a packet stream to tickle the plugin
:param VppInterface src_if: Source interface for packet stream
@@ -194,7 +207,7 @@ class MethodHolder(VppTestCase):
info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
p = Ether(src=src_if.remote_mac, dst=src_if.local_mac)
- if ip_ver == 'v4':
+ if ip_ver == "v4":
p /= IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
else:
p /= IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
@@ -226,16 +239,16 @@ class MethodHolder(VppTestCase):
self.pg_start()
return dst_if.get_capture(len(self.pkts))
- def verify_cflow_data_detail(self, decoder, capture, cflow,
- data_set={1: 'octets', 2: 'packets'},
- ip_ver='v4'):
+ def verify_cflow_data_detail(
+ self, decoder, capture, cflow, data_set={1: "octets", 2: "packets"}, ip_ver="v4"
+ ):
if self.debug_print:
print(capture[0].show())
if cflow.haslayer(Data):
data = decoder.decode_data_set(cflow.getlayer(Set))
if self.debug_print:
print(data)
- if ip_ver == 'v4':
+ if ip_ver == "v4":
ip_layer = capture[0][IP]
else:
ip_layer = capture[0][IPv6]
@@ -251,35 +264,31 @@ class MethodHolder(VppTestCase):
if field not in record.keys():
continue
value = data_set[field]
- if value == 'octets':
+ if value == "octets":
value = ip_layer.len
- if ip_ver == 'v6':
- value += 40 # ??? is this correct
- elif value == 'packets':
+ if ip_ver == "v6":
+ value += 40 # ??? is this correct
+ elif value == "packets":
value = 1
- elif value == 'src_ip':
- if ip_ver == 'v4':
- ip = socket.inet_pton(socket.AF_INET,
- ip_layer.src)
+ elif value == "src_ip":
+ if ip_ver == "v4":
+ ip = socket.inet_pton(socket.AF_INET, ip_layer.src)
else:
- ip = socket.inet_pton(socket.AF_INET6,
- ip_layer.src)
+ ip = socket.inet_pton(socket.AF_INET6, ip_layer.src)
value = int(binascii.hexlify(ip), 16)
- elif value == 'dst_ip':
- if ip_ver == 'v4':
- ip = socket.inet_pton(socket.AF_INET,
- ip_layer.dst)
+ elif value == "dst_ip":
+ if ip_ver == "v4":
+ ip = socket.inet_pton(socket.AF_INET, ip_layer.dst)
else:
- ip = socket.inet_pton(socket.AF_INET6,
- ip_layer.dst)
+ ip = socket.inet_pton(socket.AF_INET6, ip_layer.dst)
value = int(binascii.hexlify(ip), 16)
- elif value == 'sport':
+ elif value == "sport":
value = int(capture[0][UDP].sport)
- elif value == 'dport':
+ elif value == "dport":
value = int(capture[0][UDP].dport)
- self.assertEqual(int(binascii.hexlify(
- record[field]), 16),
- value)
+ self.assertEqual(
+ int(binascii.hexlify(record[field]), 16), value
+ )
def verify_cflow_data_notimer(self, decoder, capture, cflows):
idx = 0
@@ -292,14 +301,12 @@ class MethodHolder(VppTestCase):
for rec in data:
p = capture[idx]
idx += 1
- self.assertEqual(p[IP].len, int(
- binascii.hexlify(rec[1]), 16))
- self.assertEqual(1, int(
- binascii.hexlify(rec[2]), 16))
+ self.assertEqual(p[IP].len, int(binascii.hexlify(rec[1]), 16))
+ self.assertEqual(1, int(binascii.hexlify(rec[2]), 16))
self.assertEqual(len(capture), idx)
def wait_for_cflow_packet(self, collector_intf, set_id=2, timeout=1):
- """ wait for CFLOW packet and verify its correctness
+ """wait for CFLOW packet and verify its correctness
:param timeout: how long to wait
@@ -327,7 +334,7 @@ class Flowprobe(MethodHolder):
super(Flowprobe, cls).tearDownClass()
def test_0001(self):
- """ timer less than template timeout"""
+ """timer less than template timeout"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
@@ -351,7 +358,7 @@ class Flowprobe(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0001")
def test_0002(self):
- """ timer greater than template timeout"""
+ """timer greater than template timeout"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
@@ -384,26 +391,33 @@ class Flowprobe(MethodHolder):
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg8', datapath="ip4",
- layer='l2 l3 l4', active=2)
+ ipfix = VppCFLOW(
+ test=self, intf="pg8", datapath="ip4", layer="l2 l3 l4", active=2
+ )
ipfix.add_vpp_config()
- route_9001 = VppIpRoute(self, "9.0.0.0", 24,
- [VppRoutePath(self.pg8._remote_hosts[0].ip4,
- self.pg8.sw_if_index)])
+ route_9001 = VppIpRoute(
+ self,
+ "9.0.0.0",
+ 24,
+ [VppRoutePath(self.pg8._remote_hosts[0].ip4, self.pg8.sw_if_index)],
+ )
route_9001.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
templates = ipfix.verify_templates(ipfix_decoder, count=1)
- self.pkts = [(Ether(dst=self.pg7.local_mac,
- src=self.pg7.remote_mac) /
- IP(src=self.pg7.remote_ip4, dst="9.0.0.100") /
- TCP(sport=1234, dport=4321, flags=80) /
- Raw(b'\xa5' * 100))]
+ self.pkts = [
+ (
+ Ether(dst=self.pg7.local_mac, src=self.pg7.remote_mac)
+ / IP(src=self.pg7.remote_ip4, dst="9.0.0.100")
+ / TCP(sport=1234, dport=4321, flags=80)
+ / Raw(b"\xa5" * 100)
+ )
+ ]
nowUTC = int(time.time())
- nowUNIX = nowUTC+2208988800
+ nowUNIX = nowUTC + 2208988800
self.send_packets(src_if=self.pg7, dst_if=self.pg8)
cflow = self.wait_for_cflow_packet(self.collector, templates[0], 10)
@@ -435,11 +449,9 @@ class Flowprobe(MethodHolder):
# ethernet type
self.assertEqual(int(binascii.hexlify(record[256]), 16), 8)
# src ip
- self.assertEqual(inet_ntop(socket.AF_INET, record[8]),
- self.pg7.remote_ip4)
+ self.assertEqual(inet_ntop(socket.AF_INET, record[8]), self.pg7.remote_ip4)
# dst ip
- self.assertEqual(inet_ntop(socket.AF_INET, record[12]),
- "9.0.0.100")
+ self.assertEqual(inet_ntop(socket.AF_INET, record[12]), "9.0.0.100")
# protocol (TCP)
self.assertEqual(int(binascii.hexlify(record[4]), 16), 6)
# src port
@@ -466,11 +478,11 @@ class Datapath(MethodHolder):
super(Datapath, cls).tearDownClass()
def test_templatesL2(self):
- """ verify template on L2 datapath"""
+ """verify template on L2 datapath"""
self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, layer='l2')
+ ipfix = VppCFLOW(test=self, layer="l2")
ipfix.add_vpp_config()
# template packet should arrive immediately
@@ -482,12 +494,12 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0000")
def test_L2onL2(self):
- """ L2 data on L2 datapath"""
+ """L2 data on L2 datapath"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, layer='l2')
+ ipfix = VppCFLOW(test=self, layer="l2")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
@@ -500,20 +512,21 @@ class Datapath(MethodHolder):
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 256: 8})
+ self.verify_cflow_data_detail(
+ ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
+ )
self.collector.get_capture(2)
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0001")
def test_L3onL2(self):
- """ L3 data on L2 datapath"""
+ """L3 data on L2 datapath"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, layer='l3')
+ ipfix = VppCFLOW(test=self, layer="l3")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
@@ -526,9 +539,12 @@ class Datapath(MethodHolder):
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 4: 17,
- 8: 'src_ip', 12: 'dst_ip'})
+ self.verify_cflow_data_detail(
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 4: 17, 8: "src_ip", 12: "dst_ip"},
+ )
self.collector.get_capture(3)
@@ -536,12 +552,12 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0002")
def test_L4onL2(self):
- """ L4 data on L2 datapath"""
+ """L4 data on L2 datapath"""
self.logger.info("FFP_TEST_START_0003")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, layer='l4')
+ ipfix = VppCFLOW(test=self, layer="l4")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
@@ -554,8 +570,9 @@ class Datapath(MethodHolder):
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 7: 'sport', 11: 'dport'})
+ self.verify_cflow_data_detail(
+ ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
+ )
self.collector.get_capture(3)
@@ -563,12 +580,12 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0003")
def test_templatesIp4(self):
- """ verify templates on IP4 datapath"""
+ """verify templates on IP4 datapath"""
self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, datapath='ip4')
+ ipfix = VppCFLOW(test=self, datapath="ip4")
ipfix.add_vpp_config()
# template packet should arrive immediately
@@ -581,12 +598,12 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0000")
def test_L2onIP4(self):
- """ L2 data on IP4 datapath"""
+ """L2 data on IP4 datapath"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg4', layer='l2', datapath='ip4')
+ ipfix = VppCFLOW(test=self, intf="pg4", layer="l2", datapath="ip4")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
@@ -599,8 +616,9 @@ class Datapath(MethodHolder):
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 256: 8})
+ self.verify_cflow_data_detail(
+ ipfix_decoder, capture, cflow, {2: "packets", 256: 8}
+ )
# expected two templates and one cflow packet
self.collector.get_capture(2)
@@ -609,12 +627,12 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0001")
def test_L3onIP4(self):
- """ L3 data on IP4 datapath"""
+ """L3 data on IP4 datapath"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg4', layer='l3', datapath='ip4')
+ ipfix = VppCFLOW(test=self, intf="pg4", layer="l3", datapath="ip4")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
@@ -627,9 +645,12 @@ class Datapath(MethodHolder):
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {1: 'octets', 2: 'packets',
- 8: 'src_ip', 12: 'dst_ip'})
+ self.verify_cflow_data_detail(
+ ipfix_decoder,
+ capture,
+ cflow,
+ {1: "octets", 2: "packets", 8: "src_ip", 12: "dst_ip"},
+ )
# expected two templates and one cflow packet
self.collector.get_capture(2)
@@ -638,12 +659,12 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0002")
def test_L4onIP4(self):
- """ L4 data on IP4 datapath"""
+ """L4 data on IP4 datapath"""
self.logger.info("FFP_TEST_START_0003")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg4', layer='l4', datapath='ip4')
+ ipfix = VppCFLOW(test=self, intf="pg4", layer="l4", datapath="ip4")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
@@ -656,8 +677,9 @@ class Datapath(MethodHolder):
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 7: 'sport', 11: 'dport'})
+ self.verify_cflow_data_detail(
+ ipfix_decoder, capture, cflow, {2: "packets", 7: "sport", 11: "dport"}
+ )
# expected two templates and one cflow packet
self.collector.get_capture(2)
@@ -666,11 +688,11 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0003")
def test_templatesIP6(self):
- """ verify templates on IP6 datapath"""
+ """verify templates on IP6 datapath"""
self.logger.info("FFP_TEST_START_0000")
self.pg_enable_capture(self.pg_interfaces)
- ipfix = VppCFLOW(test=self, datapath='ip6')
+ ipfix = VppCFLOW(test=self, datapath="ip6")
ipfix.add_vpp_config()
# template packet should arrive immediately
@@ -682,28 +704,27 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0000")
def test_L2onIP6(self):
- """ L2 data on IP6 datapath"""
+ """L2 data on IP6 datapath"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg6', layer='l2', datapath='ip6')
+ ipfix = VppCFLOW(test=self, intf="pg6", layer="l2", datapath="ip6")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
templates = ipfix.verify_templates(ipfix_decoder, count=1)
- self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
- ip_ver='IPv6')
+ self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 256: 56710},
- ip_ver='v6')
+ self.verify_cflow_data_detail(
+ ipfix_decoder, capture, cflow, {2: "packets", 256: 56710}, ip_ver="v6"
+ )
# expected two templates and one cflow packet
self.collector.get_capture(2)
@@ -712,29 +733,31 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0001")
def test_L3onIP6(self):
- """ L3 data on IP6 datapath"""
+ """L3 data on IP6 datapath"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg6', layer='l3', datapath='ip6')
+ ipfix = VppCFLOW(test=self, intf="pg6", layer="l3", datapath="ip6")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
templates = ipfix.verify_templates(ipfix_decoder, count=1)
- self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
- ip_ver='IPv6')
+ self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets',
- 27: 'src_ip', 28: 'dst_ip'},
- ip_ver='v6')
+ self.verify_cflow_data_detail(
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 27: "src_ip", 28: "dst_ip"},
+ ip_ver="v6",
+ )
# expected two templates and one cflow packet
self.collector.get_capture(2)
@@ -743,28 +766,31 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0002")
def test_L4onIP6(self):
- """ L4 data on IP6 datapath"""
+ """L4 data on IP6 datapath"""
self.logger.info("FFP_TEST_START_0003")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
- ipfix = VppCFLOW(test=self, intf='pg6', layer='l4', datapath='ip6')
+ ipfix = VppCFLOW(test=self, intf="pg6", layer="l4", datapath="ip6")
ipfix.add_vpp_config()
ipfix_decoder = IPFIXDecoder()
# template packet should arrive immediately
templates = ipfix.verify_templates(ipfix_decoder, count=1)
- self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1,
- ip_ver='IPv6')
+ self.create_stream(src_if=self.pg5, dst_if=self.pg6, packets=1, ip_ver="IPv6")
capture = self.send_packets(src_if=self.pg5, dst_if=self.pg6)
# make sure the one packet we expect actually showed up
self.vapi.ipfix_flush()
cflow = self.wait_for_cflow_packet(self.collector, templates[0])
- self.verify_cflow_data_detail(ipfix_decoder, capture, cflow,
- {2: 'packets', 7: 'sport', 11: 'dport'},
- ip_ver='v6')
+ self.verify_cflow_data_detail(
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 7: "sport", 11: "dport"},
+ ip_ver="v6",
+ )
# expected two templates and one cflow packet
self.collector.get_capture(2)
@@ -773,7 +799,7 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0003")
def test_0001(self):
- """ no timers, one CFLOW packet, 9 Flows inside"""
+ """no timers, one CFLOW packet, 9 Flows inside"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
@@ -798,7 +824,7 @@ class Datapath(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0001")
def test_0002(self):
- """ no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
+ """no timers, two CFLOW packets (mtu=256), 3 Flows in each"""
self.logger.info("FFP_TEST_START_0002")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
@@ -817,10 +843,8 @@ class Datapath(MethodHolder):
# make sure the one packet we expect actually showed up
cflows = []
self.vapi.ipfix_flush()
- cflows.append(self.wait_for_cflow_packet(self.collector,
- templates[1]))
- cflows.append(self.wait_for_cflow_packet(self.collector,
- templates[1]))
+ cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
+ cflows.append(self.wait_for_cflow_packet(self.collector, templates[1]))
self.verify_cflow_data_notimer(ipfix_decoder, capture, cflows)
self.collector.get_capture(5)
@@ -841,7 +865,7 @@ class DisableIPFIX(MethodHolder):
super(DisableIPFIX, cls).tearDownClass()
def test_0001(self):
- """ disable IPFIX after first packets"""
+ """disable IPFIX after first packets"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
@@ -889,8 +913,7 @@ class ReenableIPFIX(MethodHolder):
super(ReenableIPFIX, cls).tearDownClass()
def test_0011(self):
- """ disable IPFIX after first packets and re-enable after few packets
- """
+ """disable IPFIX after first packets and re-enable after few packets"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
@@ -957,7 +980,7 @@ class DisableFP(MethodHolder):
super(DisableFP, cls).tearDownClass()
def test_0001(self):
- """ disable flowprobe feature after first packets"""
+ """disable flowprobe feature after first packets"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
@@ -1004,8 +1027,8 @@ class ReenableFP(MethodHolder):
super(ReenableFP, cls).tearDownClass()
def test_0001(self):
- """ disable flowprobe feature after first packets and re-enable
- after few packets """
+ """disable flowprobe feature after first packets and re-enable
+ after few packets"""
self.logger.info("FFP_TEST_START_0001")
self.pg_enable_capture(self.pg_interfaces)
self.pkts = []
@@ -1054,5 +1077,5 @@ class ReenableFP(MethodHolder):
self.logger.info("FFP_TEST_FINISH_0001")
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)