1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
from config import config
from asfframework import VppTestRunner
import unittest
from framework import VppTestCase
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP
from random import randint
import ctypes
def create_stream(src_if, dst_if, count):
packets = []
for i in range(count):
p = (
Ether(dst=src_if.local_mac, src=src_if.remote_mac)
/ IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
/ UDP(sport=randint(49152, 65535), dport=5678)
)
packets.append(p)
return packets
@unittest.skipIf(
"tracedump" in config.excluded_plugins, "Exclude tracedump plugin tests"
)
class TestTracedump(VppTestCase):
"""Tracedump plugin tests"""
@classmethod
def setUpClass(cls):
super(TestTracedump, cls).setUpClass()
cls.create_pg_interfaces(range(2))
for i in cls.pg_interfaces:
i.admin_up()
i.config_ip4()
i.resolve_arp()
@classmethod
def tearDownClass(cls):
for i in cls.pg_interfaces:
i.unconfig_ip4()
i.admin_down()
super(TestTracedump, cls).tearDownClass()
def test_tracedump_include(self):
"""Check API/CLI output + include node"""
packets = create_stream(self.pg0, self.pg1, 5)
self.pg0.add_stream(packets)
self.vapi.trace_clear_cache()
self.vapi.trace_clear_capture()
# pg-input node = 425
self.vapi.trace_set_filters(flag=1, node_index=425, count=5)
self.vapi.trace_capture_packets(
node_index=425,
max_packets=5,
use_filter=True,
verbose=True,
pre_capture_clear=True,
)
reply = self.vapi.cli(
"show graph node want_arcs input drop output punt handoff no_free polling interrupt"
)
self.assertIn("af-packet-input", reply)
self.pg_start()
reply = self.vapi.graph_node_get(cursor=ctypes.c_uint32(~0).value, index=425)
self.assertTrue(reply[1][0].name == "pg-input")
reply = self.vapi.trace_v2_dump(
thread_id=ctypes.c_uint32(~0).value, position=0, clear_cache=False
)
self.assertTrue(reply)
reply = self.vapi.trace_filter_function_dump()
self.assertTrue(reply[1].selected)
def test_tracedump_exclude(self):
"""Exclude node (no trace output)"""
self.vapi.trace_clear_cache()
self.vapi.trace_clear_capture()
packets = create_stream(self.pg0, self.pg1, 5)
self.pg0.add_stream(packets)
# exclude node
self.vapi.trace_set_filters(flag=2, node_index=425, count=5)
self.vapi.trace_capture_packets(
node_index=425,
max_packets=5,
use_filter=True,
verbose=True,
pre_capture_clear=True,
)
self.pg_start()
reply = self.vapi.trace_v2_dump(thread_id=0, position=1, clear_cache=False)
self.assertFalse(reply)
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)
|