aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_flowprobe.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_flowprobe.py')
-rw-r--r--test/test_flowprobe.py70
1 files changed, 68 insertions, 2 deletions
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py
index 1d565dcf0a4..19571f71235 100644
--- a/test/test_flowprobe.py
+++ b/test/test_flowprobe.py
@@ -11,6 +11,7 @@ from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, TCP, UDP
from scapy.layers.inet6 import IPv6
+from scapy.contrib.lacp import SlowProtocol, LACP
from config import config
from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, tag_fixme_debian11
@@ -273,9 +274,9 @@ class MethodHolder(VppTestCase):
if self.debug_print:
print(data)
if ip_ver == "v4":
- ip_layer = capture[0][IP]
+ ip_layer = capture[0][IP] if capture[0].haslayer(IP) else None
else:
- ip_layer = capture[0][IPv6]
+ ip_layer = capture[0][IPv6] if capture[0].haslayer(IPv6) else None
if data_set is not None:
for record in data:
# skip flow if ingress/egress interface is 0
@@ -682,6 +683,71 @@ class DatapathTestsHolder(object):
ipfix.remove_vpp_config()
self.logger.info("FFP_TEST_FINISH_0002")
+ def test_L23onL2(self):
+ """L2/3 data on L2 datapath"""
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pkts = []
+
+ ipfix = VppCFLOW(
+ test=self, intf=self.intf1, layer="l2 l3", direction=self.direction
+ )
+ ipfix.add_vpp_config()
+
+ ipfix_decoder = IPFIXDecoder()
+ # template packet should arrive immediately
+ templates = ipfix.verify_templates(ipfix_decoder, count=3)
+
+ # verify IPv4 and IPv6 flows
+ for ip_ver in ("v4", "v6"):
+ self.create_stream(packets=1, ip_ver=ip_ver)
+ capture = self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(
+ self.collector, templates[1 if ip_ver == "v4" else 2]
+ )
+ src_ip_id = 8 if ip_ver == "v4" else 27
+ dst_ip_id = 12 if ip_ver == "v4" else 28
+ self.verify_cflow_data_detail(
+ ipfix_decoder,
+ capture,
+ cflow,
+ {
+ 2: "packets",
+ 256: 8 if ip_ver == "v4" else 56710,
+ 4: 17,
+ src_ip_id: "src_ip",
+ dst_ip_id: "dst_ip",
+ 61: (self.direction == "tx"),
+ },
+ ip_ver=ip_ver,
+ )
+
+ # verify non-IP flow
+ self.pkts = [
+ (
+ Ether(dst=self.pg2.local_mac, src=self.pg1.remote_mac)
+ / SlowProtocol()
+ / LACP()
+ )
+ ]
+ capture = self.send_packets()
+
+ # make sure the one packet we expect actually showed up
+ self.vapi.ipfix_flush()
+ cflow = self.wait_for_cflow_packet(self.collector, templates[0])
+ self.verify_cflow_data_detail(
+ ipfix_decoder,
+ capture,
+ cflow,
+ {2: "packets", 256: 2440, 61: (self.direction == "tx")},
+ )
+
+ self.collector.get_capture(6)
+
+ ipfix.remove_vpp_config()
+
def test_L4onL2(self):
"""L4 data on L2 datapath"""
self.logger.info("FFP_TEST_START_0003")