aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/IPv4Util.py
diff options
context:
space:
mode:
authorStefan Kobza <skobza@cisco.com>2016-01-11 18:03:25 +0100
committerStefan Kobza <skobza@cisco.com>2016-02-08 22:38:32 +0100
commit33499c81c94c2d3baef9d3e9f061cd76ef86fa74 (patch)
tree2d000ba17b821339a05e0c039f71e48e09553de9 /resources/libraries/python/IPv4Util.py
parent5cbeca02602061d32212e14f289d65cf648920e4 (diff)
New version of RF tests.
Change-Id: I241a2b7a7706e65f71cfd4a62e2a40f053fc5d07 Signed-off-by: Stefan Kobza <skobza@cisco.com>
Diffstat (limited to 'resources/libraries/python/IPv4Util.py')
-rw-r--r--resources/libraries/python/IPv4Util.py499
1 files changed, 499 insertions, 0 deletions
diff --git a/resources/libraries/python/IPv4Util.py b/resources/libraries/python/IPv4Util.py
new file mode 100644
index 0000000000..5480bfcea3
--- /dev/null
+++ b/resources/libraries/python/IPv4Util.py
@@ -0,0 +1,499 @@
+# Copyright (c) 2016 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Implements IPv4 RobotFramework keywords"""
+
+from socket import inet_ntoa
+from struct import pack
+from abc import ABCMeta, abstractmethod
+import copy
+
+from robot.api import logger as log
+from robot.api.deco import keyword
+from robot.utils.asserts import assert_not_equal
+
+import resources.libraries.python.ssh as ssh
+from resources.libraries.python.topology import Topology
+from resources.libraries.python.topology import NodeType
+from resources.libraries.python.VatExecutor import VatExecutor
+from resources.libraries.python.TrafficScriptExecutor\
+ import TrafficScriptExecutor
+
+
+class IPv4Node(object):
+ """Abstract class of a node in a topology."""
+ __metaclass__ = ABCMeta
+
+ def __init__(self, node_info):
+ self.node_info = node_info
+
+ @staticmethod
+ def _get_netmask(prefix_length):
+ bits = 0xffffffff ^ (1 << 32 - prefix_length) - 1
+ return inet_ntoa(pack('>I', bits))
+
+ @abstractmethod
+ def set_ip(self, interface, address, prefix_length):
+ """Configure IPv4 address on interface
+ :param interface: interface name
+ :param address:
+ :param prefix_length:
+ :type interface: str
+ :type address: str
+ :type prefix_length: int
+ :return: nothing
+ """
+ pass
+
+ @abstractmethod
+ def set_interface_state(self, interface, state):
+ """Set interface state
+ :param interface: interface name string
+ :param state: one of following values: "up" or "down"
+ :return: nothing
+ """
+ pass
+
+ @abstractmethod
+ def set_route(self, network, prefix_length, gateway, interface):
+ """Configure IPv4 route
+ :param network: network IPv4 address
+ :param prefix_length: mask length
+ :param gateway: IPv4 address of the gateway
+ :param interface: interface name
+ :type network: str
+ :type prefix_length: int
+ :type gateway: str
+ :type interface: str
+ :return: nothing
+ """
+ pass
+
+ @abstractmethod
+ def unset_route(self, network, prefix_length, gateway, interface):
+ """Remove specified IPv4 route
+ :param network: network IPv4 address
+ :param prefix_length: mask length
+ :param gateway: IPv4 address of the gateway
+ :param interface: interface name
+ :type network: str
+ :type prefix_length: int
+ :type gateway: str
+ :type interface: str
+ :return: nothing
+ """
+ pass
+
+ @abstractmethod
+ def flush_ip_addresses(self, interface):
+ """Flush all IPv4 addresses from specified interface
+ :param interface: interface name
+ :type interface: str
+ :return: nothing
+ """
+ pass
+
+ @abstractmethod
+ def ping(self, destination_address, source_interface):
+ """Send an ICMP request to destination node
+ :param destination_address: address to send the ICMP request
+ :param source_interface:
+ :type destination_address: str
+ :type source_interface: str
+ :return: nothing
+ """
+ pass
+
+
+class Tg(IPv4Node):
+ """Traffic generator node"""
+ def __init__(self, node_info):
+ super(Tg, self).__init__(node_info)
+
+ def _execute(self, cmd):
+ return ssh.exec_cmd_no_error(self.node_info, cmd)
+
+ def _sudo_execute(self, cmd):
+ return ssh.exec_cmd_no_error(self.node_info, cmd, sudo=True)
+
+ def set_ip(self, interface, address, prefix_length):
+ cmd = 'ip -4 addr flush dev {}'.format(interface)
+ self._sudo_execute(cmd)
+ cmd = 'ip addr add {}/{} dev {}'.format(address, prefix_length,
+ interface)
+ self._sudo_execute(cmd)
+
+ # TODO: not ipv4-specific, move to another class
+ def set_interface_state(self, interface, state):
+ cmd = 'ip link set {} {}'.format(interface, state)
+ self._sudo_execute(cmd)
+
+ def set_route(self, network, prefix_length, gateway, interface):
+ netmask = self._get_netmask(prefix_length)
+ cmd = 'route add -net {} netmask {} gw {}'.\
+ format(network, netmask, gateway)
+ self._sudo_execute(cmd)
+
+ def unset_route(self, network, prefix_length, gateway, interface):
+ self._sudo_execute('ip route delete {}/{}'.
+ format(network, prefix_length))
+
+ def arp_ping(self, destination_address, source_interface):
+ self._sudo_execute('arping -c 1 -I {} {}'.format(source_interface,
+ destination_address))
+
+ def ping(self, destination_address, source_interface):
+ self._execute('ping -c 1 -w 5 -I {} {}'.format(source_interface,
+ destination_address))
+
+ def flush_ip_addresses(self, interface):
+ self._sudo_execute('ip addr flush dev {}'.format(interface))
+
+
+class Dut(IPv4Node):
+ """Device under test"""
+ def __init__(self, node_info):
+ super(Dut, self).__init__(node_info)
+
+ def get_sw_if_index(self, interface):
+ """Get sw_if_index of specified interface from current node
+ :param interface: interface name
+ :type interface: str
+ :return: sw_if_index of 'int' type
+ """
+ return Topology().get_interface_sw_index(self.node_info, interface)
+
+ def exec_vat(self, script, **args):
+ """Wrapper for VAT executor.
+ :param script: script to execute
+ :param args: parameters to the script
+ :type script: str
+ :type args: dict
+ :return: nothing
+ """
+ # TODO: check return value
+ VatExecutor.cmd_from_template(self.node_info, script, **args)
+
+ def set_ip(self, interface, address, prefix_length):
+ self.exec_vat('add_ip_address.vat',
+ sw_if_index=self.get_sw_if_index(interface),
+ address=address, prefix_length=prefix_length)
+
+ def set_interface_state(self, interface, state):
+ if state == 'up':
+ state = 'admin-up link-up'
+ elif state == 'down':
+ state = 'admin-down link-down'
+ else:
+ raise Exception('Unexpected interface state: {}'.format(state))
+
+ self.exec_vat('set_if_state.vat',
+ sw_if_index=self.get_sw_if_index(interface), state=state)
+
+ def set_route(self, network, prefix_length, gateway, interface):
+ sw_if_index = self.get_sw_if_index(interface)
+ self.exec_vat('add_route.vat',
+ network=network, prefix_length=prefix_length,
+ gateway=gateway, sw_if_index=sw_if_index)
+
+ def unset_route(self, network, prefix_length, gateway, interface):
+ self.exec_vat('del_route.vat', network=network,
+ prefix_length=prefix_length, gateway=gateway,
+ sw_if_index=self.get_sw_if_index(interface))
+
+ def arp_ping(self, destination_address, source_interface):
+ pass
+
+ def flush_ip_addresses(self, interface):
+ self.exec_vat('flush_ip_addresses.vat',
+ sw_if_index=self.get_sw_if_index(interface))
+
+ def ping(self, destination_address, source_interface):
+ pass
+
+
+def get_node(node_info):
+ """Creates a class instance derived from Node based on type.
+ :param node_info: dictionary containing information on nodes in topology
+ :return: Class instance that is derived from Node
+ """
+ if node_info['type'] == NodeType.TG:
+ return Tg(node_info)
+ elif node_info['type'] == NodeType.DUT:
+ return Dut(node_info)
+ else:
+ raise NotImplementedError('Node type "{}" unsupported!'.
+ format(node_info['type']))
+
+
+def get_node_hostname(node_info):
+ """Get string identifying specifed node.
+ :param node_info: Node in the topology.
+ :type node_info: Dict
+ :return: String identifying node.
+ """
+ return node_info['host']
+
+
+class IPv4Util(object):
+ """Implements keywords for IPv4 tests."""
+
+ ADDRESSES = {} # holds configured IPv4 addresses
+ PREFIXES = {} # holds configured IPv4 addresses' prefixes
+ SUBNETS = {} # holds configured IPv4 addresses' subnets
+
+ """
+ Helper dictionary used when setting up ipv4 addresses in topology
+
+ Example value:
+ 'link1': { b'port1': {b'addr': b'192.168.3.1'},
+ b'port2': {b'addr': b'192.168.3.2'},
+ b'prefix': 24,
+ b'subnet': b'192.168.3.0'}
+ """
+ topology_helper = None
+
+ @staticmethod
+ def next_address(subnet):
+ """Get next unused IPv4 address from a subnet
+ :param subnet: holds available IPv4 addresses
+ :return: tuple (ipv4_address, prefix_length)
+ """
+ for i in range(1, 4):
+ # build a key and try to get it from address dictionary
+ interface = 'port{}'.format(i)
+ if interface in subnet:
+ addr = subnet[interface]['addr']
+ del subnet[interface]
+ return addr, subnet['prefix']
+ raise Exception('Not enough ipv4 addresses in subnet')
+
+ @staticmethod
+ def next_network(nodes_addr):
+ """Get next unused network from dictionary
+ :param nodes_addr: dictionary of available networks
+ :return: dictionary describing an IPv4 subnet with addresses
+ """
+ assert_not_equal(len(nodes_addr), 0, 'Not enough networks')
+ _, subnet = nodes_addr.popitem()
+ return subnet
+
+ @staticmethod
+ def configure_ipv4_addr_on_node(node, nodes_addr):
+ """Configure IPv4 address for all interfaces on a node in topology
+ :param node: dictionary containing information about node
+ :param nodes_addr: dictionary containing IPv4 addresses
+ :return:
+ """
+ for interface, interface_data in node['interfaces'].iteritems():
+ if interface == 'mgmt':
+ continue
+ if interface_data['link'] not in IPv4Util.topology_helper:
+ IPv4Util.topology_helper[interface_data['link']] = \
+ IPv4Util.next_network(nodes_addr)
+
+ network = IPv4Util.topology_helper[interface_data['link']]
+ address, prefix = IPv4Util.next_address(network)
+
+ get_node(node).set_ip(interface_data['name'], address, prefix)
+ key = (get_node_hostname(node), interface_data['name'])
+ IPv4Util.ADDRESSES[key] = address
+ IPv4Util.PREFIXES[key] = prefix
+ IPv4Util.SUBNETS[key] = network['subnet']
+
+ @staticmethod
+ def nodes_setup_ipv4_addresses(nodes_info, nodes_addr):
+ """Configure IPv4 addresses on all non-management interfaces for each
+ node in nodes_info
+ :param nodes_info: dictionary containing information on all nodes
+ in topology
+ :param nodes_addr: dictionary containing IPv4 addresses
+ :return: nothing
+ """
+ IPv4Util.topology_helper = {}
+ # make a deep copy of nodes_addr because of modifications
+ nodes_addr_copy = copy.deepcopy(nodes_addr)
+ for _, node in nodes_info.iteritems():
+ IPv4Util.configure_ipv4_addr_on_node(node, nodes_addr_copy)
+
+ @staticmethod
+ def nodes_clear_ipv4_addresses(nodes):
+ """Clear all addresses from all nodes in topology
+ :param nodes: dictionary containing information on all nodes
+ :return: nothing
+ """
+ for _, node in nodes.iteritems():
+ for interface, interface_data in node['interfaces'].iteritems():
+ if interface == 'mgmt':
+ continue
+ IPv4Util.flush_ip_addresses(interface_data['name'], node)
+
+ # TODO: not ipv4-specific, move to another class
+ @staticmethod
+ @keyword('Node "${node}" interface "${interface}" is in "${state}" state')
+ def set_interface_state(node, interface, state):
+ """See IPv4Node.set_interface_state for more information.
+ :param node:
+ :param interface:
+ :param state:
+ :return:
+ """
+ log.debug('Node {} interface {} is in {} state'.format(
+ get_node_hostname(node), interface, state))
+ get_node(node).set_interface_state(interface, state)
+
+ @staticmethod
+ @keyword('Node "${node}" interface "${port}" has IPv4 address '
+ '"${address}" with prefix length "${prefix_length}"')
+ def set_interface_address(node, interface, address, length):
+ """See IPv4Node.set_ip for more information.
+ :param node:
+ :param interface:
+ :param address:
+ :param length:
+ :return:
+ """
+ log.debug('Node {} interface {} has IPv4 address {} with prefix '
+ 'length {}'.format(get_node_hostname(node), interface,
+ address, length))
+ get_node(node).set_ip(interface, address, int(length))
+ hostname = get_node_hostname(node)
+ IPv4Util.ADDRESSES[hostname, interface] = address
+ IPv4Util.PREFIXES[hostname, interface] = int(length)
+ # TODO: Calculate subnet from ip address and prefix length.
+ # IPv4Util.SUBNETS[hostname, interface] =
+
+ @staticmethod
+ @keyword('From node "${node}" interface "${port}" ARP-ping '
+ 'IPv4 address "${ip_address}"')
+ def arp_ping(node, interface, ip_address):
+ log.debug('From node {} interface {} ARP-ping IPv4 address {}'.
+ format(get_node_hostname(node), interface, ip_address))
+ get_node(node).arp_ping(ip_address, interface)
+
+ @staticmethod
+ @keyword('Node "${node}" routes to IPv4 network "${network}" with prefix '
+ 'length "${prefix_length}" using interface "${interface}" via '
+ '"${gateway}"')
+ def set_route(node, network, prefix_length, interface, gateway):
+ """See IPv4Node.set_route for more information.
+ :param node:
+ :param network:
+ :param prefix_length:
+ :param interface:
+ :param gateway:
+ :return:
+ """
+ log.debug('Node {} routes to network {} with prefix length {} '
+ 'via {} interface {}'.format(get_node_hostname(node),
+ network, prefix_length,
+ gateway, interface))
+ get_node(node).set_route(network, int(prefix_length),
+ gateway, interface)
+
+ @staticmethod
+ @keyword('Remove IPv4 route from "${node}" to network "${network}" with '
+ 'prefix length "${prefix_length}" interface "${interface}" via '
+ '"${gateway}"')
+ def unset_route(node, network, prefix_length, interface, gateway):
+ """See IPv4Node.unset_route for more information.
+ :param node:
+ :param network:
+ :param prefix_length:
+ :param interface:
+ :param gateway:
+ :return:
+ """
+ get_node(node).unset_route(network, prefix_length, gateway, interface)
+
+ @staticmethod
+ @keyword('After ping is sent from node "${src_node}" interface '
+ '"${src_port}" with destination IPv4 address of node '
+ '"${dst_node}" interface "${dst_port}" a ping response arrives '
+ 'and TTL is decreased by "${ttl_dec}"')
+ def send_ping(src_node, src_port, dst_node, dst_port, hops):
+ """Send IPv4 ping and wait for response.
+ :param src_node: Source node.
+ :param src_port: Source interface.
+ :param dst_node: Destination node.
+ :param dst_port: Destination interface.
+ :param hops: Number of hops between src_node and dst_node.
+ """
+ log.debug('After ping is sent from node "{}" interface "{}" '
+ 'with destination IPv4 address of node "{}" interface "{}" '
+ 'a ping response arrives and TTL is decreased by "${}"'.
+ format(get_node_hostname(src_node), src_port,
+ get_node_hostname(dst_node), dst_port, hops))
+ node = src_node
+ src_mac = Topology.get_interface_mac(src_node, src_port)
+ if dst_node['type'] == NodeType.TG:
+ dst_mac = Topology.get_interface_mac(src_node, src_port)
+ adj_int = Topology.get_adjacent_interface(src_node, src_port)
+ first_hop_mac = adj_int['mac_address']
+ src_ip = IPv4Util.get_ip_addr(src_node, src_port)
+ dst_ip = IPv4Util.get_ip_addr(dst_node, dst_port)
+ args = '--src_if "{}" --src_mac "{}" --first_hop_mac "{}" ' \
+ '--src_ip "{}" --dst_ip "{}" --hops "{}"'\
+ .format(src_port, src_mac, first_hop_mac, src_ip, dst_ip, hops)
+ if dst_node['type'] == NodeType.TG:
+ args += ' --dst_if "{}" --dst_mac "{}"'.format(dst_port, dst_mac)
+ TrafficScriptExecutor.run_traffic_script_on_node(
+ "ipv4_ping_ttl_check.py", node, args)
+
+ @staticmethod
+ @keyword('Get IPv4 address of node "${node}" interface "${port}"')
+ def get_ip_addr(node, port):
+ """Get IPv4 address configured on specified interface
+ :param node: node dictionary
+ :param port: interface name
+ :return: IPv4 address of specified interface as a 'str' type
+ """
+ log.debug('Get IPv4 address of node {} interface {}'.
+ format(get_node_hostname(node), port))
+ return IPv4Util.ADDRESSES[(get_node_hostname(node), port)]
+
+ @staticmethod
+ @keyword('Get IPv4 address prefix of node "${node}" interface "${port}"')
+ def get_ip_addr_prefix(node, port):
+ """ Get IPv4 address prefix for specified interface.
+ :param node: Node dictionary.
+ :param port: Interface name.
+ """
+ log.debug('Get IPv4 address prefix of node {} interface {}'.
+ format(get_node_hostname(node), port))
+ return IPv4Util.PREFIXES[(get_node_hostname(node), port)]
+
+ @staticmethod
+ @keyword('Get IPv4 subnet of node "${node}" interface "${port}"')
+ def get_ip_addr_subnet(node, port):
+ """ Get IPv4 subnet of specified interface.
+ :param node: Node dictionary.
+ :param port: Interface name.
+ """
+ log.debug('Get IPv4 subnet of node {} interface {}'.
+ format(get_node_hostname(node), port))
+ return IPv4Util.SUBNETS[(get_node_hostname(node), port)]
+
+ @staticmethod
+ @keyword('Flush IPv4 addresses "${port}" "${node}"')
+ def flush_ip_addresses(port, node):
+ """See IPv4Node.flush_ip_addresses for more information.
+ :param port:
+ :param node:
+ :return:
+ """
+ key = (get_node_hostname(node), port)
+ del IPv4Util.ADDRESSES[key]
+ del IPv4Util.PREFIXES[key]
+ del IPv4Util.SUBNETS[key]
+ get_node(node).flush_ip_addresses(port)