aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_bufmon.py
blob: 6d7f2f6da65766a8722da2f841161a9927fa8b4b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
from framework import VppTestCase, VppTestRunner
import unittest
from config import config
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from scapy.layers.inet import IP, UDP
from random import randint
from util import ppp


@unittest.skipIf("bufmon" in config.excluded_plugins, "Exclude bufmon plugin tests")
class TestBufmon(VppTestCase):
    """bufmon plugin test"""

    @classmethod
    def setUpClass(cls):
        super(TestBufmon, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(2))
            for i in cls.pg_interfaces:
                i.config_ip4()
                i.resolve_arp()
                i.admin_up()
        except Exception:
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        for i in cls.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()
        super(TestBufmon, cls).tearDownClass()

    # https://fd.io/docs/vpp/master/developer/tests/overview.html#example-how-to-add-a-new-test
    def create_stream(self, src_if, dst_if, count):
        packets = []
        for i in range(count):
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
                / UDP(sport=randint(49152, 65535), dport=5678)
                / Raw(payload)
            )
            info.data = p.copy()
            packets.append(p)
        return packets

    def verify_capture(self, src_if, dst_if, capture):
        packet_info = None
        for packet in capture:
            try:
                ip = packet[IP]
                udp = packet[UDP]
                # convert the payload to packet info object
                payload_info = self.payload_to_info(packet[Raw])
                # make sure the indexes match
                self.assert_equal(
                    payload_info.src, src_if.sw_if_index, "source sw_if_index"
                )
                self.assert_equal(
                    payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
                )
                packet_info = self.get_next_packet_info_for_interface2(
                    src_if.sw_if_index, dst_if.sw_if_index, packet_info
                )
                # make sure we didn't run out of saved packets
                self.assertIsNotNone(packet_info)
                self.assert_equal(
                    payload_info.index, packet_info.index, "packet info index"
                )
                saved_packet = packet_info.data  # fetch the saved packet
                # assert the values match
                self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
                # ... more assertions here
                self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
        remaining_packet = self.get_next_packet_info_for_interface2(
            src_if.sw_if_index, dst_if.sw_if_index, packet_info
        )
        self.assertIsNone(
            remaining_packet,
            "Interface %s: Packet expected from interface "
            "%s didn't arrive" % (dst_if.name, src_if.name),
        )

    def test_bufmon(self):
        self.vapi.cli("set buffer traces on")

        reply = self.vapi.cli("show buffer traces status")
        self.assertIn("buffers tracing is on", reply)

        packets = self.create_stream(self.pg0, self.pg1, 5)
        self.pg0.add_stream(packets)
        self.pg0.enable_capture()
        self.pg1.enable_capture()
        self.pg_start()

        capture = self.pg1.get_capture()
        self.pg0.assert_nothing_captured()
        self.verify_capture(self.pg0, self.pg1, capture)

        expected = [
            "pg-input",
            "ip4-input",
            "ip4-rewrite",
            "ip4-lookup",
            "ethernet-input",
            "pg1-tx",
            "pg1-output",
        ]
        reply = self.vapi.cli("show buffer traces verbose")
        for entry in expected:
            self.assertIn(entry, reply)

        # not verbose, skips nodes w/o buffered buffers
        reply = self.vapi.cli("show buffer traces")
        self.assertNotIn("pg-input", reply)

        self.vapi.cli("clear buffer traces")
        reply = self.vapi.cli("show buffer traces verbose")
        self.assertNotIn("pg-input", reply)


if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)