aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/IPv4Util.py
blob: 30e185c9ffe955e66d4ab69315a944f5d844832c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Copyright (c) 2016 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Implements IPv4 RobotFramework keywords"""

from robot.api import logger as log
from robot.api.deco import keyword

from resources.libraries.python.topology import Topology
from resources.libraries.python.topology import NodeType
from resources.libraries.python.TrafficScriptExecutor\
    import TrafficScriptExecutor
from resources.libraries.python.IPv4Setup import get_node


class IPv4Util(object):
    """Implements keywords for IPv4 tests."""

    @staticmethod
    @keyword('From node "${node}" interface "${port}" ARP-ping '
             'IPv4 address "${ip_address}"')
    def arp_ping(node, interface, ip_address):
        """Send an ARP ping from a node interface to an IPv4 address.

        Logs the request and delegates to the ``arp_ping`` method of the
        object returned by ``get_node(node)``.

        :param node: Node the ARP ping is sent from.
        :param interface: Interface of the node used for sending.
        :param ip_address: IPv4 address to be ARP-pinged.
        """
        log.debug('From node {} interface {} ARP-ping IPv4 address {}'.
                  format(Topology.get_node_hostname(node),
                         interface, ip_address))
        get_node(node).arp_ping(ip_address, interface)

    @staticmethod
    @keyword('Node "${node}" routes to IPv4 network "${network}" with prefix '
             'length "${prefix_length}" using interface "${interface}" via '
             '"${gateway}"')
    def set_route(node, network, prefix_length, interface, gateway):
        """See IPv4Node.set_route for more information.

        Logs the request and delegates to the ``set_route`` method of the
        object returned by ``get_node(node)``.

        :param node: Node on which the route is configured.
        :param network: Destination IPv4 network of the route.
        :param prefix_length: Prefix length of the network; converted to int
        before it is passed on.
        :param interface: Outgoing interface of the route.
        :param gateway: Gateway (next hop) of the route.
        """
        log.debug('Node {} routes to network {} with prefix length {} '
                  'via {} interface {}'.format(Topology.get_node_hostname(node),
                                               network, prefix_length,
                                               gateway, interface))
        # Robot Framework passes embedded arguments as strings, hence int().
        get_node(node).set_route(network, int(prefix_length),
                                 gateway, interface)

    @staticmethod
    @keyword('After ping is sent in topology "${nodes_info}" from node '
             '"${src_node}" interface "${src_port}" "${src_ip}" with '
             'destination IPv4 address "${dst_ip}" of node "${dst_node}" '
             'interface "${dst_port}" a ping response arrives and TTL is '
             'decreased by "${hops}"')
    def send_ping(nodes_info, src_node, src_port, src_ip, dst_ip, dst_node,
                  dst_port, hops):
        """Send IPv4 ping and wait for response.

        Builds the command line arguments and runs the
        ``ipv4_ping_ttl_check.py`` traffic script on the source node. When the
        destination node is a traffic generator, the destination interface
        and its MAC address are passed to the script as well.

        :param nodes_info: Dictionary containing information on all nodes
        in topology.
        :param src_node: Source node.
        :param src_port: Source interface.
        :param src_ip: Source ipv4 address.
        :param dst_ip: Destination ipv4 address.
        :param dst_node: Destination node.
        :param dst_port: Destination interface.
        :param hops: Number of hops between src_node and dst_node.
        """
        log.debug('After ping is sent from node "{}" interface "{}" '
                  'with destination IPv4 address of node "{}" interface "{}" '
                  'a ping response arrives and TTL is decreased by "{}"'.
                  format(Topology.get_node_hostname(src_node), src_port,
                         Topology.get_node_hostname(dst_node), dst_port, hops))
        src_mac = Topology.get_interface_mac(src_node, src_port)
        # The first hop is whatever interface is adjacent to the source port.
        _, adj_int = Topology.\
            get_adjacent_node_and_interface(nodes_info, src_node, src_port)
        first_hop_mac = adj_int['mac_address']
        args = '--src_if "{}" --src_mac "{}" --first_hop_mac "{}" ' \
               '--src_ip "{}" --dst_ip "{}" --hops "{}"'\
            .format(src_port, src_mac, first_hop_mac, src_ip, dst_ip, hops)
        if dst_node['type'] == NodeType.TG:
            # The destination MAC must be read from the destination node
            # interface, not from the source one.
            dst_mac = Topology.get_interface_mac(dst_node, dst_port)
            args += ' --dst_if "{}" --dst_mac "{}"'.format(dst_port, dst_mac)
        TrafficScriptExecutor.run_traffic_script_on_node(
            "ipv4_ping_ttl_check.py", src_node, args)

    @staticmethod
    def _find_network(node, port, nodes_addr):
        """Find the network entry which contains the given node interface.

        The leading underscore keeps this helper from being exposed as
        a Robot Framework keyword.

        :param node: Node dictionary; its 'host' item is matched.
        :param port: Interface name.
        :param nodes_addr: Dictionary of networks; each value holds a 'ports'
        dictionary whose values carry 'node' and 'if' items.
        :return: Matching network dictionary.
        :raises Exception: If no network contains the given node interface.
        """
        for net in nodes_addr.values():
            for entry in net['ports'].values():
                if entry['node'] == node['host'] and entry['if'] == port:
                    return net

        raise Exception('Subnet not found for node {n} port {p}'.
                        format(n=node['host'], p=port))

    @staticmethod
    @keyword('Get IPv4 address prefix of node "${node}" interface "${port}" '
             'from "${nodes_addr}"')
    def get_ip_addr_prefix_length(node, port, nodes_addr):
        """ Get IPv4 address prefix for specified interface.

        :param node: Node dictionary.
        :param port: Interface name.
        :param nodes_addr: Dictionary of networks to search in.
        :return: IPv4 prefix length
        :raises Exception: If the interface is not found in any network.
        """
        return IPv4Util._find_network(node, port, nodes_addr)['prefix']

    @staticmethod
    @keyword('Get IPv4 subnet of node "${node}" interface "${port}" from '
             '"${nodes_addr}"')
    def get_ip_addr_subnet(node, port, nodes_addr):
        """ Get IPv4 subnet of specified interface.

        :param node: Node dictionary.
        :param port: Interface name.
        :param nodes_addr: Dictionary of networks to search in.
        :return: IPv4 subnet of 'str' type
        :raises Exception: If the interface is not found in any network.
        """
        return IPv4Util._find_network(node, port, nodes_addr)['net_addr']

    @staticmethod
    @keyword('Flush IPv4 addresses "${port}" "${node}"')
    def flush_ip_addresses(port, node):
        """See IPv4Node.flush_ip_addresses for more information.

        Delegates to the ``flush_ip_addresses`` method of the object returned
        by ``get_node(node)``.

        :param port: Interface whose IPv4 addresses are flushed.
        :param node: Node owning the interface.
        """
        get_node(node).flush_ip_addresses(port)