aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/Tap.py
blob: 7380344b72e3523e732664d18174114005314316 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# Copyright (c) 2022 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tap utilities library."""

from enum import IntEnum

from robot.api import logger

from resources.libraries.python.Constants import Constants
from resources.libraries.python.InterfaceUtil import InterfaceUtil
from resources.libraries.python.L2Util import L2Util
from resources.libraries.python.PapiExecutor import PapiSocketExecutor
from resources.libraries.python.topology import Topology


class TapFeaturesFlags(IntEnum):
    """TAP Features Flags."""
    TAP_API_FLAG_GSO = 1
    TAP_API_FLAG_CSUM_OFFLOAD = 2
    TAP_API_FLAG_PERSIST = 4
    TAP_API_FLAG_ATTACH = 8
    TAP_API_FLAG_TUN = 16
    TAP_API_FLAG_GRO_COALESCE = 32
    TAP_API_FLAG_PACKED = 64
    TAP_API_FLAG_IN_ORDER = 128


class Tap:
    """Tap utilities."""

    @staticmethod
    def add_tap_interface(
            node, tap_name, mac=None, host_namespace=None, num_rx_queues=1,
            rxq_size=0, txq_size=0, tap_feature_mask=0):
        """Add tap interface with name and optionally with MAC.

        :param node: Node to add tap on.
        :param tap_name: Tap interface name for linux tap.
        :param mac: Optional MAC address for VPP tap.
        :param host_namespace: Namespace.
        :param num_rx_queues: Number of RX queues.
        :param rxq_size: Size of RXQ (0 = Default API; 256 = Default VPP).
        :param txq_size: Size of TXQ (0 = Default API; 256 = Default VPP).
        :param tap_feature_mask: Mask of tap features to be enabled.
        :type node: dict
        :type tap_name: str
        :type mac: str
        :type host_namespace: str
        :type num_rx_queues: int
        :type rxq_size: int
        :type txq_size: int
        :type tap_feature_mask: int
        :returns: Returns a interface index.
        :rtype: int
        """
        cmd = u"tap_create_v3"
        args = dict(
            id=Constants.BITWISE_NON_ZERO,
            use_random_mac=bool(mac is None),
            mac_address=L2Util.mac_to_bin(mac) if mac else None,
            num_rx_queues=int(num_rx_queues),
            tx_ring_sz=int(txq_size),
            rx_ring_sz=int(rxq_size),
            host_mtu_set=False,
            host_mac_addr_set=False,
            host_ip4_prefix_set=False,
            host_ip6_prefix_set=False,
            host_ip4_gw_set=False,
            host_ip6_gw_set=False,
            host_namespace_set=bool(host_namespace),
            host_namespace=host_namespace,
            host_if_name_set=True,
            host_if_name=tap_name,
            host_bridge_set=False,
            tap_flags=tap_feature_mask
        )
        err_msg = f"Failed to create tap interface {tap_name} " \
            f"on host {node[u'host']}"

        with PapiSocketExecutor(node) as papi_exec:
            sw_if_index = papi_exec.add(cmd, **args).get_sw_if_index(err_msg)

        if_key = Topology.add_new_port(node, u"tap")
        Topology.update_interface_sw_if_index(node, if_key, sw_if_index)
        Topology.update_interface_name(node, if_key, tap_name)
        if mac is None:
            mac = Tap.vpp_get_tap_interface_mac(node, tap_name)
        Topology.update_interface_mac_address(node, if_key, mac)
        tap_dev_name = Tap.vpp_get_tap_dev_name(node, tap_name)
        Topology.update_interface_tap_dev_name(node, if_key, tap_dev_name)

        return sw_if_index

    @staticmethod
    def vpp_get_tap_dev_name(node, host_if_name):
        """Get VPP tap interface name from hardware interfaces dump.

        :param node: DUT node.
        :param host_if_name: Tap host interface name.
        :type node: dict
        :type host_if_name: str
        :returns: VPP tap interface dev_name.
        :rtype: str
        """
        return Tap.tap_dump(node, host_if_name).get(u"dev_name")

    @staticmethod
    def vpp_get_tap_interface_mac(node, interface_name):
        """Get tap interface MAC address from interfaces dump.

        :param node: DUT node.
        :param interface_name: Tap interface name.
        :type node: dict
        :type interface_name: str
        :returns: Tap interface MAC address.
        :rtype: str
        """
        return InterfaceUtil.vpp_get_interface_mac(node, interface_name)

    @staticmethod
    def tap_dump(node, name=None):
        """Get all TAP interface data from the given node, or data about
        a specific TAP interface.

        :param node: VPP node to get data from.
        :param name: Optional name of a specific TAP interface.
        :type node: dict
        :type name: str
        :returns: Dictionary of information about a specific TAP interface, or
            a List of dictionaries containing all TAP data for the given node.
        :rtype: dict or list
        """
        def process_tap_dump(tap_dump):
            """Process tap dump.

            :param tap_dump: Tap interface dump.
            :type tap_dump: dict
            :returns: Processed tap interface dump.
            :rtype: dict
            """
            tap_dump[u"host_mac_addr"] = str(tap_dump[u"host_mac_addr"])
            tap_dump[u"host_ip4_prefix"] = str(tap_dump[u"host_ip4_prefix"])
            tap_dump[u"host_ip6_prefix"] = str(tap_dump[u"host_ip6_prefix"])
            tap_dump[u"tap_flags"] = tap_dump[u"tap_flags"].value \
                if hasattr(tap_dump[u"tap_flags"], u"value") \
                else int(tap_dump[u"tap_flags"])
            tap_dump[u"host_namespace"] = None \
                if tap_dump[u"host_namespace"] == u"(nil)" \
                else tap_dump[u"host_namespace"]
            tap_dump[u"host_bridge"] = None \
                if tap_dump[u"host_bridge"] == u"(nil)" \
                else tap_dump[u"host_bridge"]

            return tap_dump

        cmd = u"sw_interface_tap_v2_dump"
        err_msg = f"Failed to get TAP dump on host {node[u'host']}"

        with PapiSocketExecutor(node) as papi_exec:
            details = papi_exec.add(cmd).get_details(err_msg)

        data = list() if name is None else dict()
        for dump in details:
            if name is None:
                data.append(process_tap_dump(dump))
            elif dump.get(u"host_if_name") == name:
                data = process_tap_dump(dump)
                break

        logger.debug(f"TAP data:\n{data}")
        return data


class TapFeatureMask:
    """Tap features utilities"""

    @staticmethod
    def create_tap_feature_mask(**kwargs):
        """Create tap feature mask with feature bits set according to kwargs.
        :param kwargs: Key-value pairs of feature names and it's state
        :type kwargs: dict
        """
        tap_feature_mask = 0

        if u"all" in kwargs and kwargs[u"all"] is True:
            for tap_feature_flag in TapFeaturesFlags:
                tap_feature_mask |= 1 << (tap_feature_flag.value - 1)
        else:
            for feature_name, enabled in kwargs.items():
                tap_feature_name = u"TAP_API_FLAG_" + feature_name.upper()
                if tap_feature_name not in TapFeaturesFlags.__members__:
                    raise ValueError(u"Unsupported tap feature flag name")
                if enabled:
                    tap_feature_mask |= \
                        1 << (TapFeaturesFlags[tap_feature_name].value - 1)

        return tap_feature_mask

    @staticmethod
    def is_feature_enabled(tap_feature_mask, tap_feature_flag):
        """Checks if concrete tap feature is enabled within
        tap_feature_mask
        :param tap_feature_mask: Mask of enabled tap features
        :param tap_feature_flag: Checked tap feature
        :type tap_feature_mask: int
        :type tap_feature_flag: TapFeaturesFlags
        """
        feature_flag_bit = 1 << tap_feature_flag.value
        return (tap_feature_mask & feature_flag_bit) > 0