diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_flowprobe.py | 70 |
1 files changed, 68 insertions, 2 deletions
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py index 1d565dcf0a4..19571f71235 100644 --- a/test/test_flowprobe.py +++ b/test/test_flowprobe.py @@ -11,6 +11,7 @@ from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, TCP, UDP from scapy.layers.inet6 import IPv6 +from scapy.contrib.lacp import SlowProtocol, LACP from config import config from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11 @@ -273,9 +274,9 @@ class MethodHolder(VppTestCase): if self.debug_print: print(data) if ip_ver == "v4": - ip_layer = capture[0][IP] + ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None else: - ip_layer = capture[0][IPv6] + ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None if data_set is not None: for record in data: # skip flow if ingress/egress interface is 0 @@ -682,6 +683,71 @@ class DatapathTestsHolder(object): ipfix.remove_vpp_config() self.logger.info("FFP_TEST_FINISH_0002") + def test_L23onL2(self): + """L2/3 data on L2 datapath""" + self.pg_enable_capture(self.pg_interfaces) + self.pkts = [] + + ipfix = VppCFLOW( + test=self, intf=self.intf1, layer="l2 l3", direction=self.direction + ) + ipfix.add_vpp_config() + + ipfix_decoder = IPFIXDecoder() + # template packet should arrive immediately + templates = ipfix.verify_templates(ipfix_decoder, count=3) + + # verify IPv4 and IPv6 flows + for ip_ver in ("v4", "v6"): + self.create_stream(packets=1, ip_ver=ip_ver) + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet( + self.collector, templates[1 if ip_ver == "v4" else 2] + ) + src_ip_id = 8 if ip_ver == "v4" else 27 + dst_ip_id = 12 if ip_ver == "v4" else 28 + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + { + 2: "packets", + 256: 8 if ip_ver == "v4" else 56710, + 4: 17, + src_ip_id: "src_ip", + dst_ip_id: "dst_ip", + 61: (self.direction == "tx"), + }, + ip_ver=ip_ver, + ) + + # verify non-IP flow + self.pkts = [ + ( + Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac) + / SlowProtocol() + / LACP() + ) + ] + capture = self.send_packets() + + # make sure the one packet we expect actually showed up + self.vapi.ipfix_flush() + cflow = self.wait_for_cflow_packet(self.collector, templates[0]) + self.verify_cflow_data_detail( + ipfix_decoder, + capture, + cflow, + {2: "packets", 256: 2440, 61: (self.direction == "tx")}, + ) + + self.collector.get_capture(6) + + ipfix.remove_vpp_config() + def test_L4onL2(self): """L4 data on L2 datapath""" self.logger.info("FFP_TEST_START_0003") |