aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_bpf_trace_filter.py
blob: 9ee74db681e94d73f4fe179b22345d638a0bb357 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from framework import VppTestCase
from asfframework import VppTestRunner
import unittest
from config import config
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from random import randint


@unittest.skipIf(
    "bpf_trace_filter" in config.excluded_plugins,
    "Exclude BPF Trace Filter plugin tests",
)
class TestBpfTraceFilter(VppTestCase):
    """BPF Trace filter test"""

    @classmethod
    def setUpClass(cls):
        super(TestBpfTraceFilter, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.resolve_arp()
                i.admin_up()
        except Exception:
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        for i in cls.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()
        super(TestBpfTraceFilter, cls).tearDownClass()

    # reset trace filter before each test
    def setUp(self):
        super(TestBpfTraceFilter, self).setUp()
        self.vapi.cli("set trace filter function vnet_is_packet_traced")
        self.vapi.cli("clear trace")

    def create_stream(self, src_if, dst_if, count):
        packets = []
        for i in range(count):
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
                / UDP(sport=randint(49152, 65535), dport=5678)
            )
            info.data = p.copy()
            packets.append(p)
        return packets

    def test_bpf_trace_filter_cli(self):
        """BPF Trace filter test [CLI]"""
        self.vapi.cli("set bpf trace filter {{tcp}}")
        self.vapi.cli("set trace filter function bpf_trace_filter")

        packets = self.create_stream(self.pg0, self.pg1, 3)
        self.pg0.add_stream(packets)
        self.pg_start(traceFilter=True)

        # verify that bpf trace filter has been selected
        reply = self.vapi.cli("show trace filter function")
        self.assertIn(
            "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
        )

        # verify that trace is empty
        reply = self.vapi.cli("show trace")
        self.assertIn(
            "No packets in trace buffer",
            reply,
            "Unexpected packets in the trace buffer",
        )

    def test_bpf_trace_filter_vapi(self):
        """BPF Trace filter test [VAPI]"""
        self.vapi.bpf_trace_filter_set(filter="tcp")
        self.vapi.trace_set_filter_function(filter_function_name="bpf_trace_filter")

        packets = self.create_stream(self.pg0, self.pg1, 3)
        self.pg0.add_stream(packets)
        self.pg_start(traceFilter=True)

        # verify that bpf trace filter has been selected
        reply = self.vapi.cli("show trace filter function")
        self.assertIn(
            "(*) name:bpf_trace_filter", reply, "BPF Trace filter is not selected"
        )

        # verify that trace is empty
        reply = self.vapi.cli("show trace")
        self.assertIn(
            "No packets in trace buffer",
            reply,
            "Unexpected packets in the trace buffer",
        )


if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)