aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoradrianvillin <avillin@cisco.com>2023-12-06 19:21:49 +0100
committerMohammed HAWARI <momohawari@gmail.com>2023-12-15 09:53:29 +0000
commitc6fe6174908d9a045a7c4f055a9f60f233618e87 (patch)
tree7688d39d1a5609ce14a4931acd3b9f5d12048d44
parent04d262d1eba969538950da7183cfa77ff3d70dff (diff)
tests: Added bpf trace filter plugin test
Type: test Change-Id: I026d9298fe1372d03f61b6ad57c82628bab4c831 Signed-off-by: adrianvillin <avillin@cisco.com>
-rw-r--r--test/framework.py4
-rw-r--r--test/test_bpf_trace_filter.py105
2 files changed, 107 insertions, 2 deletions
diff --git a/test/framework.py b/test/framework.py
index fbbc11218bf..6ff03d8b073 100644
--- a/test/framework.py
+++ b/test/framework.py
@@ -132,14 +132,14 @@ class VppTestCase(VppAsfTestCase):
cls._pcaps.append((intf, worker))
@classmethod
- def pg_start(cls, trace=True):
+ def pg_start(cls, trace=True, traceFilter=False):
"""Enable the PG, wait till it is done, then clean up"""
for intf, worker in cls._old_pcaps:
intf.remove_old_pcap_file(intf.get_in_path(worker))
cls._old_pcaps = []
if trace:
cls.vapi.cli("clear trace")
- cls.vapi.cli("trace add pg-input 1000")
+ cls.vapi.cli("trace add pg-input 1000" + (" filter" if traceFilter else ""))
cls.vapi.cli("packet-generator enable")
# PG, when starts, runs to completion -
# so let's avoid a race condition,
diff --git a/test/test_bpf_trace_filter.py b/test/test_bpf_trace_filter.py
new file mode 100644
index 00000000000..9ee74db681e
--- /dev/null
+++ b/test/test_bpf_trace_filter.py
@@ -0,0 +1,105 @@
+from framework import VppTestCase
+from asfframework import VppTestRunner
+import unittest
+from config import config
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, UDP
+from random import randint
+
+
+@unittest.skipIf(
+ "bpf_trace_filter" in config.excluded_plugins,
+ "Exclude BPF Trace Filter plugin tests",
+)
+class TestBpfTraceFilter(VppTestCase):
+ """BPF Trace filter test"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestBpfTraceFilter, cls).setUpClass()
+ try:
+ cls.create_pg_interfaces(range(2))
+ for i in cls.pg_interfaces:
+ i.config_ip4()
+ i.resolve_arp()
+ i.admin_up()
+ except Exception:
+ cls.tearDownClass()
+ raise
+
+ @classmethod
+ def tearDownClass(cls):
+ for i in cls.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+ super(TestBpfTraceFilter, cls).tearDownClass()
+
+ # reset trace filter before each test
+ def setUp(self):
+ super(TestBpfTraceFilter, self).setUp()
+ self.vapi.cli("set trace filter function vnet_is_packet_traced")
+ self.vapi.cli("clear trace")
+
+ def create_stream(self, src_if, dst_if, count):
+ packets = []
+ for i in range(count):
+ info = self.create_packet_info(src_if, dst_if)
+ payload = self.info_to_payload(info)
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / UDP(sport=randint(49152, 65535), dport=5678)
+ )
+ info.data = p.copy()
+ packets.append(p)
+ return packets
+
+ def test_bpf_trace_filter_cli(self):
+ """BPF Trace filter test [CLI]"""
+ self.vapi.cli("set bpf trace filter {{tcp}}")
+ self.vapi.cli("set trace filter function bpf_trace_filter")
+
+ packets = self.create_stream(self.pg0, self.pg1, 3)
+ self.pg0.add_stream(packets)
+ self.pg_start(traceFilter=True)
+
+ # verify that bpf trace filter has been selected
+ reply = self.vapi.cli("show trace filter function")
+ self.assertIn(
+ "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
+ )
+
+ # verify that trace is empty
+ reply = self.vapi.cli("show trace")
+ self.assertIn(
+ "No packets in trace buffer",
+ reply,
+ "Unexpected packets in the trace buffer",
+ )
+
+ def test_bpf_trace_filter_vapi(self):
+ """BPF Trace filter test [VAPI]"""
+ self.vapi.bpf_trace_filter_set(filter="tcp")
+ self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")
+
+ packets = self.create_stream(self.pg0, self.pg1, 3)
+ self.pg0.add_stream(packets)
+ self.pg_start(traceFilter=True)
+
+ # verify that bpf trace filter has been selected
+ reply = self.vapi.cli("show trace filter function")
+ self.assertIn(
+ "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
+ )
+
+ # verify that trace is empty
+ reply = self.vapi.cli("show trace")
+ self.assertIn(
+ "No packets in trace buffer",
+ reply,
+ "Unexpected packets in the trace buffer",
+ )
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)