aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_vlib.py
blob: 1b92c94a4c4671680a2b0fb04e5ab80989854cfd (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
#!/usr/bin/env python3

import unittest
import pexpect
import time
import signal
from config import config
from framework import VppTestCase
from asfframework import VppTestRunner
from scapy.layers.inet import IP, ICMP
from scapy.layers.l2 import Ether
from scapy.packet import Raw


@unittest.skipUnless(config.gcov, "part of code coverage tests")
class TestVlib(VppTestCase):
    """Vlib Unit Test Cases"""

    # These tests are only enabled when config.gcov is set (see the decorator).
    # Each one feeds a list of debug CLI commands to VPP through
    # self.vapi.cli_return_response(); a non-zero retval is logged, never
    # asserted on.
    # NOTE(review): several commands look deliberately invalid ("bogus",
    # "no-such-node", ...), presumably to reach error paths - confirm.

    vpp_worker_count = 1

    # Packet-generator stream definition shared by several tests below.
    _PG_STREAM_CMD = (
        "packet-generator new {\n"
        " name vlib\n"
        " limit 15\n"
        " size 128-128\n"
        " interface loop0\n"
        " node ethernet-input\n"
        " data {\n"
        "   IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n"
        "   ICMP: db00::1 -> db00::2\n"
        "   incrementing 30\n"
        "   }\n"
        "}\n"
    )

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()

    def tearDown(self):
        super().tearDown()

    def _run_cli_cmds(self, cmds):
        """Run each CLI command in order, logging any non-zero return value.

        :param cmds: iterable of CLI command strings.

        A failing command does not fail the test: the reply text (when the
        response carries one) or the numeric retval is written to the test
        log at info level and the next command is run.
        """
        for cmd in cmds:
            r = self.vapi.cli_return_response(cmd)
            if r.retval != 0:
                if hasattr(r, "reply"):
                    self.logger.info(cmd + " FAIL reply " + r.reply)
                else:
                    self.logger.info(cmd + " FAIL retval " + str(r.retval))

    def test_vlib_main_unittest(self):
        """Vlib main.c Code Coverage Test"""

        cmds = [
            "loopback create",
            self._PG_STREAM_CMD,
            "event-logger trace dispatch",
            "event-logger stop",
            "event-logger clear",
            "event-logger resize 102400",
            "event-logger restart",
            "pcap dispatch trace on max 100 buffer-trace pg-input 15",
            "pa en",
            "show event-log 100 all",
            "event-log save",
            "event-log save foo",
            "pcap dispatch trace",
            "pcap dispatch trace status",
            "pcap dispatch trace off",
            "show vlib frame-allocation",
        ]

        self._run_cli_cmds(cmds)

    def test_vlib_node_cli_unittest(self):
        """Vlib node_cli.c Code Coverage Test"""

        cmds = [
            "loopback create",
            self._PG_STREAM_CMD,
            "show vlib graph",
            "show vlib graph ethernet-input",
            "show vlib graphviz",
            "show vlib graphviz graphviz.dot",
            "pa en",
            "show runtime ethernet-input",
            "show runtime brief verbose max summary",
            "clear runtime",
            "show node index 1",
            "show node ethernet-input",
            "show node pg-input",
            "set node function",
            "set node function no-such-node",
            "set node function cdp-input default",
            "set node function ethernet-input default",
            "set node function ethernet-input bozo",
            "set node function ethernet-input",
            "show \t",
        ]

        self._run_cli_cmds(cmds)

    def test_vlib_buffer_c_unittest(self):
        """Vlib buffer.c Code Coverage Test"""

        cmds = [
            "loopback create",
            self._PG_STREAM_CMD,
            "event-logger trace",
            "event-logger trace enable",
            "event-logger trace api cli barrier",
            "pa en",
            "show interface bogus",
            "event-logger trace disable api cli barrier",
            "event-logger trace circuit-node ethernet-input",
            "event-logger trace circuit-node ethernet-input disable",
            "clear interfaces",
            "test vlib",
            "test vlib2",
            "show memory api-segment stats-segment main-heap verbose",
            "leak-check { show memory }",
            "show cpu",
            "memory-trace main-heap",
            "memory-trace main-heap api-segment stats-segment",
            "leak-check { show version }",
            "show version ?",
            "comment { show version }",
            "uncomment { show version }",
            "show memory main-heap",
            "show memory bogus",
            "choices",
            "test heap-validate",
            "memory-trace main-heap disable",
            "show buffers",
            "show eve",
            "show help",
            "show ip ",
        ]

        self._run_cli_cmds(cmds)

    def test_vlib_format_unittest(self):
        """Vlib format.c Code Coverage Test"""

        cmds = [
            "loopback create",
            "classify filter pcap mask l2 proto match l2 proto 0x86dd",
            "classify filter pcap del",
            "test format-vlib",
        ]

        self._run_cli_cmds(cmds)

    # This method used to be a second "test_vlib_main_unittest"; the duplicate
    # name rebound the attribute and silently hid the main.c coverage test
    # defined above, so it now has its own name.
    def test_vlib_api_segment_unittest(self):
        """Private Binary API Segment Test (takes 70 seconds)"""

        vat_path = config.vpp + "_api_test"
        vat = pexpect.spawn(vat_path, ["socket-name", self.get_api_sock_path()])
        try:
            vat.expect("vat# ", timeout=10)
            vat.sendline("sock_init_shm")
            vat.expect("vat# ", timeout=10)
            vat.sendline("sh api cli")
        finally:
            # Kill and reap the child even when a prompt never shows up, so a
            # failed expect() does not leave a stray vat process behind.
            vat.kill(signal.SIGKILL)
            vat.wait()
        # NOTE(review): the sleep presumably gives VPP time to notice the
        # killed client and clean up after it - confirm the 70 s figure.
        self.logger.info("vat terminated, 70 second wait for the Reaper")
        time.sleep(70)
        self.logger.info("Reaper should be complete...")

    def test_pool(self):
        """Fixed-size Pool Test"""

        cmds = [
            "test pool",
        ]

        self._run_cli_cmds(cmds)


class TestVlibFrameLeak(VppTestCase):
    """Vlib Frame Leak Test Cases"""

    vpp_worker_count = 1

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

    def setUp(self):
        super().setUp()
        # create 1 pg interface
        self.create_pg_interfaces(range(1))

        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

    def tearDown(self):
        super().tearDown()
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()

    def _send_and_verify_reply(self, pkt):
        """Send pkt on pg0 and check the one packet captured in return.

        :param pkt: scapy packet to inject on pg0.

        Asserts that exactly one packet is captured on pg0 and that its
        Ethernet and IPv4 source/destination are pg0's local/remote
        addresses respectively, i.e. the mirror image of what was sent.
        """
        self.pg0.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

        rx = self.pg0.get_capture(1)

        self.assertEqual(len(rx), 1)
        reply = rx[0]
        ether = reply[Ether]
        ipv4 = reply[IP]

        self.assertEqual(ether.src, self.pg0.local_mac)
        self.assertEqual(ether.dst, self.pg0.remote_mac)

        self.assertEqual(ipv4.src, self.pg0.local_ip4)
        self.assertEqual(ipv4.dst, self.pg0.remote_ip4)

    def _frame_allocation(self):
        """Parse "show vlib frame-allocation" into {(thread, size): alloc}.

        The first output line is skipped as a header; on every other line
        the first three whitespace-separated fields are read as integers.
        Blank lines are ignored.
        """
        allocated = {}
        for line in self.vapi.cli("show vlib frame-allocation").splitlines()[1:]:
            fields = line.split()
            if not fields:
                continue
            thread = int(fields[0])
            size = int(fields[1])
            allocated[(thread, size)] = int(fields[2])
        return allocated

    def test_vlib_mw_refork_frame_leak(self):
        """Vlib worker thread refork leak test case"""
        icmp_id = 0xB
        icmp_seq = 5
        icmp_load = b"\x0a" * 18
        pkt = (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / ICMP(id=icmp_id, seq=icmp_seq)
            / Raw(load=icmp_load)
        )

        # Send a packet
        self._send_and_verify_reply(pkt)

        # Save allocated frame count
        frame_allocated = self._frame_allocation()

        # cause reforks
        _ = self.create_loopback_interfaces(1)

        # send the same packet
        self._send_and_verify_reply(pkt)

        # Check that no frame were leaked during refork
        for key, alloc in self._frame_allocation().items():
            # A (thread, size) row that only exists after the refork is
            # reported as a test failure instead of a bare KeyError.
            self.assertIn(key, frame_allocated)
            self.assertEqual(frame_allocated[key], alloc)


# When executed as a script, run this module's tests with VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)
is_multipath=1 if len(self.paths) > 1 else 0) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): if self.is_local or self.is_unreach or self.is_prohibit: self._test.vapi.ip_add_del_route( self.dest_addr, self.dest_addr_len, inet_pton(AF_INET6, "::"), 0xffffffff, is_local=self.is_local, is_unreach=self.is_unreach, is_prohibit=self.is_prohibit, is_add=0, table_id=self.table_id, is_ipv6=self.is_ip6) else: for path in self.paths: self._test.vapi.ip_add_del_route( self.dest_addr, self.dest_addr_len, path.nh_addr, path.nh_itf, table_id=self.table_id, next_hop_table_id=path.nh_table_id, next_hop_via_label=path.nh_via_label, next_hop_id=path.next_hop_id, is_add=0, is_udp_encap=path.is_udp_encap, is_ipv6=self.is_ip6, is_dvr=path.is_dvr) def query_vpp_config(self): return find_route(self._test, self.dest_addr_p, self.dest_addr_len, self.table_id, inet=AF_INET6 if self.is_ip6 == 1 else AF_INET) def __str__(self): return self.object_id() def object_id(self): return ("%d:%s/%d" % (self.table_id, self.dest_addr_p, self.dest_addr_len)) class VppIpMRoute(VppObject): """ IP Multicast Route """ def __init__(self, test, src_addr, grp_addr, grp_addr_len, e_flags, paths, table_id=0, rpf_id=0, is_ip6=0): self._test = test self.paths = paths self.grp_addr_len = grp_addr_len self.table_id = table_id self.e_flags = e_flags self.is_ip6 = is_ip6 self.rpf_id = rpf_id if is_ip6: self.grp_addr = inet_pton(AF_INET6, grp_addr) self.src_addr = inet_pton(AF_INET6, src_addr) else: self.grp_addr = inet_pton(AF_INET, grp_addr) self.src_addr = inet_pton(AF_INET, src_addr) def add_vpp_config(self): for path in self.paths: self._test.vapi.ip_mroute_add_del(self.src_addr, self.grp_addr, self.grp_addr_len, self.e_flags, path.proto, path.nh_itf, path.nh_addr, path.nh_i_flags, bier_imp=path.bier_imp, rpf_id=self.rpf_id, table_id=self.table_id, is_ipv6=self.is_ip6) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): for path in self.paths: 
self._test.vapi.ip_mroute_add_del(self.src_addr, self.grp_addr, self.grp_addr_len, self.e_flags, path.proto, path.nh_itf, path.nh_addr, path.nh_i_flags, table_id=self.table_id, bier_imp=path.bier_imp, is_add=0, is_ipv6=self.is_ip6) def update_entry_flags(self, flags): self.e_flags = flags self._test.vapi.ip_mroute_add_del(self.src_addr, self.grp_addr, self.grp_addr_len, self.e_flags, 0, 0xffffffff, "", 0, table_id=self.table_id, is_ipv6=self.is_ip6) def update_rpf_id(self, rpf_id): self.rpf_id = rpf_id self._test.vapi.ip_mroute_add_del(self.src_addr, self.grp_addr, self.grp_addr_len, self.e_flags, 0, 0xffffffff, "", 0, rpf_id=self.rpf_id, table_id=self.table_id, is_ipv6=self.is_ip6) def update_path_flags(self, itf, flags): for path in self.paths: if path.nh_itf == itf: path.nh_i_flags = flags break self._test.vapi.ip_mroute_add_del(self.src_addr, self.grp_addr, self.grp_addr_len, self.e_flags, path.proto, path.nh_itf, path.nh_addr, path.nh_i_flags, table_id=self.table_id, is_ipv6=self.is_ip6) def query_vpp_config(self): if self.is_ip6: dump = self._test.vapi.ip6_mfib_dump() else: dump = self._test.vapi.ip_mfib_dump() for e in dump: if self.grp_addr == e.grp_address \ and self.grp_addr_len == e.address_length \ and self.src_addr == e.src_address \ and self.table_id == e.table_id: return True return False def __str__(self): return self.object_id() def object_id(self): if self.is_ip6: return ("%d:(%s,%s/%d)" % (self.table_id, inet_ntop(AF_INET6, self.src_addr), inet_ntop(AF_INET6, self.grp_addr), self.grp_addr_len)) else: return ("%d:(%s,%s/%d)" % (self.table_id, inet_ntop(AF_INET, self.src_addr), inet_ntop(AF_INET, self.grp_addr), self.grp_addr_len)) class VppMFibSignal(object): def __init__(self, test, route, interface, packet): self.route = route self.interface = interface self.packet = packet self.test = test def compare(self, signal): self.test.assertEqual(self.interface, signal.sw_if_index) self.test.assertEqual(self.route.table_id, signal.table_id) 
self.test.assertEqual(self.route.grp_addr_len, signal.grp_address_len) for i in range(self.route.grp_addr_len / 8): self.test.assertEqual(self.route.grp_addr[i], signal.grp_address[i]) if (self.route.grp_addr_len > 32): for i in range(4): self.test.assertEqual(self.route.src_addr[i], signal.src_address[i]) class VppMplsIpBind(VppObject): """ MPLS to IP Binding """ def __init__(self, test, local_label, dest_addr, dest_addr_len, table_id=0, ip_table_id=0, is_ip6=0): self._test = test self.dest_addr_len = dest_addr_len self.dest_addr = dest_addr self.local_label = local_label self.table_id = table_id self.ip_table_id = ip_table_id self.is_ip6 = is_ip6 if is_ip6: self.dest_addrn = inet_pton(AF_INET6, dest_addr) else: self.dest_addrn = inet_pton(AF_INET, dest_addr) def add_vpp_config(self): self._test.vapi.mpls_ip_bind_unbind(self.local_label, self.dest_addrn, self.dest_addr_len, table_id=self.table_id, ip_table_id=self.ip_table_id, is_ip4=(self.is_ip6 == 0)) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.mpls_ip_bind_unbind(self.local_label, self.dest_addrn, self.dest_addr_len, table_id=self.table_id, ip_table_id=self.ip_table_id, is_bind=0, is_ip4=(self.is_ip6 == 0)) def query_vpp_config(self): dump = self._test.vapi.mpls_fib_dump() for e in dump: if self.local_label == e.label \ and self.table_id == e.table_id: return True return False def __str__(self): return self.object_id() def object_id(self): return ("%d:%s binds %d:%s/%d" % (self.table_id, self.local_label, self.ip_table_id, self.dest_addr, self.dest_addr_len)) class VppMplsTable(VppObject): def __init__(self, test, table_id): self._test = test self.table_id = table_id def add_vpp_config(self): self._test.vapi.mpls_table_add_del( self.table_id, is_add=1) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.mpls_table_add_del( self.table_id, is_add=0) def query_vpp_config(self): # find the default route dump = 
self._test.vapi.mpls_fib_dump() if len(dump): return True return False def __str__(self): return self.object_id() def object_id(self): return ("table-mpls-%d" % (self.table_id)) class VppMplsRoute(VppObject): """ MPLS Route/LSP """ def __init__(self, test, local_label, eos_bit, paths, table_id=0, is_multicast=0): self._test = test self.paths = paths self.local_label = local_label self.eos_bit = eos_bit self.table_id = table_id self.is_multicast = is_multicast def add_vpp_config(self): is_multipath = len(self.paths) > 1 for path in self.paths: lstack = path.encode_labels() self._test.vapi.mpls_route_add_del( self.local_label, self.eos_bit, path.proto, path.nh_addr, path.nh_itf, is_multicast=self.is_multicast, is_multipath=is_multipath, table_id=self.table_id, is_interface_rx=path.is_interface_rx, is_rpf_id=path.is_rpf_id, next_hop_out_label_stack=lstack, next_hop_n_out_labels=len(lstack), next_hop_via_label=path.nh_via_label, next_hop_table_id=path.nh_table_id) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): for path in self.paths: self._test.vapi.mpls_route_add_del(self.local_label, self.eos_bit, path.proto, path.nh_addr, path.nh_itf, is_rpf_id=path.is_rpf_id, table_id=self.table_id, is_add=0) def query_vpp_config(self): dump = self._test.vapi.mpls_fib_dump() for e in dump: if self.local_label == e.label \ and self.eos_bit == e.eos_bit \ and self.table_id == e.table_id: return True return False def __str__(self): return self.object_id() def object_id(self): return ("%d:%s/%d" % (self.table_id, self.local_label, 20+self.eos_bit))