path: root/vicn/resource/
diff options
Diffstat (limited to 'vicn/resource/')
1 files changed, 789 insertions, 0 deletions
diff --git a/vicn/resource/ b/vicn/resource/
new file mode 100644
index 00000000..73a43478
--- /dev/null
+++ b/vicn/resource/
@@ -0,0 +1,789 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+# Copyright (c) 2017 Cisco and/or its affiliates.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import networkx as nx
+import logging
+from netmodel.model.type import String
+from netmodel.util.misc import pairwise
+from vicn.core.address_mgr import AddressManager
+from vicn.core.attribute import Attribute
+from vicn.core.exception import ResourceNotFound
+from vicn.core.resource import Resource
+from vicn.core.task import async_task, inline_task
+from vicn.core.task import EmptyTask, BashTask
+from import Channel
+from vicn.resource.ip.route import IPRoute
+from vicn.resource.icn.forwarder import Forwarder
+from vicn.resource.icn.face import L2Face, L4Face, FaceProtocol
+from vicn.resource.icn.producer import Producer
+from vicn.resource.icn.route import Route as ICNRoute
+from vicn.resource.linux.file import TextFile
+from vicn.resource.lxd.lxc_container import LxcContainer
+from vicn.resource.node import Node
+log = logging.getLogger(__name__)
+CMD_CONTAINER_SET_DNS = 'echo "nameserver {ip_dns}" | ' \
+ 'resolvconf -a {interface_name}'
+# For host
+CMD_NAT = '\n'.join([
+ 'iptables -t nat -A POSTROUTING -o {bridge_name} -s {network} ' \
+ '! -d {network} -j MASQUERADE',
+ 'echo 1 > /proc/sys/net/ipv4/ip_forward'
+# For containers
+CMD_IP_FORWARD = 'echo 1 > /proc/sys/net/ipv4/ip_forward'
+HOST_FILE_PATH = "/etc/hosts.vicn"
+# Routing strategies
+def routing_strategy_spt(G, origins, weight_key = None):
+ """Routing strategy : Shortest path tree
+ This routing strategy uses the Dijkstra algorithm on an undirected graph
+ to build the shortest path tree towards all origin prefixes.
+ NOTE: weights are currently unsupported by this strategy.
+ Args:
+ G (nx.Graph): network graph
+ origins (dict): dictionary mapping nodes to the set of prefixes they
+ are origins for
+ weight_key (str): key corresponding to weight key in edge data. None
+ assumes all weights have unit cost
+ Returns:
+ generator : returning triplets (source, prefix, next hop)
+ """
+ assert weight_key is None
+ origin_nodes = origins.keys()
+ seen = set()
+ for dst_node in origin_nodes:
+ sssp = nx.shortest_path(G, target = dst_node)
+ # Notes from the documentation:
+ # - If only the target is specified, return a dictionary keyed by
+ # sources with a list of nodes in a shortest path from one of the
+ # sources to the target.
+ # - All returned paths include both the source and target in the
+ # path.
+ for _, path in sssp.items():
+ if len(path) == 1:
+ # Local prefix
+ continue
+ for s, d in pairwise(path):
+ for prefix in origins[dst_node]:
+ t = (s, prefix, d)
+ if t in seen:
+ continue
+ seen.add(t)
+ yield t
+def routing_strategy_max_flow(G, origins, weight_key = 'capacity'):
+ """Routing strategy : Maximum Flow
+ Args:
+ G (nx.Graph): network graph
+ origins (dict): dictionary mapping nodes to the set of prefixes they
+ are origins for
+ weight_key (str): key corresponding to weight key in edge data. None
+ assumes all weights have unit cost
+ Returns:
+ generator : returning triplets (source, prefix, next hop)
+ """
+ assert weight_key is None
+ origin_nodes = origins.keys()
+ for dst_node in origin_nodes:
+ for src_node in G.nodes:
+ if src_node == dst_node:
+ continue
+ _, flow_dict = nx.maximum_flow(G, src_node, dst_node,
+ capacity=weight_key)
+ # Notes from the documentation:
+ #
+ # generated/networkx.algorithms.flow.maximum_flow.html
+ # - flow_dict (dict) – A dictionary containing the value of the
+ # flow that went through each edge.
+ for s, d_map in flow_dict.items():
+ for d, flow in d_map.items():
+ if flow == 0:
+ continue
+ for prefix in origins[dst_node]:
+ yield s, prefix, d
+ 'spt' : routing_strategy_spt,
+ 'max_flow' : routing_strategy_max_flow,
+# L2 and L4/ICN graphs
+def _get_l2_graph(manager, with_managed = False):
+ G = nx.Graph()
+ for node in manager.by_type(Node):
+ G.add_node(node._state.uuid)
+ for channel in manager.by_type(Channel):
+ if channel.has_type('emulatedchannel'):
+ src = channel._ap_if
+ for dst in channel._sta_ifs.values():
+ if not with_managed and (not src.managed or not dst.managed):
+ continue
+ if G.has_edge(src.node._state.uuid, dst.node._state.uuid):
+ continue
+ map_node_interface = { src.node._state.uuid : src._state.uuid,
+ dst.node._state.uuid: dst._state.uuid}
+ G.add_edge(src.node._state.uuid, dst.node._state.uuid,
+ map_node_interface = map_node_interface)
+ else:
+ # This is for a normal Channel
+ for src_it in range(0,len(channel.interfaces)):
+ src = channel.interfaces[src_it]
+ # Iterate over the remaining interface to create all the
+ # possible combination
+ for dst_it in range(src_it+1,len(channel.interfaces)):
+ dst = channel.interfaces[dst_it]
+ if not with_managed and (not src.managed or
+ not dst.managed):
+ continue
+ if G.has_edge(src.node._state.uuid, dst.node._state.uuid):
+ continue
+ map_node_interface = {
+ src.node._state.uuid : src._state.uuid,
+ dst.node._state.uuid: dst._state.uuid}
+ G.add_edge(src.node._state.uuid, dst.node._state.uuid,
+ map_node_interface = map_node_interface)
+ return G
+def _get_icn_graph(manager):
+ G = nx.Graph()
+ for forwarder in manager.by_type(Forwarder):
+ node = forwarder.node
+ G.add_node(node._state.uuid)
+ for face in forwarder.faces:
+ other_face = manager.by_uuid(face._internal_data['sibling_face'])
+ other_node = other_face.node
+ if G.has_edge(node._state.uuid, other_node._state.uuid):
+ continue
+ map_node_face = { node._state.uuid: face._state.uuid,
+ other_node._state.uuid: other_face._state.uuid }
+ G.add_edge(node._state.uuid, other_node._state.uuid,
+ map_node_face = map_node_face)
+ return G
+class IPAssignment(Resource):
+ """
+ Resource: IPAssignment
+ """
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ def __after__(self):
+ return ('Interface',)
+ @inline_task
+ def __get__(self):
+ raise ResourceNotFound
+ def __subresources__(self):
+ self.host_file = TextFile(node = None, filename = HOST_FILE_PATH,
+ overwrite = True)
+ return self.host_file
+ def __create__(self):
+ """
+ IP assignment strategy /32: assign /32 IP address to each interface
+ We might use different subnets for resources involved in experiment,
+ and supporting resources, and to minimize routing tables.
+ """
+'Assigment of IP addresses')
+ tasks = EmptyTask()
+ # We sort nodes by names for IP assignment. This code ensures that
+ # interfaces on the same channel get consecutive IP addresses. That
+ # way, we can assign /31 on p2p channels.
+ channels = sorted(self._state.manager.by_type(Channel),
+ key = lambda x : x.get_sortable_name())
+ channels.extend(sorted(self._state.manager.by_type(Node),
+ key = lambda node :
+ host_file_content = ""
+ # Dummy code to start IP addressing on an even number for /31
+ ip = AddressManager().get_ip(None)
+ if int(ip[-1]) % 2 == 0:
+ ip = AddressManager().get_ip("dummy object")
+ for channel in channels:
+ # Sort interfaces in a deterministic order to ensure consistent
+ # addressing across restarts of the tool
+ interfaces = sorted(channel.interfaces,
+ key = lambda x : x.device_name)
+ for interface in interfaces:
+ if interface.ip_address is None:
+ ip = AddressManager().get_ip(interface)
+ @async_task
+ async def set_ip(interface, ip):
+ await interface.async_set('ip_address', ip)
+ if interface.managed:
+ tasks = tasks | set_ip(interface, ip)
+ else:
+ ip = interface.ip_address
+ # Note: interface.ip_address should still be None at this stage
+ # since we have not made the assignment yet
+ if isinstance(channel, Node):
+ host_file_content += '# {} {} {}\n'.format(
+, interface.device_name, ip)
+ if interface == interface.node.host_interface:
+ host_file_content += '{} {}\n'.format(ip,
+ self.host_file.content = host_file_content
+ return tasks
+ __delete__ = None
+class IPRoutes(Resource):
+ """
+ Resource: IPRoutes
+ Centralized IP route computation.
+ """
+ routing_strategy = Attribute(String)
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ @inline_task
+ def __get__(self):
+ raise ResourceNotFound
+ @inline_task
+ def __create__(self):
+ routes = list()
+ pre_routes, routes = self._get_ip_routes()
+ routes.extend(pre_routes)
+ routes.extend(routes)
+ for route in routes:
+ route.node.routing_table.routes << route
+ def __delete__(self):
+ raise NotImplementedError
+ #--------------------------------------------------------------------------
+ # Internal methods
+ #--------------------------------------------------------------------------
+ def _get_ip_origins(self):
+ origins = dict()
+ for node in self._state.manager.by_type(Node):
+ node_uuid = node._state.uuid
+ if not node_uuid in origins:
+ origins[node_uuid] = list()
+ for interface in node.interfaces:
+ origins[node_uuid].append(interface.ip_address)
+ return origins
+ def _get_ip_routes(self):
+ if self.routing_strategy == 'pair':
+ return [], self._get_pair_ip_routes()
+ strategy = MAP_ROUTING_STRATEGY.get(self.routing_strategy)
+ G = _get_l2_graph(self._state.manager)
+ origins = self._get_ip_origins()
+ # node -> list(origins for which we have routes)
+ ip_routes = dict()
+ pre_routes = list()
+ routes = list()
+ for src, prefix, dst in strategy(G, origins):
+ data = G.get_edge_data(src, dst)
+ map_ = data['map_node_interface']
+ next_hop_interface = map_[src]
+ next_hop_ingress = self._state.manager.by_uuid(map_[dst])
+ src_node = self._state.manager.by_uuid(src)
+ mac_addr = None
+ if ((hasattr(next_hop_ingress, 'vpp') and
+ next_hop_ingress.vpp is not None) or
+ (hasattr(src_node, 'vpp') and src_node.vpp is not None)):
+ mac_addr = next_hop_ingress.mac_address
+ # Avoid duplicate routes due to multiple paths in the network
+ if not src_node in ip_routes:
+ ip_routes[src_node] = list()
+ if prefix in ip_routes[src_node]:
+ continue
+ if prefix == next_hop_ingress.ip_address:
+ # Direct route on :
+ # route add [prefix] dev [next_hop_interface_.device_name]
+ route = IPRoute(node = src_node,
+ managed = False,
+ owner = self,
+ ip_address = prefix,
+ mac_address = mac_addr,
+ interface = next_hop_interface)
+ else:
+ # We need to be sure we have a route to the gw from the node
+ if not next_hop_ingress.ip_address in ip_routes[src_node]:
+ pre_route = IPRoute(node = src_node,
+ managed = False,
+ owner = self,
+ ip_address = next_hop_ingress.ip_address,
+ mac_address = mac_addr,
+ interface = next_hop_interface)
+ ip_routes[src_node].append(next_hop_ingress.ip_address)
+ pre_routes.append(pre_route)
+ # Route on
+ # route add [prefix] dev [next_hop_interface_.device_name]
+ # via [next_hop_ingress.ip_address]
+ route = IPRoute(node = src_node,
+ managed = False,
+ owner = self,
+ ip_address = prefix,
+ interface = next_hop_interface,
+ mac_address = mac_addr,
+ gateway = next_hop_ingress.ip_address)
+ ip_routes[src_node].append(prefix)
+ routes.append(route)
+ return pre_routes, routes
+ def _get_pair_ip_routes(self):
+ """
+ IP routing strategy : direct routes only
+ """
+ routes = list()
+ G = _get_l2_graph(self._state.manager)
+ for src_node_uuid, dst_node_uuid, data in G.edges_iter(data = True):
+ src_node = self._state.manager.by_uuid(src_node_uuid)
+ dst_node = self._state.manager.by_uuid(dst_node_uuid)
+ map_ = data['map_node_interface']
+ src = self._state.manager.by_uuid(map_[src_node_uuid])
+ dst = self._state.manager.by_uuid(map_[dst_node_uuid])
+ log.debug('[IP ROUTE] NODES {}/{}/{} -> {}/{}/{}'.format(
+, src.device_name, src.ip_address,
+, dst.device_name, dst.ip_address))
+ log.debug('[IP ROUTE] NODES {}/{}/{} -> {}/{}/{}'.format(
+, dst.device_name, dst.ip_address,
+, src.device_name, src.ip_address))
+ route = IPRoute(node = src_node,
+ managed = False,
+ owner = self,
+ ip_address = dst.ip_address,
+ mac_address = dst.mac_address,
+ interface = src)
+ routes.append(route)
+ route = IPRoute(node = dst_node,
+ managed = False,
+ owner = self,
+ ip_address = src.ip_address,
+ mac_address = src.mac_address,
+ interface = dst)
+ routes.append(route)
+ return routes
+class ICNFaces(Resource):
+ """
+ Resource: ICNFaces
+ Centralized ICN face creation.
+ """
+ protocol_name = Attribute(String)
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ @inline_task
+ def __get__(self):
+ raise ResourceNotFound # always create faces
+ @inline_task
+ def __create__(self):
+ icn_faces = self._get_faces()
+ for face in icn_faces:
+ face.node.forwarder.faces << face
+ def __delete__(self):
+ raise NotImplementedError
+ #--------------------------------------------------------------------------
+ # Internal methods
+ #--------------------------------------------------------------------------
+ def _get_faces(self):
+ """
+ Face creation (heuristic: facemgr)
+ Requires: at least direct IP links
+ """
+ protocol = FaceProtocol.from_string(self.protocol_name)
+ faces = list()
+ G = _get_l2_graph(self._state.manager)
+ for src_node_uuid, dst_node_uuid, data in G.edges_iter(data = True):
+ src_node = self._state.manager.by_uuid(src_node_uuid)
+ dst_node = self._state.manager.by_uuid(dst_node_uuid)
+ map_ = data['map_node_interface']
+ src = self._state.manager.by_uuid(map_[src_node_uuid])
+ dst = self._state.manager.by_uuid(map_[dst_node_uuid])
+ log.debug('{} -> {} ({} -> {})'.format(src_node_uuid,
+ dst_node_uuid, src.device_name, dst.device_name))
+ if protocol == FaceProtocol.ether:
+ src_face = L2Face(node = src_node,
+ owner = self,
+ protocol = protocol,
+ src_nic = src,
+ dst_mac = dst.mac_address)
+ dst_face = L2Face(node = dst_node,
+ owner = self,
+ protocol = protocol,
+ src_nic = dst,
+ dst_mac = src.mac_address)
+ elif protocol in (FaceProtocol.tcp4, FaceProtocol.tcp6,
+ FaceProtocol.udp4, FaceProtocol.udp6):
+ src_face = L4Face(node = src_node,
+ owner = self,
+ protocol = protocol,
+ src_ip = src.ip_address,
+ dst_ip = dst.ip_address,
+ src_port = TMP_DEFAULT_PORT,
+ dst_port = TMP_DEFAULT_PORT)
+ dst_face = L4Face(node = dst_node,
+ owner = self,
+ protocol = protocol,
+ src_ip = dst.ip_address,
+ dst_ip = src.ip_address,
+ src_port = TMP_DEFAULT_PORT,
+ dst_port = TMP_DEFAULT_PORT)
+ else:
+ raise NotImplementedError
+ # We key the sibling face for easier building of the ICN graph
+ src_face._internal_data['sibling_face'] = dst_face._state.uuid
+ dst_face._internal_data['sibling_face'] = src_face._state.uuid
+ faces.append(src_face)
+ faces.append(dst_face)
+ return faces
+class ICNRoutes(Resource):
+ """
+ Resource: ICNRoutes
+ Centralized ICN route computation.
+ """
+ routing_strategy = Attribute(String)
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ @inline_task
+ def __get__(self):
+ raise ResourceNotFound # always create routes
+ @inline_task
+ def __create__(self):
+ icn_routes = self._get_icn_routes()
+ for route in icn_routes:
+ route.node.forwarder.routes << route
+ def __delete__(self):
+ raise NotImplementedError
+ #--------------------------------------------------------------------------
+ # Internal methods
+ #--------------------------------------------------------------------------
+ def _get_prefix_origins(self):
+ origins = dict()
+ for producer in self._state.manager.by_type(Producer):
+ node_uuid = producer.node._state.uuid
+ if not node_uuid in origins:
+ origins[node_uuid] = list()
+ origins[node_uuid].extend(producer.prefixes)
+ return origins
+ def _get_icn_routes(self):
+ strategy = MAP_ROUTING_STRATEGY.get(self.routing_strategy)
+ G = _get_icn_graph(self._state.manager)
+ origins = self._get_prefix_origins()
+ routes = list()
+ for src, prefix, dst in strategy(G, origins):
+ data = G.get_edge_data(src, dst)
+ map_ = data['map_node_face']
+ next_hop_face = map_[src]
+ route = ICNRoute(node = src,
+ owner = self,
+ prefix = prefix,
+ face = next_hop_face)
+ routes.append(route)
+ return routes
+class DnsServerEntry(Resource):
+ """
+ Resource: DnsServerEntry
+ Setup of DNS resolver for LxcContainers
+ Todo:
+ - This should be merged into the LxcContainer resource
+ """
+ node = Attribute(String)
+ ip_address = Attribute(String)
+ interface_name = Attribute(String)
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ @inline_task
+ def __get__(self):
+ raise ResourceNotFound
+ def __create__(self):
+ return BashTask(self.node, CMD_CONTAINER_SET_DNS,
+ {'ip_dns': self.ip_address,
+ 'interface_name': self.interface_name})
+ def __delete__(self):
+ raise NotImplementedError
+class ContainerSetup(Resource):
+ """
+ Resource: ContainerSetup
+ Setup of container networking
+ Todo:
+ - This should be merged into the LxcContainer resource
+ """
+ container = Attribute(LxcContainer)
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ def __subresources__(self):
+ # a) routes: host -> container
+ # . container interfaces
+ # . container host (main) interface
+ # route add -host {ip_address} dev {bridge_name}
+ route = IPRoute(node = self.container.node,
+ managed = False,
+ owner = self,
+ ip_address = self.container.host_interface.ip_address,
+ interface = self.container.node.bridge)
+ route.node.routing_table.routes << route
+ # b) route: container -> host
+ # route add {ip_gateway} dev {interface_name}
+ # route add default gw {ip_gateway} dev {interface_name}
+ route = IPRoute(node = self.container,
+ owner = self,
+ managed = False,
+ ip_address = self.container.node.bridge.ip_address,
+ interface = self.container.host_interface)
+ route.node.routing_table.routes << route
+ route_gw = IPRoute(node = self.container,
+ managed = False,
+ owner = self,
+ ip_address = 'default',
+ interface = self.container.host_interface,
+ gateway = self.container.node.bridge.ip_address)
+ route_gw.node.routing_table.routes << route_gw
+ # c) dns
+ dns_server_entry = DnsServerEntry(node = self.container,
+ owner = self,
+ ip_address = self.container.node.bridge.ip_address,
+ interface_name = self.container.host_interface.device_name)
+ return dns_server_entry
+ @inline_task
+ def __get__(self):
+ raise ResourceNotFound
+ def __create__(self):
+ return BashTask(self.container.node, CMD_IP_FORWARD)
+class ContainersSetup(Resource):
+ """
+ Resource: ContainersSetup
+ Setup of LxcContainers (main resource)
+ Todo:
+ - This should be merged into the LxcContainer resource
+ """
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ def __subresources__(self):
+ containers = self._state.manager.by_type(LxcContainer)
+ if len(containers) == 0:
+ return None
+ container_resources = [ContainerSetup(owner = self, container = c)
+ for c in containers]
+ return Resource.__concurrent__(*container_resources)
+class CentralIP(Resource):
+ """
+ Resource: CentralIP
+ Central IP management (main resource)
+ """
+ ip_routing_strategy = Attribute(String, description = 'IP routing strategy',
+ default = 'pair') # spt, pair
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ def __after_init__(self):
+ return ('Node', 'Channel', 'Interface')
+ def __subresources__(self):
+ ip_assign = IPAssignment(owner=self)
+ containers_setup = ContainersSetup(owner=self)
+ ip_routes = IPRoutes(owner = self,
+ routing_strategy = self.ip_routing_strategy)
+ return ip_assign > (ip_routes | containers_setup)
+ @inline_task
+ def __get__(self):
+ raise ResourceNotFound
+ __delete__ = None
+class CentralICN(Resource):
+ """
+ Resource: CentralICN
+ Central ICN management (main resource)
+ """
+ # Choices: spt, max_flow
+ icn_routing_strategy = Attribute(String,
+ description = 'ICN routing strategy',
+ default = 'spt')
+ face_protocol = Attribute(String,
+ description = 'Protocol used to create faces',
+ default = 'ether')
+ #--------------------------------------------------------------------------
+ # Resource lifecycle
+ #--------------------------------------------------------------------------
+ def __after__(self):
+ """
+ We need to wait for IP configuration in order to be able to build
+ overload ICN faces, and producers for prefix origins.
+ """
+ return ('CentralIP',)
+ def __subresources__(self):
+ icn_faces = ICNFaces(owner = self, protocol_name = self.face_protocol)
+ icn_routes = ICNRoutes(owner = self,
+ routing_strategy = self.icn_routing_strategy)
+ return icn_faces > icn_routes
+ @inline_task
+ def __get__(self):
+ raise ResourceNotFound
+ __delete__ = None