summaryrefslogtreecommitdiffstats
path: root/test/test_tracedump.py
blob: d58d15a2c348000e470badc08a9e6bc88db14b67 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from config import config
from asfframework import VppTestRunner
import unittest
from framework import VppTestCase
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from random import randint
import ctypes


def create_stream(src_if, dst_if, count):
    packets = []
    for i in range(count):
        p = (
            Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
            / UDP(sport=randint(49152, 65535), dport=5678)
        )
        packets.append(p)

    return packets


@unittest.skipIf(
    "tracedump" in config.excluded_plugins, "Exclude tracedump plugin tests"
)
class TestTracedump(VppTestCase):
    """Tracedump plugin tests"""

    @classmethod
    def setUpClass(cls):
        super(TestTracedump, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        for i in cls.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    @classmethod
    def tearDownClass(cls):
        for i in cls.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()
        super(TestTracedump, cls).tearDownClass()

    def test_tracedump_include(self):
        """Check API/CLI output + include node"""
        packets = create_stream(self.pg0, self.pg1, 5)
        self.pg0.add_stream(packets)

        self.vapi.trace_clear_cache()
        self.vapi.trace_clear_capture()
        # get pg-input node index
        reply = self.vapi.graph_node_get(
            cursor=0xFFFFFFFF,
            index=0xFFFFFFFF,
            name="pg-input",
        )
        self.assertTrue(reply[1][0].name == "pg-input")
        pg_input_index = reply[1][0].index
        self.vapi.trace_set_filters(flag=1, node_index=pg_input_index, count=5)
        self.vapi.trace_capture_packets(
            node_index=pg_input_index,
            max_packets=5,
            use_filter=True,
            verbose=True,
            pre_capture_clear=True,
        )

        reply = self.vapi.cli(
            "show graph node want_arcs input drop output punt handoff no_free polling interrupt"
        )
        self.assertIn("af-packet-input", reply)

        self.pg_start()
        reply = self.vapi.trace_v2_dump(
            thread_id=0xFFFFFFFF, position=0, clear_cache=False
        )
        self.assertTrue(reply)
        reply = self.vapi.trace_filter_function_dump()
        self.assertTrue(reply[1].selected)

    def test_tracedump_exclude(self):
        """Exclude node (no trace output)"""
        self.vapi.trace_clear_cache()
        self.vapi.trace_clear_capture()

        packets = create_stream(self.pg0, self.pg1, 5)
        self.pg0.add_stream(packets)

        # exclude node
        reply = self.vapi.graph_node_get(
            cursor=0xFFFFFFFF,
            index=0xFFFFFFFF,
            name="pg-input",
        )
        self.assertTrue(reply[1][0].name == "pg-input")
        pg_input_index = reply[1][0].index
        self.vapi.trace_set_filters(flag=2, node_index=pg_input_index, count=5)
        self.vapi.trace_capture_packets(
            node_index=pg_input_index,
            max_packets=5,
            use_filter=True,
            verbose=True,
            pre_capture_clear=True,
        )
        self.pg_start()
        reply = self.vapi.trace_v2_dump(thread_id=0, position=1, clear_cache=False)
        self.assertFalse(reply)


if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)