# Copyright (c) 2021 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Path utilities library for nodes in the topology."""
from resources.libraries.python.topology import Topology
class NodePath:
"""Path utilities for nodes in the topology.
:Example:
node1--link1-->node2--link2-->node3--link3-->node2--link4-->node1
RobotFramework:
| Library | resources/libraries/python/NodePath.py
| Path test
| | [Arguments] | ${node1} | ${node2} | ${node3}
| | Append Node | ${nodes1}
| | Append Node | ${nodes2}
| | Append Nodes | ${nodes3} | ${nodes2}
| | Append Node | ${nodes1}
| | Compute Path | ${FALSE}
| | ${first_int} | ${node}= | First Interface
| | ${last_int} | ${node}= | Last Interface
| | ${first_ingress} | ${node}= | First Ingress Interface
| | ${last_egress} | ${node}= | Last Egress Interface
| | ${next} | ${node}= | Next Interface
Python:
>>> from NodePath import NodePath
>>> path = NodePath()
>>> path.append_node(node1)
>>> path.append_node(node2)
>>> path.append_nodes(node3, node2)
>>> path.append_node(node1)
>>> path.compute_path()
>>> (interface, node) = path.first_interface()
>>> (interface, node) = path.last_interface()
>>> (interface, node) = path.first_ingress_interface()
>>> (interface, node) = path.last_egress_interface()
>>> (interface, node) = path.next_interface()
"""
def __init__(self):
self._nodes = []
self._nodes_filter = []
self._links = []
self._path = []
self._path_iter = []
def append_node(self, node, filter_list=None):
"""Append node to the path.
:param node: Node to append to the path.
:param filter_list: Filter criteria list.
:type node: dict
:type filter_list: list of strings
"""
self._nodes_filter.append(filter_list)
self._nodes.append(node)
def append_nodes(self, *nodes, filter_list=None):
"""Append nodes to the path.
:param nodes: Nodes to append to the path.
:param filter_list: Filter criteria list.
:type nodes: dict
:type filter_list: list of strings
.. note:: Node order does matter.
"""
for node in nodes:
self.append_node(node, filter_list=filter_list)
def clear_path(self):
"""Clear path."""
self._nodes = []
self._nodes_filter = []
self._links = []
self._path = []
self._path_iter = []
def compute_path(self, always_same_link=True):
"""Compute path for added nodes.
.. note:: First add at least two nodes to the topology.
:param always_same_link: If True use always same link between two nodes
in path. If False use different link (if available)
between two nodes if one link was used before.
:type always_same_link: bool
:raises RuntimeError: If not enough nodes for path.
"""
nodes = self._nodes
if len(nodes) < 2:
raise RuntimeError(u"Not enough nodes to compute path")
for idx in range(0, len(nodes) - 1):
topo = Topology()
node1 = nodes[idx]
node2 = nodes[idx + 1]
n1_list = self._nodes_filter[idx]
n2_list = self._nodes_filter[idx + 1]
links = topo.get_active_connecting_links(
node1, node2, filter_list_node1=n1_list,
filter_list_node2=n2_list
)
if not links:
raise RuntimeError(
f"No link between {node1[u'host']} and {node2[u'host']}"
)
# Not using set operations, as we need deterministic order.
if always_same_link:
l_set = [link for link in links if link in self._links]
else:
l_set = [link for link in links if link not in self._links]
if not l_set:
raise RuntimeError(
f"No free link between {node1[u'host']} and "
f"{node2[u'host']}, all links already used"
)
if not l_set:
link = links[0]
else:
link = l_set[0]
self._links.append(link)
interface1 = topo.get_interface_by_link_name(node1, link)
interface2 = topo.get_interface_by_link_name(node2, link)
self._path.append((interface1, node1))
self._path.append((interface2, node2))
self._path_iter.extend(self._path)
self._path_iter.reverse()
def next_interface(self):
"""Path interface iterator.
:returns: Interface and node or None if not next interface.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path_iter:
return None, None
return self._path_iter.pop()
def first_interface(self):
"""Return first interface on the path.
:returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
raise RuntimeError(u"No path for topology")
return self._path[0]
def last_interface(self):
"""Return last interface on the path.
:returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
raise RuntimeError(u"No path for topology")
return self._path[-1]
def first_ingress_interface(self):
"""Return first ingress interface on the path.
:returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
raise RuntimeError(u"No path for topology")
return self._path[1]
def last_egress_interface(self):
"""Return last egress interface on the path.
:returns: Interface and node.
:rtype: tuple (str, dict)
.. note:: Call compute_path before.
"""
if not self._path:
raise RuntimeError(u"No path for topology")
return self._path[-2]
def compute_circular_topology(self, nodes, filter_list=None, nic_pfs=1,
always_same_link=False, topo_has_tg=True):
"""Return computed circular path.
:param nodes: Nodes to append to the path.
:param filter_list: Filter criteria list.
:param nic_pfs: Number of PF of NIC.
:param always_same_link: If True use always same link between two nodes
in path. If False use different link (if available)
between two nodes if one link was used before.
:param topo_has_tg: If True, the topology has a TG node. If False,
the topology consists entirely of DUT nodes.
:type nodes: dict
:type filter_list: list of strings
:type nic_pfs: int
:type always_same_link: bool
:type topo_has_tg: bool
:returns: Topology information dictionary.
:rtype: dict
:raises RuntimeError: If unsupported combination of parameters.
"""
t_dict = dict()
duts = [key for key in nodes if u"DUT" in key]
t_dict[u"duts"] = duts
t_dict[u"duts_count"] = len(duts)
t_dict[u"int"] = u"pf"
for _ in range(0, nic_pfs // 2):
if topo_has_tg:
self.append_node(nodes[u"TG"])
for dut in duts:
self.append_node(nodes[dut], filter_list=filter_list)
if topo_has_tg:
self.append_node(nodes[u"TG"])
self.compute_path(always_same_link)
n_idx = 0 # node index
t_idx = 1 # TG interface index
d_idx = 0 # DUT interface index
prev_host = None
while True:
interface, node = self.next_interface()
if not interface:
break
if topo_has_tg and node.get(u"type") == u"TG":
n_pfx = f"TG" # node prefix
p_pfx = f"pf{t_idx}" # physical interface prefix
i_pfx = f"if{t_idx}" # [backwards compatible] interface prefix
n_idx = 0
t_idx = t_idx + 1
elif topo_has_tg:
# Each node has 2 interfaces, starting with 1
# Calculate prefixes appropriately for current
# path topology nomenclature:
# tg1_if1 -> dut1_if1 -> dut1_if2 ->
# [dut2_if1 -> dut2_if2 ...] -> tg1_if2
n_pfx = f"DUT{n_idx // 2 + 1}"
p_pfx = f"pf{d_idx % 2 + t_idx - 1}"
i_pfx = f"if{d_idx % 2 + t_idx - 1}"
n_idx = n_idx + 1
d_idx = d_idx + 1
elif not topo_has_tg and always_same_link:
this_host = node.get(u"host")
if prev_host != this_host:
# When moving to a new host in the path,
# increment the node index (n_idx) and
# reset DUT interface index (d_idx) to 1.
n_idx = n_idx + 1
d_idx = 1
n_pfx = f"DUT{n_idx}"
p_pfx = f"pf{d_idx}"
i_pfx = f"if{d_idx}"
d_idx = d_idx + 1
else:
raise RuntimeError(u"Unsupported combination of paramters")
t_dict[f"{n_pfx}"] = node
t_dict[f"{n_pfx}_{p_pfx}"] = [interface]
t_dict[f"{n_pfx}_{p_pfx}_mac"] = \
[Topology.get_interface_mac(node, interface)]
t_dict[f"{n_pfx}_{p_pfx}_vlan"] = \
[Topology.get_interface_vlan(node, interface)]
t_dict[f"{n_pfx}_{p_pfx}_pci"] = \
[Topology.get_interface_pci_addr(node, interface)]
t_dict[f"{n_pfx}_{p_pfx}_ip4_addr"] = \
[Topology.get_interface_ip4(node, interface)]
t_dict[f"{n_pfx}_{p_pfx}_ip4_prefix"] = \
[Topology.get_interface_ip4_prefix_length(node, interface)]
if f"{n_pfx}_pf_pci" not in t_dict:
t_dict[f"{n_pfx}_pf_pci"] = []
t_dict[f"{n_pfx}_pf_pci"].append(
Topology.get_interface_pci_addr(node, interface))
if f"{n_pfx}_pf_keys" not in t_dict:
t_dict[f"{n_pfx}_pf_keys"] = []
t_dict[f"{n_pfx}_pf_keys"].append(interface)
# Backward compatibility below
t_dict[f"{n_pfx.lower()}_{i_pfx}"] = interface
t_dict[f"{n_pfx.lower()}_{i_pfx}_mac"] = \
Topology.get_interface_mac(node, interface)
t_dict[f"{n_pfx.lower()}_{i_pfx}_pci"] = \
Topology.get_interface_pci_addr(node, interface)
t_dict[f"{n_pfx.lower()}_{i_pfx}_ip4_addr"] = \
Topology.get_interface_ip4(node, interface)
t_dict[f"{n_pfx.lower()}_{i_pfx}_ip4_prefix"] = \
Topology.get_interface_ip4_prefix_length(node, interface)
self.clear_path()
return t_dict