aboutsummaryrefslogtreecommitdiffstats
path: root/resources/libraries/python/IPv4Util.py
diff options
context:
space:
mode:
Diffstat (limited to 'resources/libraries/python/IPv4Util.py')
-rw-r--r--resources/libraries/python/IPv4Util.py499
1 files changed, 38 insertions, 461 deletions
diff --git a/resources/libraries/python/IPv4Util.py b/resources/libraries/python/IPv4Util.py
index d3a016d767..30e185c9ff 100644
--- a/resources/libraries/python/IPv4Util.py
+++ b/resources/libraries/python/IPv4Util.py
@@ -13,430 +13,26 @@
"""Implements IPv4 RobotFramework keywords"""
-from socket import inet_ntoa
-from struct import pack
-from abc import ABCMeta, abstractmethod
-import copy
-
from robot.api import logger as log
from robot.api.deco import keyword
-from robot.utils.asserts import assert_not_equal
-import resources.libraries.python.ssh as ssh
from resources.libraries.python.topology import Topology
from resources.libraries.python.topology import NodeType
-from resources.libraries.python.VatExecutor import VatExecutor
from resources.libraries.python.TrafficScriptExecutor\
import TrafficScriptExecutor
-
-
-class IPv4Node(object):
- """Abstract class of a node in a topology."""
- __metaclass__ = ABCMeta
-
- def __init__(self, node_info):
- self.node_info = node_info
-
- @staticmethod
- def _get_netmask(prefix_length):
- bits = 0xffffffff ^ (1 << 32 - prefix_length) - 1
- return inet_ntoa(pack('>I', bits))
-
- @abstractmethod
- def set_ip(self, interface, address, prefix_length):
- """Configure IPv4 address on interface
-
- :param interface: interface name
- :param address:
- :param prefix_length:
- :type interface: str
- :type address: str
- :type prefix_length: int
- :return: nothing
- """
- pass
-
- @abstractmethod
- def set_interface_state(self, interface, state):
- """Set interface state
-
- :param interface: interface name string
- :param state: one of following values: "up" or "down"
- :return: nothing
- """
- pass
-
- @abstractmethod
- def set_route(self, network, prefix_length, gateway, interface):
- """Configure IPv4 route
-
- :param network: network IPv4 address
- :param prefix_length: mask length
- :param gateway: IPv4 address of the gateway
- :param interface: interface name
- :type network: str
- :type prefix_length: int
- :type gateway: str
- :type interface: str
- :return: nothing
- """
- pass
-
- @abstractmethod
- def unset_route(self, network, prefix_length, gateway, interface):
- """Remove specified IPv4 route
-
- :param network: network IPv4 address
- :param prefix_length: mask length
- :param gateway: IPv4 address of the gateway
- :param interface: interface name
- :type network: str
- :type prefix_length: int
- :type gateway: str
- :type interface: str
- :return: nothing
- """
- pass
-
- @abstractmethod
- def flush_ip_addresses(self, interface):
- """Flush all IPv4 addresses from specified interface
-
- :param interface: interface name
- :type interface: str
- :return: nothing
- """
- pass
-
- @abstractmethod
- def ping(self, destination_address, source_interface):
- """Send an ICMP request to destination node
-
- :param destination_address: address to send the ICMP request
- :param source_interface:
- :type destination_address: str
- :type source_interface: str
- :return: nothing
- """
- pass
-
-
-class Tg(IPv4Node):
- """Traffic generator node"""
- def __init__(self, node_info):
- super(Tg, self).__init__(node_info)
-
- def _execute(self, cmd):
- return ssh.exec_cmd_no_error(self.node_info, cmd)
-
- def _sudo_execute(self, cmd):
- return ssh.exec_cmd_no_error(self.node_info, cmd, sudo=True)
-
- def set_ip(self, interface, address, prefix_length):
- cmd = 'ip -4 addr flush dev {}'.format(interface)
- self._sudo_execute(cmd)
- cmd = 'ip addr add {}/{} dev {}'.format(address, prefix_length,
- interface)
- self._sudo_execute(cmd)
-
- # TODO: not ipv4-specific, move to another class
- def set_interface_state(self, interface, state):
- cmd = 'ip link set {} {}'.format(interface, state)
- self._sudo_execute(cmd)
-
- def set_route(self, network, prefix_length, gateway, interface):
- netmask = self._get_netmask(prefix_length)
- cmd = 'route add -net {} netmask {} gw {}'.\
- format(network, netmask, gateway)
- self._sudo_execute(cmd)
-
- def unset_route(self, network, prefix_length, gateway, interface):
- self._sudo_execute('ip route delete {}/{}'.
- format(network, prefix_length))
-
- def arp_ping(self, destination_address, source_interface):
- self._sudo_execute('arping -c 1 -I {} {}'.format(source_interface,
- destination_address))
-
- def ping(self, destination_address, source_interface):
- self._execute('ping -c 1 -w 5 -I {} {}'.format(source_interface,
- destination_address))
-
- def flush_ip_addresses(self, interface):
- self._sudo_execute('ip addr flush dev {}'.format(interface))
-
-
-class Dut(IPv4Node):
- """Device under test"""
- def __init__(self, node_info):
- super(Dut, self).__init__(node_info)
-
- def get_sw_if_index(self, interface):
- """Get sw_if_index of specified interface from current node
-
- :param interface: interface name
- :type interface: str
- :return: sw_if_index of 'int' type
- """
- return Topology().get_interface_sw_index(self.node_info, interface)
-
- def exec_vat(self, script, **args):
- """Wrapper for VAT executor.
-
- :param script: script to execute
- :param args: parameters to the script
- :type script: str
- :type args: dict
- :return: nothing
- """
- # TODO: check return value
- VatExecutor.cmd_from_template(self.node_info, script, **args)
-
- def set_arp(self, interface, ip_address, mac_address):
- """Set entry in ARP cache.
-
- :param interface: Interface name.
- :param ip_address: IP address.
- :param mac_address: MAC address.
- :type interface: str
- :type ip_address: str
- :type mac_address: str
- """
- self.exec_vat('add_ip_neighbor.vat',
- sw_if_index=self.get_sw_if_index(interface),
- ip_address=ip_address, mac_address=mac_address)
-
- def set_ip(self, interface, address, prefix_length):
- self.exec_vat('add_ip_address.vat',
- sw_if_index=self.get_sw_if_index(interface),
- address=address, prefix_length=prefix_length)
-
- def set_interface_state(self, interface, state):
- if state == 'up':
- state = 'admin-up link-up'
- elif state == 'down':
- state = 'admin-down link-down'
- else:
- raise Exception('Unexpected interface state: {}'.format(state))
-
- self.exec_vat('set_if_state.vat',
- sw_if_index=self.get_sw_if_index(interface), state=state)
-
- def set_route(self, network, prefix_length, gateway, interface):
- sw_if_index = self.get_sw_if_index(interface)
- self.exec_vat('add_route.vat',
- network=network, prefix_length=prefix_length,
- gateway=gateway, sw_if_index=sw_if_index)
-
- def unset_route(self, network, prefix_length, gateway, interface):
- self.exec_vat('del_route.vat', network=network,
- prefix_length=prefix_length, gateway=gateway,
- sw_if_index=self.get_sw_if_index(interface))
-
- def arp_ping(self, destination_address, source_interface):
- pass
-
- def flush_ip_addresses(self, interface):
- self.exec_vat('flush_ip_addresses.vat',
- sw_if_index=self.get_sw_if_index(interface))
-
- def ping(self, destination_address, source_interface):
- pass
-
-
-def get_node(node_info):
- """Creates a class instance derived from Node based on type.
-
- :param node_info: dictionary containing information on nodes in topology
- :return: Class instance that is derived from Node
- """
- if node_info['type'] == NodeType.TG:
- return Tg(node_info)
- elif node_info['type'] == NodeType.DUT:
- return Dut(node_info)
- else:
- raise NotImplementedError('Node type "{}" unsupported!'.
- format(node_info['type']))
-
-
-def get_node_hostname(node_info):
- """Get string identifying specifed node.
-
- :param node_info: Node in the topology.
- :type node_info: Dict
- :return: String identifying node.
- """
- return node_info['host']
+from resources.libraries.python.IPv4Setup import get_node
class IPv4Util(object):
"""Implements keywords for IPv4 tests."""
- ADDRESSES = {} # holds configured IPv4 addresses
- PREFIXES = {} # holds configured IPv4 addresses' prefixes
- SUBNETS = {} # holds configured IPv4 addresses' subnets
-
- """
- Helper dictionary used when setting up ipv4 addresses in topology
-
- Example value:
- 'link1': { b'port1': {b'addr': b'192.168.3.1'},
- b'port2': {b'addr': b'192.168.3.2'},
- b'prefix': 24,
- b'subnet': b'192.168.3.0'}
- """
- topology_helper = None
-
- @staticmethod
- def setup_arp_on_all_duts(nodes_info):
- """For all DUT nodes extract MAC and IP addresses of adjacent interfaces
- from topology and use them to setup ARP entries.
-
- :param nodes_info: Dictionary containing information on all nodes
- in topology.
- :type nodes_info: dict
- """
- for node in nodes_info.values():
- if node['type'] == NodeType.TG:
- continue
- for interface, interface_data in node['interfaces'].iteritems():
- if interface == 'mgmt':
- continue
- interface_name = interface_data['name']
- adj_node, adj_int = Topology.\
- get_adjacent_node_and_interface(nodes_info, node,
- interface_name)
- ip_address = IPv4Util.get_ip_addr(adj_node, adj_int['name'])
- mac_address = adj_int['mac_address']
- get_node(node).set_arp(interface_name, ip_address, mac_address)
-
- @staticmethod
- def next_address(subnet):
- """Get next unused IPv4 address from a subnet
-
- :param subnet: holds available IPv4 addresses
- :return: tuple (ipv4_address, prefix_length)
- """
- for i in range(1, 4):
- # build a key and try to get it from address dictionary
- interface = 'port{}'.format(i)
- if interface in subnet:
- addr = subnet[interface]['addr']
- del subnet[interface]
- return addr, subnet['prefix']
- raise Exception('Not enough ipv4 addresses in subnet')
-
- @staticmethod
- def next_network(nodes_addr):
- """Get next unused network from dictionary
-
- :param nodes_addr: dictionary of available networks
- :return: dictionary describing an IPv4 subnet with addresses
- """
- assert_not_equal(len(nodes_addr), 0, 'Not enough networks')
- _, subnet = nodes_addr.popitem()
- return subnet
-
- @staticmethod
- def configure_ipv4_addr_on_node(node, nodes_addr):
- """Configure IPv4 address for all non-management interfaces
- on a node in topology.
-
- :param node: dictionary containing information about node
- :param nodes_addr: dictionary containing IPv4 addresses
- :return:
- """
- for interface, interface_data in node['interfaces'].iteritems():
- if interface == 'mgmt':
- continue
- if interface_data['link'] not in IPv4Util.topology_helper:
- IPv4Util.topology_helper[interface_data['link']] = \
- IPv4Util.next_network(nodes_addr)
-
- network = IPv4Util.topology_helper[interface_data['link']]
- address, prefix = IPv4Util.next_address(network)
-
- if node['type'] != NodeType.TG:
- get_node(node).set_ip(interface_data['name'], address, prefix)
- get_node(node).set_interface_state(interface_data['name'], 'up')
-
- key = (get_node_hostname(node), interface_data['name'])
- IPv4Util.ADDRESSES[key] = address
- IPv4Util.PREFIXES[key] = prefix
- IPv4Util.SUBNETS[key] = network['subnet']
-
- @staticmethod
- def dut_nodes_setup_ipv4_addresses(nodes_info, nodes_addr):
- """Configure IPv4 addresses on all non-management interfaces for each
- node in nodes_info if node type is not traffic generator
-
- :param nodes_info: dictionary containing information on all nodes
- in topology
- :param nodes_addr: dictionary containing IPv4 addresses
- :return: nothing
- """
- IPv4Util.topology_helper = {}
- # make a deep copy of nodes_addr because of modifications
- nodes_addr_copy = copy.deepcopy(nodes_addr)
- for node in nodes_info.values():
- IPv4Util.configure_ipv4_addr_on_node(node, nodes_addr_copy)
-
- @staticmethod
- def nodes_clear_ipv4_addresses(nodes):
- """Clear all addresses from all nodes in topology
-
- :param nodes: dictionary containing information on all nodes
- :return: nothing
- """
- for node in nodes.values():
- for interface, interface_data in node['interfaces'].iteritems():
- if interface == 'mgmt':
- continue
- IPv4Util.flush_ip_addresses(interface_data['name'], node)
-
- # TODO: not ipv4-specific, move to another class
- @staticmethod
- @keyword('Node "${node}" interface "${interface}" is in "${state}" state')
- def set_interface_state(node, interface, state):
- """See IPv4Node.set_interface_state for more information.
-
- :param node:
- :param interface:
- :param state:
- :return:
- """
- log.debug('Node {} interface {} is in {} state'.format(
- get_node_hostname(node), interface, state))
- get_node(node).set_interface_state(interface, state)
-
- @staticmethod
- @keyword('Node "${node}" interface "${port}" has IPv4 address '
- '"${address}" with prefix length "${prefix_length}"')
- def set_interface_address(node, interface, address, length):
- """See IPv4Node.set_ip for more information.
-
- :param node:
- :param interface:
- :param address:
- :param length:
- :return:
- """
- log.debug('Node {} interface {} has IPv4 address {} with prefix '
- 'length {}'.format(get_node_hostname(node), interface,
- address, length))
- get_node(node).set_ip(interface, address, int(length))
- hostname = get_node_hostname(node)
- IPv4Util.ADDRESSES[hostname, interface] = address
- IPv4Util.PREFIXES[hostname, interface] = int(length)
- # TODO: Calculate subnet from ip address and prefix length.
- # IPv4Util.SUBNETS[hostname, interface] =
-
@staticmethod
@keyword('From node "${node}" interface "${port}" ARP-ping '
'IPv4 address "${ip_address}"')
def arp_ping(node, interface, ip_address):
log.debug('From node {} interface {} ARP-ping IPv4 address {}'.
- format(get_node_hostname(node), interface, ip_address))
+ format(Topology.get_node_hostname(node),
+ interface, ip_address))
get_node(node).arp_ping(ip_address, interface)
@staticmethod
@@ -454,40 +50,28 @@ class IPv4Util(object):
:return:
"""
log.debug('Node {} routes to network {} with prefix length {} '
- 'via {} interface {}'.format(get_node_hostname(node),
+ 'via {} interface {}'.format(Topology.get_node_hostname(node),
network, prefix_length,
gateway, interface))
get_node(node).set_route(network, int(prefix_length),
gateway, interface)
@staticmethod
- @keyword('Remove IPv4 route from "${node}" to network "${network}" with '
- 'prefix length "${prefix_length}" interface "${interface}" via '
- '"${gateway}"')
- def unset_route(node, network, prefix_length, interface, gateway):
- """See IPv4Node.unset_route for more information.
-
- :param node:
- :param network:
- :param prefix_length:
- :param interface:
- :param gateway:
- :return:
- """
- get_node(node).unset_route(network, prefix_length, gateway, interface)
-
- @staticmethod
@keyword('After ping is sent in topology "${nodes_info}" from node '
- '"${src_node}" interface "${src_port}" with destination IPv4 '
- 'address of node "${dst_node}" interface "${dst_port}" a ping '
- 'response arrives and TTL is decreased by "${hops}"')
- def send_ping(nodes_info, src_node, src_port, dst_node, dst_port, hops):
+ '"${src_node}" interface "${src_port}" "${src_ip}" with '
+ 'destination IPv4 address "${dst_ip}" of node "${dst_node}" '
+ 'interface "${dst_port}" a ping response arrives and TTL is '
+ 'decreased by "${hops}"')
+ def send_ping(nodes_info, src_node, src_port, src_ip, dst_ip, dst_node,
+ dst_port, hops):
"""Send IPv4 ping and wait for response.
:param nodes_info: Dictionary containing information on all nodes
in topology.
:param src_node: Source node.
:param src_port: Source interface.
+ :param src_ip: Source ipv4 address.
+ :param dst_ip: Destination ipv4 address.
:param dst_node: Destination node.
:param dst_port: Destination interface.
:param hops: Number of hops between src_node and dst_node.
@@ -495,61 +79,58 @@ class IPv4Util(object):
log.debug('After ping is sent from node "{}" interface "{}" '
'with destination IPv4 address of node "{}" interface "{}" '
'a ping response arrives and TTL is decreased by "${}"'.
- format(get_node_hostname(src_node), src_port,
- get_node_hostname(dst_node), dst_port, hops))
- node = src_node
+ format(Topology.get_node_hostname(src_node), src_port,
+ Topology.get_node_hostname(dst_node), dst_port, hops))
+ dst_mac = None
src_mac = Topology.get_interface_mac(src_node, src_port)
if dst_node['type'] == NodeType.TG:
dst_mac = Topology.get_interface_mac(src_node, src_port)
_, adj_int = Topology.\
get_adjacent_node_and_interface(nodes_info, src_node, src_port)
first_hop_mac = adj_int['mac_address']
- src_ip = IPv4Util.get_ip_addr(src_node, src_port)
- dst_ip = IPv4Util.get_ip_addr(dst_node, dst_port)
args = '--src_if "{}" --src_mac "{}" --first_hop_mac "{}" ' \
'--src_ip "{}" --dst_ip "{}" --hops "{}"'\
.format(src_port, src_mac, first_hop_mac, src_ip, dst_ip, hops)
if dst_node['type'] == NodeType.TG:
args += ' --dst_if "{}" --dst_mac "{}"'.format(dst_port, dst_mac)
TrafficScriptExecutor.run_traffic_script_on_node(
- "ipv4_ping_ttl_check.py", node, args)
+ "ipv4_ping_ttl_check.py", src_node, args)
@staticmethod
- @keyword('Get IPv4 address of node "${node}" interface "${port}"')
- def get_ip_addr(node, port):
- """Get IPv4 address configured on specified interface
-
- :param node: node dictionary
- :param port: interface name
- :return: IPv4 address of specified interface as a 'str' type
- """
- log.debug('Get IPv4 address of node {} interface {}'.
- format(get_node_hostname(node), port))
- return IPv4Util.ADDRESSES[(get_node_hostname(node), port)]
-
- @staticmethod
- @keyword('Get IPv4 address prefix of node "${node}" interface "${port}"')
- def get_ip_addr_prefix(node, port):
+ @keyword('Get IPv4 address prefix of node "${node}" interface "${port}" '
+ 'from "${nodes_addr}"')
+ def get_ip_addr_prefix_length(node, port, nodes_addr):
""" Get IPv4 address prefix for specified interface.
:param node: Node dictionary.
:param port: Interface name.
+ :return: IPv4 prefix length
"""
- log.debug('Get IPv4 address prefix of node {} interface {}'.
- format(get_node_hostname(node), port))
- return IPv4Util.PREFIXES[(get_node_hostname(node), port)]
+ for net in nodes_addr.values():
+ for p in net['ports'].values():
+ if p['node'] == node['host'] and p['if'] == port:
+ return net['prefix']
+
+ raise Exception('Subnet not found for node {n} port {p}'.
+ format(n=node['host'], p=port))
@staticmethod
- @keyword('Get IPv4 subnet of node "${node}" interface "${port}"')
- def get_ip_addr_subnet(node, port):
+ @keyword('Get IPv4 subnet of node "${node}" interface "${port}" from '
+ '"${nodes_addr}"')
+ def get_ip_addr_subnet(node, port, nodes_addr):
""" Get IPv4 subnet of specified interface.
:param node: Node dictionary.
:param port: Interface name.
+ :return: IPv4 subnet of 'str' type
"""
- log.debug('Get IPv4 subnet of node {} interface {}'.
- format(get_node_hostname(node), port))
- return IPv4Util.SUBNETS[(get_node_hostname(node), port)]
+ for net in nodes_addr.values():
+ for p in net['ports'].values():
+ if p['node'] == node['host'] and p['if'] == port:
+ return net['net_addr']
+
+ raise Exception('Subnet not found for node {n} port {p}'.
+ format(n=node['host'], p=port))
@staticmethod
@keyword('Flush IPv4 addresses "${port}" "${node}"')
@@ -560,8 +141,4 @@ class IPv4Util(object):
:param node:
:return:
"""
- key = (get_node_hostname(node), port)
- del IPv4Util.ADDRESSES[key]
- del IPv4Util.PREFIXES[key]
- del IPv4Util.SUBNETS[key]
get_node(node).flush_ip_addresses(port)