aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_tracedump.py
blob: b2346d71f47db6722544c4b01a353050378838a0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from config import config
from asfframework import VppTestRunner
import unittest
from framework import VppTestCase
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from random import randint
import ctypes


def create_stream(src_if, dst_if, count):
    packets = []
    for i in range(count):
        p = (
            Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            / UDP(sport=randint(49152, 65535), dport=5678)
        )
        packets.append(p)

    return packets


@unittest.skipIf(
    "tracedump" in config.excluded_plugins, "Exclude tracedump plugin tests"
)
class TestTracedump(VppTestCase):
    """Tracedump plugin tests"""

    @classmethod
    def setUpClass(cls):
        super(TestTracedump, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    @classmethod
    def tearDownClass(cls):
        for i in cls.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()
        super(TestTracedump, cls).tearDownClass()

    def test_tracedump_include(self):
        """Check API/CLI output + include node"""
        packets = create_stream(self.pg0, self.pg1, 5)
        self.pg0.add_stream(packets)

        self.vapi.trace_clear_cache()
        self.vapi.trace_clear_capture()
        # pg-input node = 425
        self.vapi.trace_set_filters(flag=1, node_index=425, count=5)
        self.vapi.trace_capture_packets(
            node_index=425,
            max_packets=5,
            use_filter=True,
            verbose=True,
            pre_capture_clear=True,
        )

        reply = self.vapi.cli(
            "show graph node want_arcs input drop output punt handoff no_free polling interrupt"
        )
        self.assertIn("af-packet-input", reply)

        self.pg_start()
        reply = self.vapi.graph_node_get(cursor=ctypes.c_uint32(~0).value, index=425)
        self.assertTrue(reply[1][0].name == "pg-input")
        reply = self.vapi.trace_v2_dump(
            thread_id=ctypes.c_uint32(~0).value, position=0, clear_cache=False
        )
        self.assertTrue(reply)
        reply = self.vapi.trace_filter_function_dump()
        self.assertTrue(reply[1].selected)

    def test_tracedump_exclude(self):
        """Exclude node (no trace output)"""
        self.vapi.trace_clear_cache()
        self.vapi.trace_clear_capture()

        packets = create_stream(self.pg0, self.pg1, 5)
        self.pg0.add_stream(packets)

        # exclude node
        self.vapi.trace_set_filters(flag=2, node_index=425, count=5)
        self.vapi.trace_capture_packets(
            node_index=425,
            max_packets=5,
            use_filter=True,
            verbose=True,
            pre_capture_clear=True,
        )
        self.pg_start()
        reply = self.vapi.trace_v2_dump(thread_id=0, position=1, clear_cache=False)
        self.assertFalse(reply)


if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)