aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_interface_crud.py
blob: c88759d9b59dd761780484a51c1a8a4f5aa562b5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python3
"""CRUD tests of APIs (Create, Read, Update, Delete) HLD:

- interface up/down/add/delete - interface type:
    - pg (TBD)
    - loopback
    - vhostuser (TBD)
    - af_packet (TBD)
    - netmap (TBD)
    - tuntap (root privileges needed)
    - vxlan (TBD)
"""

import unittest

from scapy.layers.inet import IP, ICMP
from scapy.layers.l2 import Ether

from framework import VppTestCase
from asfframework import VppTestRunner


class TestLoopbackInterfaceCRUD(VppTestCase):
    """CRUD Loopback"""

    @classmethod
    def setUpClass(cls):
        super(TestLoopbackInterfaceCRUD, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(1))
            for i in cls.pg_interfaces:
                i.config_ip4().resolve_arp()
        except:
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(TestLoopbackInterfaceCRUD, cls).tearDownClass()

    @staticmethod
    def create_icmp_stream(src_if, dst_ifs):
        """

        :param VppInterface src_if: Packets are send to this interface,
            using this interfaces remote host.
        :param list dst_ifs: IPv4 ICMP requests are send to interfaces
            addresses.
        :return: List of generated packets.
        """
        pkts = []
        for i in dst_ifs:
            p = (
                Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                / IP(src=src_if.remote_ip4, dst=i.local_ip4)
                / ICMP(id=i.sw_if_index, type="echo-request")
            )
            pkts.append(p)
        return pkts

    def verify_icmp(self, capture, request_src_if, dst_ifs):
        """

        :param capture: Capture to verify.
        :param VppInterface request_src_if: Interface where was send packets.
        :param list dst_ifs: Interfaces where was generated IPv4 ICMP requests.
        """
        rcvd_icmp_pkts = []
        for pkt in capture:
            try:
                ip = pkt[IP]
                icmp = pkt[ICMP]
            except IndexError:
                pass
            else:
                info = (ip.src, ip.dst, icmp.type, icmp.id)
                rcvd_icmp_pkts.append(info)

        for i in dst_ifs:
            # 0 - icmp echo response
            info = (i.local_ip4, request_src_if.remote_ip4, 0, i.sw_if_index)
            self.assertIn(info, rcvd_icmp_pkts)

    def test_crud(self):
        # create
        loopbacks = self.create_loopback_interfaces(20)
        for i in loopbacks:
            i.local_ip4_prefix_len = 32
            i.config_ip4().admin_up()

        # read (check sw if dump, ip4 fib, ip6 fib)
        if_dump = self.vapi.sw_interface_dump(
            name_filter_valid=True, name_filter="loop"
        )
        fib4_dump = self.vapi.ip_route_dump(0)
        for i in loopbacks:
            self.assertTrue(i.is_interface_config_in_dump(if_dump))
            self.assertTrue(i.is_ip4_entry_in_fib_dump(fib4_dump))

        if_dump = self.vapi.sw_interface_dump(
            name_filter_valid=True, name_filter="loopXYZ"
        )
        self.assertEqual(len(if_dump), 0)

        # check ping
        stream = self.create_icmp_stream(self.pg0, loopbacks)
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(expected_count=len(stream))

        self.verify_icmp(capture, self.pg0, loopbacks)

        # delete
        for i in loopbacks:
            i.remove_vpp_config()

        # read (check not in sw if dump, ip4 fib, ip6 fib)
        if_dump = self.vapi.sw_interface_dump()
        fib4_dump = self.vapi.ip_route_dump(0)
        for i in loopbacks:
            self.assertFalse(i.is_interface_config_in_dump(if_dump))
            self.assertFalse(i.is_ip4_entry_in_fib_dump(fib4_dump))

        #  check not ping
        stream = self.create_icmp_stream(self.pg0, loopbacks)
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.assert_nothing_captured()

    def test_down(self):
        # create
        loopbacks = self.create_loopback_interfaces(20)
        for i in loopbacks:
            i.local_ip4_prefix_len = 32
            i.config_ip4().admin_up()

        # disable
        for i in loopbacks:
            i.admin_down().unconfig_ip4()

        # read (check not in sw if dump, ip4 fib, ip6 fib)
        if_dump = self.vapi.sw_interface_dump()
        fib4_dump = self.vapi.ip_route_dump(0)
        for i in loopbacks:
            self.assertTrue(i.is_interface_config_in_dump(if_dump))
            self.assertFalse(i.is_ip4_entry_in_fib_dump(fib4_dump))

        #  check not ping
        stream = self.create_icmp_stream(self.pg0, loopbacks)
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg0.assert_nothing_captured()


class TestInterfaceDumpApiLocalOnly(VppTestCase):
    """test_interface_crud.TestInterfaceDumpApiLocalOnly"""

    def test_sw_if_index_0(self):
        rv = self.vapi.sw_interface_dump(sw_if_index=0)
        self.assertEqual(rv[0].sw_if_index, 0)

    def test_sw_if_index_twiddle0(self):
        rv = self.vapi.sw_interface_dump(sw_if_index=0xFFFFFFFF)
        self.assertEqual(rv[0].sw_if_index, 0)

    def test_sw_if_index_1_not_existing(self):
        rv = self.vapi.sw_interface_dump(sw_if_index=1)
        self.assertEqual(len(rv), 0, "expected no records.")


class TestInterfaceDumpApi(VppTestCase):
    """test_interface_crud.TestInterfaceDumpApi"""

    def test_sw_if_index_1(self):
        self.vapi.create_loopback_instance(is_specified=1, user_instance=10)
        self.vapi.create_loopback_instance(is_specified=1, user_instance=5)

        # Can I get back the specified record?
        rv = self.vapi.sw_interface_dump(sw_if_index=1)
        self.assertEqual(rv[0].sw_if_index, 1, rv)

        # verify 3 interfaces
        rv = self.vapi.sw_interface_dump(sw_if_index=0xFFFFFFFF)
        self.assertEqual(len(rv), 3, "Expected 3 interfaces.")


if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)
n> #endif if (event_data) { _vec_len (event_data) = 0; } } return 0; } /* * lldp process node declaration */ /* *INDENT-OFF* */ VLIB_REGISTER_NODE(lldp_process_node, static) = { .function = lldp_process, .type = VLIB_NODE_TYPE_PROCESS, .name = "lldp-process", }; /* *INDENT-ON* */ void lldp_schedule_intf (lldp_main_t * lm, lldp_intf_t * n) { const int idx = n - lm->intfs; u32 v; vec_foreach_index (v, lm->intfs_timeouts) { if (lm->intfs_timeouts[v] == idx) { /* already scheduled */ return; } } n->last_sent = 0; /* ensure that a packet is sent out immediately */ /* put the interface at the current position in the timeouts - it * will timeout immediately */ vec_insert (lm->intfs_timeouts, 1, lm->intfs_timeouts_idx); lm->intfs_timeouts[lm->intfs_timeouts_idx] = n - lm->intfs; vlib_process_signal_event (lm->vlib_main, lm->lldp_process_node_index, LLDP_EVENT_RESCHEDULE, 0); #if LLDP_DEBUG clib_warning ("DEBUG: schedule interface %p, if idx %d", n, n->hw_if_index); #endif } void lldp_unschedule_intf (lldp_main_t * lm, lldp_intf_t * n) { if (!n) { return; } #if LLDP_DEBUG clib_warning ("DEBUG: unschedule interface %p, if idx %d", n, n->hw_if_index); #endif const int idx = n - lm->intfs; u32 v; /* remove intf index from timeouts vector */ vec_foreach_index (v, lm->intfs_timeouts) { if (lm->intfs_timeouts[v] == idx) { vec_delete (lm->intfs_timeouts, 1, v); break; } } /* wrap current timeout index to first element if needed */ if (lm->intfs_timeouts_idx >= vec_len (lm->intfs_timeouts)) { lm->intfs_timeouts_idx = 0; } vlib_process_signal_event (lm->vlib_main, lm->lldp_process_node_index, LLDP_EVENT_RESCHEDULE, 0); } static clib_error_t * lldp_sw_interface_up_down (vnet_main_t * vnm, u32 sw_if_index, u32 flags) { lldp_main_t *lm = &lldp_main; vnet_hw_interface_t *hi = vnet_get_sup_hw_interface (vnm, sw_if_index); lldp_intf_t *n = lldp_get_intf (lm, hi->hw_if_index); if (n) { if (!(flags & VNET_SW_INTERFACE_FLAG_ADMIN_UP)) { /* FIXME - the packet sent here isn't send properly - need to find a * way to send the packet before interface goes down */ lldp_send_ethernet (lm, n, 1); lldp_unschedule_intf (lm, n); } } return 0; } VNET_SW_INTERFACE_ADMIN_UP_DOWN_FUNCTION (lldp_sw_interface_up_down); static clib_error_t * lldp_hw_interface_up_down (vnet_main_t * vnm, u32 hw_if_index, u32 flags) { lldp_main_t *lm = &lldp_main; lldp_intf_t *n = lldp_get_intf (lm, hw_if_index); if (n) { if (flags & VNET_HW_INTERFACE_FLAG_LINK_UP) { lldp_schedule_intf (lm, n); } } return 0; } VNET_HW_INTERFACE_LINK_UP_DOWN_FUNCTION (lldp_hw_interface_up_down); /* * fd.io coding-style-patch-verification: ON * * Local Variables: * eval: (c-set-style "gnu") * End: */