aboutsummaryrefslogtreecommitdiffstats
path: root/vicn/core
diff options
context:
space:
mode:
Diffstat (limited to 'vicn/core')
-rw-r--r--vicn/core/__init__.py0
-rw-r--r--vicn/core/address_mgr.py181
-rw-r--r--vicn/core/api.py118
-rw-r--r--vicn/core/attribute.py270
-rw-r--r--vicn/core/command_helpers.py48
-rw-r--r--vicn/core/commands.py376
-rw-r--r--vicn/core/event.py25
-rw-r--r--vicn/core/exception.py39
-rw-r--r--vicn/core/requirement.py229
-rw-r--r--vicn/core/resource.py898
-rw-r--r--vicn/core/resource_factory.py84
-rw-r--r--vicn/core/resource_mgr.py1436
-rw-r--r--vicn/core/sa_collections.py249
-rw-r--r--vicn/core/sa_compat.py270
-rw-r--r--vicn/core/scheduling_algebra.py97
-rw-r--r--vicn/core/state.py177
-rw-r--r--vicn/core/task.py352
17 files changed, 4849 insertions, 0 deletions
diff --git a/vicn/core/__init__.py b/vicn/core/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/vicn/core/__init__.py
diff --git a/vicn/core/address_mgr.py b/vicn/core/address_mgr.py
new file mode 100644
index 00000000..7df5e4ac
--- /dev/null
+++ b/vicn/core/address_mgr.py
@@ -0,0 +1,181 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import ipaddress
+import logging
+import random
+import struct
+import socket
+
+from netmodel.util.deprecated import deprecated
+from netmodel.util.singleton import Singleton
+from netmodel.util.toposort import toposort, toposort_flatten
+
+log = logging.getLogger(__name__)
+
+#------------------------------------------------------------------------------
+# SharedResource
+#------------------------------------------------------------------------------
+
+class SharedResource:
+ """
+ Base class for allocating shared resource
+ """
+ def __init__(self):
+ self._counter = 0
+ self._values = dict()
+
+ def __next__(self):
+ ret = self._counter
+ self._counter += 1
+ return ret
+
+ def get(self, requestor, tag = None):
+ if requestor not in self._values:
+ self._values[requestor] = dict()
+ if tag not in self._values[requestor]:
+ self._values[requestor][tag] = next(self)
+ return self._values[requestor][tag]
+
+#------------------------------------------------------------------------------
+
+class Vlan(SharedResource):
+ """
+ SharedResource: Vlan
+
+ Manages VLAN allocation
+ """
+
+ def get(self, requestor, tag = None):
+ if requestor not in self._values:
+ self._values[requestor] = dict()
+ if tag not in self._values[requestor]:
+ self._values[requestor][tag] = (next(self)+1)
+ return self._values[requestor][tag]
+
+#------------------------------------------------------------------------------
+
+MAX_DEVICE_NAME_SIZE = 15
+
+class DeviceName(SharedResource):
+
+ def get(self, *args, prefix = None, **kwargs):
+ count = super().get(*args, **kwargs)
+ device_name = '{}{}'.format(prefix, count)
+ if len(device_name) > MAX_DEVICE_NAME_SIZE:
+ overflow = len(device_name) - MAX_DEVICE_NAME_SIZE
+ max_prefix_len = len(prefix) - overflow
+ device_name = '{}{}'.format(prefix[:max_prefix_len], count)
+ return device_name
+
+#------------------------------------------------------------------------------
+
+class IpAddress(SharedResource):
+ pass
+
+class Ipv4Address(IpAddress):
+ pass
+
+class Ipv6Address(IpAddress):
+ pass
+
+#------------------------------------------------------------------------------
+# AddressManager
+#------------------------------------------------------------------------------
+
+class AddressManager(metaclass = Singleton):
+ """
+ The purpose of this class is to generate sequential deterministic MAC/IP
+ addresses in order to assign them to the node in the network.
+ """
+
+ MAP_TYPE = {
+ 'vlan': Vlan,
+ 'device_name': DeviceName,
+ }
+
+ def __init__(self):
+ self._ips = dict()
+ self._macs = dict()
+
+ self._pools = dict()
+
+ from vicn.core.resource_mgr import ResourceManager
+
+ network = ResourceManager().get('network')
+ network = ipaddress.ip_network(network, strict=False)
+ self._next_ip = network[1]
+
+ mac_address_base = ResourceManager().get('mac_address_base')
+ self._next_mac = int(mac_address_base, 0) + 1
+
+ def get(self, resource_type, requestor, *args, tag=None, scope=None,
+ **kwargs):
+ """
+ Params:
+ type : the type of shared resource to be requested
+ requestor: name of the resource that requests the shared resource, in
+ order to reattribute the same if requested multiple times.
+ tag: use when a single resource request multiple times the same
+ resource.
+ scope: None = global scope by default. Ensure uniqueness of resource
+ at global scope
+ """
+ if not scope in self._pools:
+ self._pools[scope] = dict()
+ if not resource_type in self._pools[scope]:
+ self._pools[scope][resource_type] = self.MAP_TYPE[resource_type]()
+ return self._pools[scope][resource_type].get(requestor, tag,
+ *args, **kwargs)
+
+ #--------------------------------------------------------------------------
+ # Attributes
+ #--------------------------------------------------------------------------
+
+ def get_mac(self, resource_name):
+ """
+ Generate a new mac address to assign to the containers created.
+
+ :return: The MAC address
+ """
+
+ if resource_name in self._macs:
+ return self._macs[resource_name]
+
+ mac = ':'.join(map(''.join,
+ zip(*[iter(hex(self._next_mac)[2:].zfill(12))]*2)))
+ self._next_mac += 1
+
+ self._macs[resource_name] = mac
+ return mac
+
+ def get_ip(self, resource):
+ """
+ Generate a new ip address to assign to the containers created.
+
+ :return: The IP address
+ """
+
+ if resource in self._ips:
+ return self._ips[resource]
+
+ ip = str(self._next_ip)
+ self._next_ip += 1
+
+ self._ips[resource] = ip
+ return ip
diff --git a/vicn/core/api.py b/vicn/core/api.py
new file mode 100644
index 00000000..761177c9
--- /dev/null
+++ b/vicn/core/api.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import asyncio
+import json
+import logging
+import resource as ulimit
+import sys
+
+from netmodel.model.query import Query
+from netmodel.model.query import ACTION_SELECT, ACTION_INSERT
+from netmodel.model.query import ACTION_UPDATE, ACTION_SUBSCRIBE
+from netmodel.network.interface import InterfaceState
+from netmodel.util.singleton import Singleton
+from vicn.core.exception import NotConfigured
+from vicn.core.resource_mgr import ResourceManager
+from vicn.resource.node import Node
+
+DEFAULT_SETTINGS = {
+ 'network': '192.168.0.0/16',
+ 'mac_address_base': '0x00163e000000',
+ 'websocket_port': 9999
+}
+
+log = logging.getLogger(__name__)
+
+class Event_ts(asyncio.Event):
+ def set(self):
+ self._loop.call_soon_threadsafe(super().set)
+
+#------------------------------------------------------------------------------
+
+class API(metaclass = Singleton):
+
+ def terminate(self):
+ ResourceManager().terminate()
+
+ def parse_topology_file(self, topology_fn):
+ log.debug("Parsing topology file %(topology_fn)s" % locals())
+ try:
+ topology_fd = open(topology_fn, 'r')
+ except IOError:
+ self.error("Topology file '%(topology_fn)s not found" % locals())
+ return None
+
+ try:
+ topology = json.loads(topology_fd.read())
+
+ # SETTING
+ settings = DEFAULT_SETTINGS
+ settings.update(topology.get('settings'))
+
+ # VICN process-related initializations
+ nofile = settings.get('ulimit-n', None)
+ if nofile is not None and nofile > 0:
+ if nofile < 1024:
+ log.error('Too few allowed open files for the process')
+ import os; os._exit(1)
+
+ log.info('Setting open file descriptor limit to {}'.format(
+ nofile))
+ ulimit.setrlimit(
+ ulimit.RLIMIT_NOFILE,
+ (nofile, nofile))
+
+ ResourceManager(base=topology_fn, settings=settings)
+
+ # NODES
+ resources = topology.get('resources', list())
+ for resource in resources:
+ try:
+ ResourceManager().create_from_dict(**resource)
+ except Exception as e:
+ log.warning("Could not create resource '%r': %r" % \
+ (resource, e,))
+ import traceback; traceback.print_exc()
+ continue
+
+ except SyntaxError:
+ log.error("Error reading topology file '%s'" % (topology_fn,))
+ sys.exit(1)
+
+ log.debug("Done parsing topology file %(topology_fn)s" % locals())
+
+ def configure(self, name, setup=False):
+ log.info("Parsing configuration file", extra={'category': 'blue'})
+ self.parse_topology_file(name)
+ self._configured = True
+ ResourceManager().setup(commit=setup)
+
+ def setup(self):
+ if not self._configured:
+ raise NotConfigured
+ ResourceManager().setup()
+
+ def teardown(self):
+ ResourceManager().teardown()
+
+ def open_terminal(self, node_name):
+ node = ResourceManager().by_name(node_name)
+ assert isinstance(node, Node)
+
+ node.open_terminal()
diff --git a/vicn/core/attribute.py b/vicn/core/attribute.py
new file mode 100644
index 00000000..f6ec7c70
--- /dev/null
+++ b/vicn/core/attribute.py
@@ -0,0 +1,270 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import abc
+import copy
+import logging
+import operator
+import types
+
+from netmodel.model.mapper import ObjectSpecification
+from netmodel.model.type import Type, Self
+from netmodel.util.meta import inheritors
+from netmodel.util.misc import is_iterable
+from vicn.core.exception import VICNListException
+from vicn.core.requirement import Requirement, RequirementList
+from vicn.core.sa_collections import InstrumentedList
+from vicn.core.state import UUID, NEVER_SET, Operations
+
+log = logging.getLogger(__name__)
+
+#------------------------------------------------------------------------------
+# Attribute Multiplicity
+#------------------------------------------------------------------------------
+
+class Multiplicity:
+ OneToOne = '1_1'
+ OneToMany = '1_N'
+ ManyToOne = 'N_1'
+ ManyToMany = 'N_N'
+
+
+ @staticmethod
+ def reverse(value):
+ reverse_map = {
+ Multiplicity.OneToOne: Multiplicity.OneToOne,
+ Multiplicity.OneToMany: Multiplicity.ManyToOne,
+ Multiplicity.ManyToOne: Multiplicity.OneToMany,
+ Multiplicity.ManyToMany: Multiplicity.ManyToMany,
+ }
+ return reverse_map[value]
+
+
+# Default attribute properties values (default to None)
+DEFAULT = {
+ 'multiplicity' : Multiplicity.OneToOne,
+ 'mandatory' : False,
+}
+
+#------------------------------------------------------------------------------
+# Attribute
+#------------------------------------------------------------------------------
+
+class Attribute(abc.ABC, ObjectSpecification):
+ properties = [
+ 'name',
+ 'type',
+ 'key',
+ 'description',
+ 'default',
+ 'choices',
+ 'mandatory',
+ 'multiplicity',
+ 'ro',
+ 'auto',
+ 'func',
+ 'requirements',
+ 'reverse_name',
+ 'reverse_description',
+ 'reverse_auto'
+ ]
+
+ def __init__(self, *args, **kwargs):
+ for key in Attribute.properties:
+ value = kwargs.pop(key, NEVER_SET)
+ setattr(self, key, value)
+
+ if len(args) == 1:
+ self.type, = args
+ elif len(args) == 2:
+ self.name, self.type = args
+
+ # self.type is optional since the type can be inherited. Although we
+ # will have to verify the attribute is complete at some point
+ if self.type:
+ if isinstance(self.type, str):
+ self.type = Type.from_string(self.type)
+ assert self.type is Self or Type.exists(self.type)
+
+ # Post processing attribute properties
+ if self.requirements is not NEVER_SET:
+ self.requirements = RequirementList(self.requirements)
+
+ self.is_aggregate = False
+
+ self._reverse_attributes = list()
+
+ #--------------------------------------------------------------------------
+ # Display
+ #--------------------------------------------------------------------------
+
+ def __repr__(self):
+ return '<Attribute {}>'.format(self.name)
+
+ __str__ = __repr__
+
+ # The following functions are required to allow comparing attributes, and
+ # using them as dict keys
+
+ def __eq__(self, other):
+ return self.name == other.name
+
+ def __hash__(self):
+ return hash(self.name)
+
+ #--------------------------------------------------------------------------
+ # Descriptor protocol
+ #
+ # see. https://docs.python.org/3/howto/descriptor.html
+ #--------------------------------------------------------------------------
+
+ def __get__(self, instance, owner=None):
+ if not instance:
+ return self
+
+ return instance.get(self.name, blocking=False)
+
+ def __set__(self, instance, value):
+ if not instance:
+ raise NotImplementedError('Setting default value not implemented')
+
+ instance.set(self.name, value, blocking=False)
+
+ def __delete__(self, instance):
+ raise NotImplementedError
+
+ #--------------------------------------------------------------------------
+
+ def do_list_add(self, instance, value):
+ if instance.is_local_attribute(self.name):
+ from vicn.core.resource import Resource
+ if isinstance(value, Resource):
+ value = value.get_uuid()
+ return value
+ else:
+ try:
+ cur_value = vars(instance)[self.name]
+ if self.is_collection:
+ # copy the list
+ cur_value = list(cur_value)
+ except KeyError as e:
+ cur_value = None
+ if self.is_collection:
+ cur_value = list()
+
+ instance._state.dirty[self.name].trigger(Operations.LIST_ADD,
+ value, cur_value)
+
+ # prevent instrumented list to perform operation
+ raise VICNListException
+
+ def do_list_remove(self, instance, value):
+ if instance.is_local_attribute(self.name):
+ from vicn.core.resource import Resource
+ if isinstance(value, Resource):
+ value = value.get_uuid()
+ return value
+ else:
+ cur_value = vars(instance)[self.name]
+ if self.is_collection:
+ # copy the list
+ cur_value = list(cur_value)
+ instance._state.dirty[self.name].trigger(Operations.LIST_REMOVE,
+ value, cur_value)
+
+ # prevent instrumented list to perform operation
+ raise VICNListException
+
+ def do_list_clear(self, instance):
+ if instance.is_local_attribute(self.name):
+ return
+ else:
+ cur_value = vars(instance)[self.name]
+ if self.is_collection:
+ # copy the list
+ cur_value = list(cur_value)
+ instance._state.dirty[self.name].trigger(Operations.LIST_CLEAR,
+ value, cur_value)
+
+ # prevent instrumented list to perform operation
+ raise VICNListException
+
+ def handle_getitem(self, instance, item):
+ if isinstance(item, UUID):
+ from vicn.core.resource_mgr import ResourceManager
+ return ResourceManager().by_uuid(item)
+ return item
+
+ #--------------------------------------------------------------------------
+ # Accessors
+ #--------------------------------------------------------------------------
+
+ def __getattribute__(self, name):
+ value = super().__getattribute__(name)
+ if value is NEVER_SET:
+ if name == 'default':
+ return list() if self.is_collection else None
+ return DEFAULT.get(name, None)
+ return value
+
+ def has_reverse_attribute(self):
+ return self.reverse_name and self.multiplicity
+
+ @property
+ def is_collection(self):
+ return self.multiplicity in (Multiplicity.OneToMany,
+ Multiplicity.ManyToMany)
+
+ def is_set(self, instance):
+ return instance.is_set(self.name)
+
+ #--------------------------------------------------------------------------
+ # Operations
+ #--------------------------------------------------------------------------
+
+ def merge(self, parent):
+ for prop in Attribute.properties:
+ # NOTE: we cannot use getattr otherwise we get the default value,
+ # and we never override
+ value = vars(self).get(prop, NEVER_SET)
+ if value is not NEVER_SET and not is_iterable(value):
+ continue
+
+ parent_value = vars(parent).get(prop, NEVER_SET)
+ if parent_value is NEVER_SET:
+ continue
+
+ if parent_value:
+ if is_iterable(value):
+ value.extend(parent_value)
+ else:
+ setattr(self, prop, parent_value)
+
+#------------------------------------------------------------------------------
+
+class Reference:
+ """
+ Value reference.
+
+ Attribute value refers to attribute value on a different resource.
+ Use resource = Self to point to another attribute of the same resource.
+ """
+
+ def __init__(self, resource, attribute=None):
+ self._resource = resource
+ self._attribute = attribute
diff --git a/vicn/core/command_helpers.py b/vicn/core/command_helpers.py
new file mode 100644
index 00000000..4732e7b5
--- /dev/null
+++ b/vicn/core/command_helpers.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from vicn.core.commands import Command, Commands
+
+CMD_PRINT_TO_FILE = 'echo -n "{content}" > {filename}'
+
+class CommandHelper:
+
+ @staticmethod
+ def if_cmd(cmd_condition, if_true = None, if_false = None):
+ cmd = cmd_condition
+ if if_true:
+ cmd = cmd & if_true
+ if if_false:
+ cmd = cmd | if_false
+ return cmd
+
+ @staticmethod
+ def file_exists(filename, if_true = None, if_false = None):
+ cmd = Command('test -f {}'.format(filename))
+ return CommandHelper.if_cmd(cmd, if_true, if_false)
+
+ @staticmethod
+ def print_to_file(node, filename, content):
+ escaped_content = content.replace('{', '{{').replace('}', '}}')
+ return BashTask(self.node, CMD_PRINT_TO_FILE,
+ {'content': escaped_content, 'filename': filename})
+
+ @staticmethod
+ def print_to_file_no_escape(filename, content):
+ return BashTask(self.node, CMD_PRINT_TO_FILE,
+ {'content': content, 'filename': filename})
diff --git a/vicn/core/commands.py b/vicn/core/commands.py
new file mode 100644
index 00000000..41c06bf5
--- /dev/null
+++ b/vicn/core/commands.py
@@ -0,0 +1,376 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import inspect
+import logging
+import shlex
+
+from vicn.core.exception import CommandException
+
+log = logging.getLogger(__name__)
+
+#------------------------------------------------------------------------------
+# Helper functions
+#------------------------------------------------------------------------------
+
+def bashize(command):
+ ret = "bash -c " + shlex.quote(command)
+ return ret
+
+def parenthesize(command):
+ return '({})'.format(command)
+
+def do_parenthesize(command):
+ if '&&' in command or '||' in command:
+ if command[0] == '(':
+ return command
+ else:
+ return parenthesize(command)
+ else:
+ return command
+
+#------------------------------------------------------------------------------
+
+class ReturnValue:
+ def __init__(self, return_value = None, stdout = None, stderr = None):
+ self._return_value = return_value
+
+ # We use accessors since it seems impossible to trigger properties from
+ # __init__
+ self._set_stdout(stdout)
+ self._set_stderr(stderr)
+
+ def __repr__(self):
+ return '<ReturnValue ({}) - OUT [{}] - ERR [{}]>'.format(
+ self._return_value, self._stdout, self._stderr)
+
+ def __str__(self):
+ return self.__repr__()
+
+ def _clean(self, value):
+ if value is None or isinstance(value, str):
+ return value
+ return value.decode('utf-8')
+
+ def _set_stdout(self, value):
+ self._stdout = self._clean(value)
+
+ def _set_stderr(self, value):
+ self._stderr = self._clean(value)
+
+ @property
+ def stdout(self):
+ return self._stdout
+
+ @stdout.setter
+ def stdout(self, value):
+ return self._set_stdout(value)
+
+ @property
+ def stderr(self):
+ return self._stderr
+
+ @stderr.setter
+ def stderr(self, value):
+ return self._set_stderr(value)
+
+ @property
+ def return_value(self):
+ return self._return_value
+
+ @return_value.setter
+ def return_value(self, value):
+ self._return_value = value
+
+ def __bool__(self):
+ return self._return_value == 0
+
+#------------------------------------------------------------------------------
+
+class Command:
+ """
+ Bash command
+
+ Todo:
+ - Commands with ; should be "bashized"
+ """
+ def __init__(self, commandline, node=None, parameters = None,
+ description = None, callback = None, blocking=True, lock=None):
+ self._commandline = commandline
+ self._node = node
+ self._parameters = parameters if parameters else dict()
+ self._description = description
+ self._callback = callback
+ self._blocking = blocking
+ self._lock = lock
+
+ def __str__(self):
+ try:
+ return self.full_commandline
+ except:
+ return self.commandline + ' -- ' + str(self.parameters)
+
+ def __repr__(self):
+ return '<Command {} -- {}'.format(self.commandline, self.parameters)
+
+ @property
+ def commandline(self):
+ return self._commandline
+
+ @property
+ def full_commandline(self):
+ cmd = self._commandline.format(**self._parameters)
+ if ('||' in cmd or '&&' in cmd or '|' in cmd or '<' in cmd or
+ '>' in cmd):
+ return bashize(cmd)
+ return cmd
+
+ @property
+ def full_commandline_nobashize(self):
+ """
+ TMP to fix issue with bashize heuristic above...
+ """
+ cmd = self._commandline.format(**self.parameters)
+ return cmd
+
+ @property
+ def command(self):
+ return self
+
+ @property
+ def node(self):
+ return self._node
+
+ @node.setter
+ def node(self, node):
+ self._node = node
+
+ @property
+ def parameters(self):
+ return self._parameters
+
+ @parameters.setter
+ def parameters(self, parameters):
+ self._parameters = parameters
+
+ @property
+ def description(self):
+ if not self._description:
+ return self._description
+ return self._description.format(**self.parameters)
+
+ @property
+ def success_callback(self):
+ return self._on_success
+
+ @property
+ def failure_callback(self):
+ return self._on_failure
+
+ @property
+ def blocking(self):
+ return self._blocking
+
+ @property
+ def lock(self):
+ return self._lock
+
+ def apply(self, params):
+ self._parameters.update(params)
+ return self
+
+ def __and__(self, other):
+ commandline = self.commandline + ' && ' + other.commandline
+ all_params = dict(i for c in (self, other)
+ for i in c.parameters.items())
+ return Command(commandline, parameters = all_params)
+
+ def __or__(self, other):
+ commandline = self.commandline + ' || ' + other.commandline
+ all_params = dict(i for c in (self, other)
+ for i in c.parameters.items())
+ return Command(commandline, parameters = all_params)
+
+ def __bool__(self):
+ return bool(self._commandline)
+
+ def submit(self):
+ CMD_MGR.execute([self])
+
+ def execute(self):
+ cmd = self.full_commandline
+ cmd_str = cmd[:77]+'...' if len(cmd) > 80 else cmd
+ log.debug('Node {}: {} ({})'.format(self.node.name, cmd_str,
+ self._description))
+
+ rv = self.node.execute(cmd)
+
+ if not rv:
+ raise CommandException
+ if self._callback:
+ if self._lock:
+ self._lock.acquire()
+ self._callback(rv)
+ if self._lock:
+ self._lock.release()
+ return rv
+
+#------------------------------------------------------------------------------
+
+class BackgroundCommand(Command):
+ pass
+
+#------------------------------------------------------------------------------
+
+class Commands(Command):
+
+ def __init__(self):
+ self._commands = list()
+ self._node = None
+
+ def __repr__(self):
+ return '<Commands {}>'.format(str(self))
+
+ def __str__(self):
+ return self.commandline
+
+ def add(self, command):
+ if not command:
+ return
+ if not isinstance(command, Command):
+ command = Command(command)
+ self._commands.append(command)
+
+ def _do_command(self, sep):
+ if len(self.commands) == 1:
+ full_cmd = sep.join(c.commandline for c in self.commands)
+ else:
+ full_cmd = sep.join(do_parenthesize(c.commandline)
+ for c in self.commands)
+ all_params = dict(i for c in self.commands
+ for i in c.parameters.items())
+
+ return Command(full_cmd, parameters = all_params)
+
+ @property
+ def command(self):
+ raise NotImplementedError('Not implemented')
+
+ @property
+ def commandline(self):
+ return self.command.commandline
+
+ @property
+ def parameters(self):
+ parameters = dict()
+ for command in self.commands:
+ parameters.update(command.parameters)
+ return parameters
+
+ @parameters.setter
+ def parameters(self, parameters):
+ for command in self.commands:
+ command.parameters = parameters
+
+ @property
+ def commands(self):
+ return self._commands
+
+ def apply(self, params):
+ self._commands = [c.apply(params) for c in self._commands]
+ return self._commands
+
+ __lshift__ = add
+
+ def __bool__(self):
+ return any(bool(c) for c in self._commands)
+
+#------------------------------------------------------------------------------
+
+class ParallelCommands(Commands):
+ @property
+ def command(self):
+ log.warning('Commands executed sequentially')
+ return self._do_command(';')
+
+#------------------------------------------------------------------------------
+
+class SequentialCommands(Commands):
+ @property
+ def command(self, fatal = True):
+ SEP = ' && ' if fatal else '; '
+ return self._do_command(SEP)
+
+#------------------------------------------------------------------------------
+
+def sequential_bash_from_docstring(fn):
+ def decorator(*args, **kwargs):
+ c = SequentialCommands()
+
+ desc = None
+ for line in fn.__doc__.splitlines():
+ line = line.strip()
+ if not line:
+ continue
+ if line.startswith('#'):
+ desc = line[1:].strip()
+ continue
+ c << Command(line, description = desc)
+ desc = None
+
+ # XXX we don't support keyword args
+ arg_info = inspect.getargspec(fn)
+ assert not arg_info.varargs
+ c.parameters = dict(zip(arg_info.args, args))
+ log.debug('sequential_bash_from_docstring: {}'.format(c))
+
+ # Execute the code in the function
+ fn(*args, **kwargs)
+
+ return c
+
+ return decorator
+
+bash_from_docstring = sequential_bash_from_docstring
+
+def execute_on_node(fn):
+ """
+ Decorator: execute the command returned by the function on the node found
+ in attributes. Note that such an attribute should be available.
+ This assumes the function returns a command
+
+ We need output in case apply_rx is used. This should be made an option
+ """
+
+ def wrapper(self, *args, **kwargs):
+ return self.node.execute(fn(self, *args, **kwargs), output = True)
+ return wrapper
+
+def apply_rx(rx):
+ """
+ Apply a compiled regular expression to the result of the decorated function.
+ Returns a dict (whose keys should be attributes of the resource that need
+ to be updated).
+ """
+
+ def decorator(fn):
+ def wrapper(*args, **kwargs):
+ ret = fn(*args, **kwargs)
+ return [m.groupdict() for m in rx.finditer(ret.stdout)]
+ return wrapper
+ return decorator
diff --git a/vicn/core/event.py b/vicn/core/event.py
new file mode 100644
index 00000000..ee418257
--- /dev/null
+++ b/vicn/core/event.py
@@ -0,0 +1,25 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class Event:
+ pass
+
+class AttributeChangedEvent(Event):
+ def __init__(self, condition=None):
+ self._condition = condition
+
diff --git a/vicn/core/exception.py b/vicn/core/exception.py
new file mode 100644
index 00000000..d7422723
--- /dev/null
+++ b/vicn/core/exception.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+class VICNException(Exception): pass
+
+class CommandException(VICNException): pass
+
+class ProcessException(VICNException): pass
+class NotConfigured(ProcessException): pass
+
+class ParameterException(VICNException): pass
+class InvalidResource(ParameterException): pass
+
+class ABCException(VICNException): pass
+class NotImplemented(VICNException): pass
+
+class DependencyException(VICNException): pass
+class InitializeException(VICNException): pass
+class CheckException(VICNException): pass
+class SetupException(VICNException): pass
+
+class VICNListException(VICNException): pass
+
+class ResourceNotFound(VICNException): pass
diff --git a/vicn/core/requirement.py b/vicn/core/requirement.py
new file mode 100644
index 00000000..c42cecd9
--- /dev/null
+++ b/vicn/core/requirement.py
@@ -0,0 +1,229 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import enum
+
+from netmodel.model.mapper import ObjectSpecification
+from vicn.core.exception import VICNException
+
+#------------------------------------------------------------------------------
+# Enums
+#------------------------------------------------------------------------------
+
+class RequirementScope(enum.Enum):
+ INSTANCE = 'Instance'
+ CLASS = 'Class'
+
+#------------------------------------------------------------------------------
+# Exceptions
+#------------------------------------------------------------------------------
+
+class RequirementError(VICNException):
+
+ def __init__(self, instance, attr):
+ super().__init__()
+ self._instance = instance
+ self._attr = attr
+
+ def __str__(self):
+ return "Requirement on {}.{} could not be satisfied:".format(
+ self._instance, self._attr)
+
+class RequiredAttributeError(RequirementError):
+
+ def __str__(self):
+ return super().__str__() + "could not find attribute {}".format(
+ self._attr)
+
+class RequiredPropertyError(RequirementError):
+ def __init__(self, instance, attr, prop):
+ super().__init__(instance, attr)
+ self._prop = prop
+
+ def __str__(self):
+ return super().__str__()+ "property {} is not verified".format(
+ self._prop)
+
+#------------------------------------------------------------------------------
+# Class: Property
+#------------------------------------------------------------------------------
+
+class Property:
+
+ TYPE_ANY_OF = 0
+ #XXX cant think of a good use case for that
+ #TYPE_ALL_OF = 1
+
+ def __init__(self, value, property_type=TYPE_ANY_OF):
+ self._type = property_type
+ try:
+ self._value = set(value)
+ except TypeError: #value is not iterable, it is a single value
+ self._value = set()
+ self._value.add(value)
+
+
+ @property
+ def property_type(self):
+ return self._type
+
+ @property
+ def value(self):
+ return self._value
+
+ def check(self, value):
+ return value in self._value
+
+ def __str__(self):
+ return str(self._value)
+
+ def merge(self, other):
+ assert self._type is other.property_type, \
+ "Properties must be of same type to be merged"
+
+ #if self._type is TYPE_ANY_OF:
+ self._value.intersection_update(other.value)
+ #elif self._type is TYPE_ALL_OF:
+ # self._value.union_update(other.value)
+
+#------------------------------------------------------------------------------
+# Class: Requirement
+#------------------------------------------------------------------------------
+
+class Requirement(ObjectSpecification):
+ """Resource requirement
+
+ This class allows to specify a requirement on a given resource, or on a
+ class of resources (all instances of a given class).
+ """
+
+ #--------------------------------------------------------------------------
+ # Constructor
+ #--------------------------------------------------------------------------
+
+ def __init__(self, requirement_type, properties = None,
+ capabilities = None, scope = RequirementScope.INSTANCE,
+ fatal = True, must_be_setup = False):
+ """
+ Args:
+ requirement_type (): the attribute on which the requirement is made
+ properties (Optional[XXX]): XXX (defaults to None)
+ scope (Optional[enum RequirementScope]): Is the requirement dealing
+ with an instance, or a class (all instance of the class)
+ (defaults to RequirementScope.INSTANCE)
+ fatal (Optional[bool]): is the failure of the requirement fatal
+ (raises an error), or not (raises a warning) (defaults to True)
+ must_be_setup (Optional[bool]): defaults to False
+ """
+ self._type = requirement_type
+ self._properties = {}
+ if properties:
+ for prop in properties:
+ self._properties[prop] = Property(properties[prop])
+ self._capabilities = capabilities if capabilities else set()
+ self._scope = scope
+ self._fatal = fatal
+ self._must_be_up = must_be_setup
+
+ #--------------------------------------------------------------------------
+ # Accessors and properties
+ #--------------------------------------------------------------------------
+
+ @property
+ def properties(self):
+ return self._properties
+
+ @property
+ def requirement_type(self):
+ return self._type
+
+ @property
+ def must_be_up(self):
+ return self._must_be_up
+
+ #--------------------------------------------------------------------------
+ # Display
+ #--------------------------------------------------------------------------
+
+ def __str__(self):
+ prop_str = "{" + ",".join(map(lambda x: "'{}': {}".format(x,
+ self._properties[x]), self._properties.keys())) +"}"
+ return "<type={}, properties={}, must_be_up={}>".format(self._type,
+ prop_str, self._must_be_up)
+
+ #--------------------------------------------------------------------------
+ # Requirement operators
+ #--------------------------------------------------------------------------
+
+ def check(self, instance):
+ if not hasattr(instance, self._type):
+ raise RequiredAttributeError(instance, self._type)
+
+ instance_attr = getattr(instance, self._type)
+ if not instance_attr:
+ raise TypeError("instance_attr is none")
+
+ for prop in self.properties:
+ if not hasattr(instance_attr, prop):
+ raise RequiredAttributeError(instance, self._type)
+ if not self._properties[prop].check(getattr(instance_attr, prop)):
+ raise RequiredPropertyError(instance, self._type, prop)
+
+ return True
+
+ #--------------------------------------------------------------------------
+ # Requirement logic
+ #--------------------------------------------------------------------------
+
+ def merge(self, other):
+ assert other.requirement_type == self._type, \
+ "Cannot merge Requirements with different types"
+
+ for prop in other.properties:
+ if prop in self._properties:
+ self._properties[prop].merge(other.properties[prop])
+ else:
+ self._properties[prop] = other.properties[prop]
+
+ if other._capabilities:
+ self._capabilities |= other._capabilities
+
+#------------------------------------------------------------------------------
+# Class: Requirement list
+#------------------------------------------------------------------------------
+
+class RequirementList(list):
+
+ def __init__(self,x=None):
+ super().__init__()
+ if x:
+ self.extend(x)
+
+ def append(self,x):
+ assert isinstance(x,Requirement)
+ # XXX O(n) right now, might be able to do better
+ for req in self:
+ if req.requirement_type == x.requirement_type:
+ req.merge(x)
+ return
+
+ super().append(x)
+
+ def extend(self, x):
+ for y in x:
+ self.append(y)
diff --git a/vicn/core/resource.py b/vicn/core/resource.py
new file mode 100644
index 00000000..53ad2181
--- /dev/null
+++ b/vicn/core/resource.py
@@ -0,0 +1,898 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import asyncio
+import copy
+import logging
+import operator
+import random
+import string
+import sys
+import traceback
+import types
+
+from abc import ABC, ABCMeta
+
+from netmodel.model.mapper import ObjectSpecification
+from netmodel.model.type import String, Bool, Integer, Dict
+from netmodel.model.type import BaseType, Self
+from netmodel.util.deprecated import deprecated
+from netmodel.util.singleton import Singleton
+from vicn.core.attribute import Attribute, Multiplicity, Reference
+from vicn.core.attribute import NEVER_SET
+from vicn.core.commands import ReturnValue
+from vicn.core.event import Event, AttributeChangedEvent
+from vicn.core.exception import VICNException, ResourceNotFound
+from vicn.core.resource_factory import ResourceFactory
+from vicn.core.requirement import Requirement, Property
+from vicn.core.sa_collections import InstrumentedList, _list_decorators
+from vicn.core.scheduling_algebra import SchedulingAlgebra
+from vicn.core.state import ResourceState, UUID
+from vicn.core.state import Operations, InstanceState
+from vicn.core.task import run_task, BashTask
+
+log = logging.getLogger(__name__)
+
+NAME_SEP = '-'
+
+# Warning and error messages
+
+W_UNK_ATTR = 'Ignored unknown attribute {} for resource {}'
+E_UNK_RES_NAME = 'Unknown resource name for attribute {} in {} ({}) : {}'
+E_GET_NON_LOCAL = 'Cannot get non-local attribute {} for resource {}'
+E_AUTO_UNM = 'Trying to auto-instanciate attribute {} on unmanaged resource {}'
+
+#------------------------------------------------------------------------------
+# Resource category
+#------------------------------------------------------------------------------
+
+# A base resource is not instanciated itself but uses delegates. Which one to
+# use is resolved during initialization
+class TopLevelResource: pass
+
+class FactoryResource(TopLevelResource): pass
+class CategoryResource(TopLevelResource): pass
+
+#------------------------------------------------------------------------------
+
+class ResourceMetaclass(ABCMeta):
+ def __init__(cls, class_name, parents, attrs):
+ """
+ Args:
+ cls: The class type we're registering.
+ class_name: A String containing the class_name.
+ parents: The parent class types of 'cls'.
+ attrs: The attribute (members) of 'cls'.
+ """
+ super().__init__(class_name, parents, attrs)
+
+ # We use the metaclass to create attributes for instance, even before
+ # the Resource Factory is called. They are needed both for initializing
+ # attributes and reverse attributes, in whatever order. Only class
+ # creation allow us to clear _attributes, otherwise, we will just add
+ # those from the parent, siblings, etc...
+ cls._sanitize()
+
+#------------------------------------------------------------------------------
+
+class BaseResource(BaseType, ABC, metaclass=ResourceMetaclass):
+ """Base Resource class
+
+ The base Resource class implements all the logic related to resource
+ instances.
+
+ See also :
+ * ResourceManager : logic related to class instanciation
+ * Resource metaclass : logic related to class construction
+ * ResourceFactory : logic related to available classes and mapping from
+ name to type
+ """
+
+ __type__ = TopLevelResource
+
+ name = Attribute(String, description = 'Alias name for the resource')
+ managed = Attribute(Bool, description = 'Flag: resource is managed',
+ default = True)
+ owner = Attribute(Self, description = 'Owning resource', default = None)
+ data = Attribute(Dict, description = 'User data')
+
+ #---------------------------------------------------------------------------
+ # Constructor
+ #---------------------------------------------------------------------------
+
+ def __new__(cls, *args, **kwargs):
+ """
+ We implement a "factory method" design pattern in the constructor...
+ """
+ # Ensure the resource factory exists and has been initialized, and thus
+ # that Resource objects are fully created
+ from vicn.core.resource_mgr import ResourceManager
+
+ ResourceFactory()
+
+ delegate = ResourceManager().get_resource_with_capabilities(cls, set())
+ if not delegate:
+ log.error('No delegate for abstract resource : %s', cls.__name__)
+ raise VICNException
+
+ instance = super().__new__(delegate)
+
+ return instance
+
+ def __init__(self, *args, **kwargs):
+ from vicn.core.resource_mgr import ResourceManager
+
+ # Cache dependencies
+ self._deps = None
+
+ # Internal data tag for resources
+ self._internal_data = dict()
+
+ mandatory = { a.name for a in self.iter_attributes() if a.mandatory }
+
+ for key, value in kwargs.items():
+ attribute = self.get_attribute(key)
+ if attribute is None:
+ log.warning(W_UNK_ATTR.format(key, self.get_type()))
+ continue
+
+ if isinstance(value, Reference):
+ if value._resource is Self:
+ value = getattr(self, value._attribute)
+ else:
+ value = getattr(value._resource, value._attribute)
+
+ if value and issubclass(attribute.type, Resource):
+ if attribute.is_collection:
+ new_value = list()
+ for x in value:
+ if isinstance(x, str):
+ resource = ResourceManager().by_name(x)
+ elif isinstance(x, UUID):
+ resource = ResourceManager().by_uuid(x)
+ else:
+ resource = x
+ if not resource:
+ raise VICNException(E_UNK_RES_NAME.format(key,
+ self.name, self.__class__.__name__, x))
+ element = resource if isinstance(resource, Reference) \
+ else resource._state.uuid
+ new_value.append(element)
+ value = new_value
+ else:
+ if isinstance(value, str):
+ resource = ResourceManager().by_name(value)
+ elif isinstance(value, UUID):
+ resource = ResourceManager().by_uuid(value)
+ else:
+ resource = value
+ if not resource:
+ raise VICNException(E_UNK_RES_NAME.format(key,
+ self.name, self.__class__.__name__, value))
+ value = value if isinstance(resource, Reference) \
+ else resource._state.uuid
+ self.set(key, value, blocking=False)
+ mandatory -= { key }
+
+ # Check that all mandatory atttributes have been set
+ # Mandatory resource attributes will be marked as pending since they
+ # might be discovered
+ # Eventually, their absence will be discovered at runtime
+ if mandatory:
+ raise VICNException('Mandatory attributes not set: %r' % (mandatory,))
+
+ # Check requirements
+ for attr in self.iter_attributes():
+ if issubclass(attr.type, Resource) and attr.requirements:
+ for req in attr.requirements:
+ instance = self.get(attr.name)
+ if instance is None:
+ continue
+ ResourceManager().add_instance_requirement(instance, req)
+
+ self._subresources = None
+
+ def __after__(self):
+ return tuple()
+
+ def __after_init__(self):
+ return tuple()
+
+ def __subresources__(self):
+ return None
+
+ def set_subresources(self, subresources):
+ if not subresources:
+ return
+
+ # Add state to operators
+ for sr in subresources:
+ if not hasattr(sr, '_state'):
+ sr._state = InstanceState(self._state.manager, sr)
+
+ self._subresources = subresources
+
+ def get_uuid(self):
+ return self._state.uuid
+
+ def from_uuid(self, uuid):
+ return self._state.manager.by_uuid(uuid)
+
+ #--------------------------------------------------------------------------
+ # Object model
+ #--------------------------------------------------------------------------
+
+ def get(self, attribute_name, default=NEVER_SET, unref=True, resolve=True,
+ allow_never_set=True, blocking=True):
+
+ attribute = self.get_attribute(attribute_name)
+
+ # Handling Lambda attributes
+ if hasattr(attribute, 'func') and attribute.func:
+ value = attribute.func(self)
+ else:
+ if self.is_local_attribute(attribute.name):
+ value = vars(self).get(attribute.name, NEVER_SET)
+ else:
+ # A pending value has priority
+ value = self._state.dirty.get(attribute.name, NEVER_SET)
+ if value.value is not NEVER_SET:
+ value = value.value
+ else:
+ # otherwise, let's use a previously fetched value if it
+ # exists
+ value = vars(self).get(attribute.name, NEVER_SET)
+
+ if value is NEVER_SET:
+ if not allow_never_set:
+ log.error(E_GET_NON_LOCAL.format(attribute_name,
+ self._state.uuid))
+ raise NotImplementedError
+
+ if attribute.is_collection:
+ value = self.get_default_collection(attribute)
+ else:
+ if attribute.auto:
+ # Automatic instanciation
+ if attribute.requirements:
+ log.warning('Ignored requirements {}'.format(
+ attribute.requirements))
+ value = self.auto_instanciate(attribute)
+
+ if value is NEVER_SET:
+ value = self.get_default(attribute)
+
+ if self.is_local_attribute(attribute.name):
+ self.set(attribute.name, value)
+
+ if unref and isinstance(value, UUID):
+ value = self.from_uuid(value)
+
+ if resolve and isinstance(value, Reference):
+ if value._resource is Self:
+ value = getattr(self, value._attribute)
+ else:
+ value = getattr(value._resource, value._attribute)
+
+ return value
+
+ async def async_get(self, attribute_name, default=NEVER_SET, unref=True,
+ resolve=True, allow_never_set=False, blocking=True):
+ attribute = self.get_attribute(attribute_name)
+
+ # Handling Lambda attributes
+ if hasattr(attribute, 'func') and attribute.func:
+ value = self.func(self)
+ else:
+ if self.is_local_attribute(attribute.name):
+ value = vars(self).get(attribute.name, NEVER_SET)
+ else:
+
+ # A pending value has priority
+ value = self._state.dirty.get(attribute.name, NEVER_SET)
+ if value.value is not NEVER_SET:
+ value = value.value
+ else:
+ # otherwise, let's use a previously fetched value if it
+ # exists
+ value = vars(self).get(attribute.name, NEVER_SET)
+ if value is NEVER_SET:
+ await self._state.manager.attribute_get(self,
+ attribute_name, value)
+ value = vars(self).get(attribute.name, NEVER_SET)
+
+ # Handling NEVER_SET
+ if value is NEVER_SET:
+ if not allow_never_set:
+ log.error(E_GET_NON_LOCAL.format(attribute_name,
+ self._state.uuid))
+ raise NotImplementedError
+
+ if attribute.is_collection:
+ value = self.get_default_collection(attribute)
+ else:
+ if attribute.auto:
+ # Automatic instanciation
+ if attribute.requirements:
+ log.warning('Ignored requirements {}'.format(
+ attribute.requirements))
+ value = self.auto_instanciate(attribute)
+
+ if value is NEVER_SET:
+ value = self.get_default(attribute)
+
+ if value is self.is_local_attribute(attribute.name):
+ self.set(attribute.name, value)
+
+ if unref and isinstance(value, UUID):
+ value = self.from_uuid(value)
+
+ if resolve and isinstance(value, Reference):
+ if value._resource is Self:
+ value = getattr(self, value._attribute)
+ else:
+ value = getattr(value._resource, value._attribute)
+
+ return value
+
+ def _set(self, attribute_name, value, current=False, set_reverse=True):
+ """
+ Note that set does not automatically mark a resource dirty.
+ We might need a flag to avoid dirty by default, which will be useful
+ when a resource is modified by another resource: eg x.up, or
+ x.ip_address = y, ...
+ Returns : task that can be monitored (note that it is not scheduled)
+ """
+ attribute = self.get_attribute(attribute_name)
+
+ if set_reverse and attribute.reverse_name:
+ for base in self.__class__.mro():
+ if not hasattr(base, '_reverse_attributes'):
+ continue
+
+ for ra in base._reverse_attributes.get(attribute, list()):
+ # Value information : we need resources, not uuids
+ if attribute.is_collection:
+ lst = list()
+ if value:
+ for x in value:
+ if isinstance(x, UUID):
+ x = self.from_uuid(x)
+ lst.append(x)
+ value = InstrumentedList(lst)
+ value._attribute = attribute
+ value._instance = self
+ else:
+ if isinstance(value, UUID):
+ value = self.from_uuid(value)
+
+ if ra.multiplicity == Multiplicity.OneToOne:
+ if value is not None:
+ value.set(ra.name, self, set_reverse = False)
+ elif ra.multiplicity == Multiplicity.ManyToOne:
+ for element in value:
+ value.set(ra.name, self, set_reverse = False)
+ elif ra.multiplicity == Multiplicity.OneToMany:
+ if value is not None:
+ collection = value.get(ra.name)
+ collection.append(self)
+ else:
+ value is None
+ elif ra.multiplicity == Multiplicity.ManyToMany:
+ collection = value.get(ra.name)
+ value.extend(self)
+
+ # Handling value : we need uuids, not resources
+ if attribute.is_collection:
+ if not isinstance(value, InstrumentedList):
+ lst = list()
+ if value:
+ for x in value:
+ if isinstance(x, Resource):
+ x = x.get_uuid()
+ lst.append(x)
+
+ value = InstrumentedList(lst)
+ else:
+ value = InstrumentedList([])
+ value._attribute = attribute
+ value._instance = self
+ else:
+ if isinstance(value, Resource):
+ value = value.get_uuid()
+ return value
+
+ def set(self, attribute_name, value, current=False, set_reverse=True,
+ blocking = True):
+ value = self._set(attribute_name, value, current=current,
+ set_reverse=set_reverse)
+ if self.is_local_attribute(attribute_name) or current:
+ # super()
+ if value is None:
+ attribute = self.get_attribute(attribute_name)
+ vars(self)[attribute_name] = value
+
+ else:
+ fut = self._state.manager.attribute_set(self, attribute_name, value)
+ asyncio.ensure_future(fut)
+
+ async def async_set(self, attribute_name, value, current=False,
+ set_reverse=True, blocking=True):
+ """
+ Example:
+ - setting the ip address on a node's interface
+
+ We need to communicate our intention to the resource manager, which will
+ process our request in a centralized fashion, and do the necessary
+ steps for us to set the value properly.
+ """
+ value = self._set(attribute_name, value, current=current,
+ set_reverse=set_reverse)
+ await self._state.manager.attribute_set(self, attribute_name, value,
+ blocking=blocking)
+
+ def set_many(self, attribute_dict, current=False):
+ if not attribute_dict:
+ return
+ for k, v in attribute_dict.items():
+ self.set(k, v, current=current)
+
+ def is_set(self, attribute_name):
+ return attribute_name in vars(self)
+
+# def clean(self, attribute_name):
+# return self._state.manager.attribute_clean(self, attribute_name)
+
+ def is_local_attribute(self, attribute_name):
+ ACTIONS = ['get', 'set', 'add', 'remove']
+ for action in ACTIONS:
+ method = '_{}_{}'.format(action, attribute_name)
+ if hasattr(self, method) and getattr(self, method) is not None:
+ return False
+ return True
+
+ def get_default_collection(self, attribute):
+ if isinstance(attribute.default, types.FunctionType):
+ default = attribute.default(self)
+ elif isinstance(attribute.default, Reference):
+ if attribute.default._resource is Self:
+ default = getattr(self, attribute.default._attribute)
+ else:
+ default = getattr(attribute.default._resource,
+ attribute.default._attribute)
+ else:
+ default = attribute.default
+ value = InstrumentedList(default)
+ value._attribute = attribute
+ value._instance = self
+ return value
+
+ def get_default(self, attribute):
+ if isinstance(attribute.default, types.FunctionType):
+ value = attribute.default(self)
+ elif isinstance(attribute.default, Reference):
+ if attribute.default._resource is Self:
+ value = getattr(self, attribute.default._attribute)
+ else:
+ value = getattr(attribute.default._resource,
+ attribute.default._attribute)
+ else:
+ value = copy.deepcopy(attribute.default)
+ return value
+
+ def async_get_task(self, attribute_name):
+ task = getattr(self, '_get_{}'.format(attribute_name))()
+ assert not isinstance(task, tuple)
+ return task
+
+
+ def async_set_task(self, attribute_name, value):
+ raise NotImplementedError
+ return async_task(async_set_task, attribute_name, value)
+
+ @classmethod
+ def get_attribute(cls, key):
+ # Searchs if it is a recursive attribute
+ try:
+ pos = key.find('.')
+ if pos >= 0:
+ attr, subattr = key[0:pos], key[pos+1: len(key)]
+ return getattr(cls,attr).type.get_attribute(subattr)
+ return getattr(cls, key)
+ except AttributeError:
+ return None
+
+ @classmethod
+ def _sanitize(cls):
+ """Sanitize the object model to accomodate for multiple declaration
+ styles
+
+ In particular, this method:
+ - set names to all attributes
+ """
+ cls._reverse_attributes = dict()
+ cur_reverse_attributes = dict()
+ for name, obj in vars(cls).items():
+ if not isinstance(obj, ObjectSpecification):
+ continue
+ if isinstance(obj, Attribute):
+ obj.name = name
+
+ # Remember whether a reverse_name is defined before loading
+ # inherited properties from parent
+ has_reverse = bool(obj.reverse_name)
+
+ # Handle overloaded attributes
+ # By recursion, it is sufficient to look into the parent
+ for base in cls.__bases__:
+ if hasattr(base, name):
+ parent_attribute = getattr(base, name)
+ obj.merge(parent_attribute)
+ assert obj.type
+
+ # Handle reverse attribute
+ #
+ # NOTE: we need to do this after merging to be sure we get all
+ # properties inherited from parent (eg. multiplicity)
+ if has_reverse:
+ a = {
+ 'name' : obj.reverse_name,
+ 'description' : obj.reverse_description,
+ 'multiplicity' : Multiplicity.reverse(obj.multiplicity),
+ 'auto' : obj.reverse_auto,
+ }
+ reverse_attribute = Attribute(cls, **a)
+ reverse_attribute.is_aggregate = True
+
+ cur_reverse_attributes[obj.type] = reverse_attribute
+
+ #print('*** class backref ***', cls, obj, reverse_attribute)
+ if not obj in cls._reverse_attributes:
+ cls._reverse_attributes[obj] = list()
+ cls._reverse_attributes[obj].append(reverse_attribute)
+
+ for kls, a in cur_reverse_attributes.items():
+ setattr(kls, a.name, a)
+
+ @classmethod
+ def iter_attributes(cls, aggregates = False):
+ for name in dir(cls):
+ attribute = getattr(cls, name)
+ if not isinstance(attribute, Attribute):
+ continue
+ if attribute.is_aggregate and not aggregates:
+ continue
+
+ yield attribute
+
+ def iter_keys(self):
+ for attribute in self.iter_attributes():
+ if attribute.key == True:
+ yield attribute
+
+ def get_keys(self):
+ return list(self.iter_keys())
+
+ def auto_instanciate(self, attribute):
+ if self.managed is False:
+ raise ResourceNotFound(E_AUTO_UNM.format(attribute, self))
+ cstr_attributes = dict()
+
+ for a in attribute.type.iter_attributes():
+ if not a.mandatory:
+ continue
+
+ # Let's find attributes in the remote class that are of my
+ # class, and let's setup them to me
+ if issubclass(a.type, Resource) and isinstance(self, a.type):
+ cstr_attributes[a.name] = self
+ continue
+
+ if hasattr(self, a.name):
+ cstr_attributes[a.name] = getattr(self, a.name)
+
+ capabilities = set()
+ reqs = self._state.manager.get_instance_requirements(self)
+ for req in reqs:
+ if req._type != attribute.name:
+ continue
+
+ for attr_name, prop in req.properties.items():
+ value = next(iter(prop.value))
+ capabilities |= req._capabilities
+
+ # We need to find a subclass of self._resource with proper capabilities
+ cls = self._state.manager.get_resource_with_capabilities(
+ attribute.type, capabilities)
+
+ # Before creating a new instance of a class, let's check
+ resource = cls(**cstr_attributes)
+
+ self._state.manager.commit_resource(resource)
+ return resource
+
+ def get_attributes(self, aggregates = False):
+ return list(self.iter_attributes(aggregates = aggregates))
+
+ def get_attribute_names(self, aggregates = False):
+ return set(a.name
+ for a in self.iter_attributes(aggregates = aggregates))
+
+ def get_attribute_dict(self, field_names = None, aggregates = False,
+ uuid = True):
+ assert not field_names or field_names.is_star()
+ attributes = self.get_attributes(aggregates = aggregates)
+
+ ret = dict()
+ for a in attributes:
+ if not a.is_set(self):
+ continue
+ value = getattr(self, a.name)
+ if a.is_collection:
+ ret[a.name] = list()
+ for x in value:
+ if uuid and isinstance(x, Resource):
+ x = x._state.uuid._uuid
+ ret[a.name].append(x)
+ else:
+ if uuid and isinstance(value, Resource):
+ value = value._state.uuid._uuid
+ ret[a.name] = value
+ return ret
+
+ def get_tuple(self):
+ return (self.__class__, self._get_attribute_dict())
+
+ @property
+ def state(self):
+ return self._state.state
+
+ @state.setter
+ def state(self, state):
+ self._state.state = state
+
+ def get_types(self):
+ return [cls.__name__.lower() for cls in self.__class__.mro()
+ if cls.__name__ not in ('ABC', 'BaseType', 'object')]
+
+ def get_type(self):
+ return self.__class__.__name__.lower()
+
+ def has_type(self, typ):
+ return typ in self.get_types()
+
+ def __repr__(self):
+ # Showing aggregate attributes can cause infinite loops
+ name = self._state.uuid if self.name in (None, NEVER_SET) else self.name
+ return '<{}: {} {}>'.format(self.__class__.__name__, name,
+ ', '.join('{}={}'.format(k,v)
+ for k, v in self.get_attribute_dict().items()))
+
+ def __str__(self):
+ return self.__repr__()
+
+ #---------------------------------------------------------------------------
+ # Resource helpers
+ #---------------------------------------------------------------------------
+
+ def get_dependencies(self, allow_unresolved = False):
+ if not self._deps:
+ deps = set()
+ for a in self.iter_attributes():
+ if not issubclass(a.type, Resource):
+ continue
+ if a.is_aggregate:
+ continue
+
+ value = getattr(self, a.name)
+ if not value:
+ continue
+
+ if a.multiplicity in (Multiplicity.OneToOne,
+ Multiplicity.ManyToOne):
+ resource = value
+ if not resource:
+ log.warning('Null resource')
+ continue
+ if not resource.managed:
+ continue
+ uuid = resource._state.uuid
+ # Avoid considering oneself as a dependency due to
+ # ResourceAttribute(Self)
+ if uuid != self._state.uuid:
+ deps.add(uuid)
+ else:
+ resources = value
+ for cpt, resource in enumerate(resources):
+ if not resource:
+ log.warning('Null resource in collection')
+ continue
+ if not resource.managed:
+ continue
+ uuid = resource._state.uuid
+ deps.add(uuid)
+ self._deps = deps
+ return self._deps
+
+ def make_name(self, *args, type=True, id=True):
+ l = list()
+ if type:
+ l.append(self.__class__.__name__)
+ l.extend(list(args))
+ if id:
+ N = 3
+ uuid = ''.join(random.choice(string.ascii_uppercase +
+ string.digits) for _ in range(N))
+ l.append(uuid)
+ name = NAME_SEP.join(str(x) for x in l)
+ return name
+
+
+ def check_requirements(self):
+ for attr in self.iter_attributes():
+ if issubclass(attr.type, Resource) and attr.requirements:
+ for req in attr.requirements:
+ instance = getattr(self, attr.name)
+ req.check(instance)
+
+ #--------------------------------------------------------------------------
+ # Triggers
+ #--------------------------------------------------------------------------
+
+ @deprecated
+ def trigger(self, action, attribute_name, *args, **kwargs):
+ self._state.manager.trigger(self, action, attribute_name,
+ *args, **kwargs)
+
+ #--------------------------------------------------------------------------
+ # Object model
+ #
+ # Only assignment is implemented here, other operators are overloaded in
+ # the Attribute class (core.attribute.Attribute)
+ #--------------------------------------------------------------------------
+
+ def format(self, fmt):
+ return fmt.format(**self.get_attribute_dict(uuid = False))
+
+ def get_tag(self, tag_name, default = NEVER_SET):
+ """
+ A tag corresponds to a propery that is required by a class in all of
+ its inheritors. For instance, a service requires than a subclass
+ informs about the 'service_name' tag, which is a class member named
+ according to the following convention : __service_name__.
+ """
+ tag = '__{}__'.format(tag_name)
+ if not tag in vars(self.__class__):
+ if default is NEVER_SET:
+ return default
+ raise NotImplementedError('Missing tag {} in class {}'.format(tag,
+ self.__class__.__name__))
+ return getattr(self.__class__, tag)
+
+ def iter_backrefs(self):
+ for base in self.__class__.mro():
+ if not hasattr(base, '_reverse_attributes'):
+ continue
+ for attr, rattrs in base._reverse_attributes.items():
+ instances = self.get(attr.name, allow_never_set = True)
+ if instances in (None, NEVER_SET):
+ continue
+ if not attr.is_collection:
+ instances = [instances]
+ for instance in instances:
+ for rattr in rattrs:
+ yield instance, rattr
+
+ #---------------------------------------------------------------------------
+ # Accessors
+ #---------------------------------------------------------------------------
+
+ @classmethod
+ def has_attribute(cls, name):
+ return name in [a.name for a in cls.attributes()]
+
+ def has_callback(self, action, attribute):
+ return hasattr(self, '_{}_{}'.format(action, attribute.name))
+
+ def is_setup(self):
+ return self.state in (ResourceState.SETUP_PENDING,
+ ResourceState.SETUP, ResourceState.DIRTY)
+
+ __get__ = None
+ __create__ = None
+ __delete__ = None
+
+#-------------------------------------------------------------------------------
+# Helper functions
+#-------------------------------------------------------------------------------
+
+# The following Mixin are useful to convert an expresson of subresources into
+# an expression of tasks.
+
+class ConcurrentMixin:
+ async def async_commit_to_manager(self, manager):
+ await asyncio.gather(*[element.async_commit_to_manager(manager)
+ for element in self._elements])
+ await asyncio.gather(*[e._state.clean.wait() for e in self._elements])
+ self._state.clean.set()
+
+class SequentialMixin:
+ async def async_commit_to_manager(self, manager):
+ for element in self._elements:
+ await element.async_commit_to_manager(manager)
+ await element._state.clean.wait()
+ self._state.clean.set()
+
+class CompositionMixin:
+ async def async_commit_to_manager(self, manager):
+ for element in self._elements:
+ await element.async_commit_to_manager(manager)
+ await element._state.clean.wait()
+ self._state.clean.set()
+
+_Resource, EmptyResource = SchedulingAlgebra(BaseResource, ConcurrentMixin,
+ CompositionMixin, SequentialMixin)
+
+class ManagedResource(_Resource):
+ def __init__(self, *args, **kwargs):
+ from vicn.core.resource_mgr import ResourceManager
+ owner = kwargs.get('owner', None)
+ name = kwargs.get('name', None)
+
+ manager = ResourceManager()
+ self.register_to_manager(manager, name=name)
+
+ # Manager is needed for reference and reverse attributes
+ super().__init__(*args, **kwargs)
+
+ async def async_commit_to_manager(self, manager):
+ if not self.managed:
+ return
+ self._state.manager.commit_resource(self)
+
+ def register_to_manager(self, manager, name = None):
+ if not self.managed:
+ return
+ manager.add_resource(self, name = name)
+
+Resource = ManagedResource
+
+class BashResource(Resource):
+ """
+ __get__ : use return code of the bash command
+
+ Intermediate values and attributes: should be dict-like
+ Actually, we should collect attributes: dict update/remove, map/reduce
+ """
+ __node__ = None
+ __cmd_get__ = None
+ __cmd_create__ = None
+ __cmd_delete__ = None
+
+ def __get__(self):
+ assert self.__cmd_get__
+ return BashTask(self.node, self.__cmd_get__, {'self': self})
+
+ def __create__(self):
+ assert self.__cmd_create__
+ return BashTask(self.node, self.__cmd_create__, {'self': self})
+
+ def __delete__(self):
+ assert self.__cmd_delete__
+ return BashTask(self.node, self.__cmd_delete__, {'self': self})
+
diff --git a/vicn/core/resource_factory.py b/vicn/core/resource_factory.py
new file mode 100644
index 00000000..15504729
--- /dev/null
+++ b/vicn/core/resource_factory.py
@@ -0,0 +1,84 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import copy
+import inspect
+import logging
+import pkgutil
+import traceback
+
+from netmodel.model.type import Type
+from netmodel.util.singleton import Singleton
+
+log = logging.getLogger(__name__)
+
+# Blacklist : resource that are temporarily disabled loaded
+RESOURCE_BLACKLIST = ['LinuxBridge']
+
+class ResourceFactory(metaclass=Singleton):
+ """
+ This manages classes, not instances
+ """
+
+ def __init__(self):
+ self._registry = dict()
+ self._register_all()
+
+ def _register_all(self):
+ log.info("Registering resources")
+ from vicn.core.resource import Resource
+
+ from vicn import resource as package
+ prefix = package.__name__ + "."
+
+ # Because aggregates might not be instanciated in order, we accumulate
+ # them and register them at the end
+ delayed_aggregates = dict()
+
+ # Explored modules are automatically imported by walk_modules + it
+ # allows to explore recursively resources/
+ # http://docs.python.org/2/library/pkgutil.html
+ for importer, modname, ispkg in pkgutil.walk_packages(package.__path__,
+ prefix, onerror = None):
+ try:
+ module = __import__(modname, fromlist = "dummy")
+
+ classes = [m[1] for m in inspect.getmembers(module,
+ inspect.isclass) if m[1].__module__ == modname]
+ for cls in classes:
+ if not issubclass(cls, Resource):
+ continue
+
+ if cls.__name__ in RESOURCE_BLACKLIST:
+ print('Skipped blacklisted resource ' + cls.__name__)
+ continue
+
+ # Register module to resource factory
+ self._registry[cls.__qualname__] = cls
+ Type._registry[cls.__qualname__.lower()] = cls
+
+ except Exception as e:
+ log.warning("Cannot load %s : %s: %s" % (modname, e,
+ traceback.format_exc()))
+
+ log.info("Registered resources are: {%s}" % ", ".join(sorted(
+ self._registry.keys())))
+
+ def get_available_resources(self):
+ return self._registry
+
diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py
new file mode 100644
index 00000000..f6082488
--- /dev/null
+++ b/vicn/core/resource_mgr.py
@@ -0,0 +1,1436 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys, logging, asyncio, socket, functools
+import time
+
+# LXD workaround
+from pylxd.exceptions import NotFound as LxdNotFound, LXDAPIException
+
+from netmodel.model.filter import Filter
+from netmodel.model.query import Query, ACTION_SELECT, ACTION_INSERT
+from netmodel.model.query import ACTION_UPDATE, ACTION_SUBSCRIBE
+from netmodel.model.sql_parser import SQLParser
+from netmodel.network.packet import Packet
+from netmodel.network.router import Router
+from netmodel.util.toposort import toposort, toposort_flatten
+from netmodel.util.meta import inheritors
+from netmodel.util.singleton import Singleton
+from netmodel.util.misc import is_iterable
+from vicn.core.attribute import NEVER_SET
+from vicn.core.exception import VICNException, ResourceNotFound
+from vicn.core.resource_factory import ResourceFactory
+from vicn.core.resource import Resource, FactoryResource, EmptyResource
+from vicn.core.sa_collections import InstrumentedList
+from vicn.core.state import InstanceState, ResourceState
+from vicn.core.state import AttributeState, Operations, PendingValue
+from vicn.core.task import TaskManager, wait_task, task, async_task
+from vicn.core.task import EmptyTask, BashTask
+
+log = logging.getLogger(__name__)
+
+ENABLE_LXD_WORKAROUND = True
+
+# Monitoring queries
+
+Q_SUB_VPP = 'SUBSCRIBE SUM(*) FROM interface WHERE device_name INCLUDED [{}]'
+Q_SUB_IF = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"'
+Q_SUB_STATS = 'SUBSCRIBE * FROM stats'
+Q_SUB_EMULATOR_IF = 'SUBSCRIBE * FROM interface WHERE id == "{}"'
+Q_SUB_EMULATOR = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"'
+
+# Log messages
+
+S_WAIT_DEP = ' .. Waiting for dependency {}'
+S_WAIT_DEP_OK = ' .. Done waiting for dependency {}'
+S_WAIT = ' .. Waiting for {}'
+S_WAIT_OK = ' .. Done waiting for {}'
+S_WAIT_PRED = ' - Waiting for initialization of predecessors...'
+S_WAIT_SRS = ' - Waiting for subresources...'
+S_REG_SR = ' . Registering subresource to manager {}...'
+S_WAIT_SR = ' . Waiting for subresource: {}'
+S_WAIT_SR_OK = ' . Subresource is ready: {}'
+S_AFTER = ' . AFTER TYPE={}'
+
+S_INIT_DONE = 'INIT done. Resource exists. Process attribute dict {}'
+S_GET_DONE = 'GET done. Resource does not exist (exception was: {})'
+S_KEYS_OK = 'Keys initialized, resource can now be created.'
+S_CREATE_OK = 'CREATE success. Process attribute dict {}'
+
+#------------------------------------------------------------------------------
+# Helpers
+#------------------------------------------------------------------------------
+
+async def wait_resource(resource):
+ await resource._state.clean.wait()
+
+async def wait_resource_init(resource):
+ await resource._state.init.wait()
+
+async def wait_resources(resources):
+ await asyncio.gather(*[wait_resource(r) for r in resources])
+
+wait_resource_task = async_task(wait_resource)
+wait_resources_task = async_task(wait_resources)
+
+#------------------------------------------------------------------------------
+
+class ResourceManager(metaclass=Singleton):
+ """
+ A ResourceManager is in charge of managing resources, their lifecycle, and
+ interfaces to them.
+ """
+
+ def __init__(self, base, settings):
+
+ # Base directory for scenario
+ self._base = base
+
+ # Resources sorted via dependency (instances)
+ self._resources = dict()
+
+ self._deps = None
+
+ # Store resource requirements used for automatic instanciation
+ # instance -> attribute -> requirements
+ self._instance_requirements = dict()
+
+ self._dirty = set()
+ self._auto_commit = False
+
+ # class -> Requirements
+ self._class_requirements = dict()
+
+ self._map_uuid_name = dict()
+ self._map_name_uuid = dict()
+ self._map_str_uuid = dict()
+
+ # The task manager is used to schedule tasks used for resource
+ # synchronization
+ self._task_mgr = TaskManager()
+
+ # Store experiment settings
+ self._settings = settings
+
+ # Cache available resource types
+ _available = ResourceFactory().get_available_resources()
+ self._available = { k.lower(): v for k, v in _available.items() }
+
+ # API / user interface
+ self._router = Router(vicn_callback = self._on_vicn_command)
+ self._router.add_interface('unixserver')
+ self._router.add_interface('local', router = self._router)
+ self._router.add_interface('vicn', manager = self)
+
+ ws_port = self.get('websocket_port')
+ self._ws = self._router.add_interface('websocketserver',
+ port = ws_port)
+
+ # Monitoring
+ self._monitored = set()
+ self._pending_monitoring = set()
+
+ # For debug
+ self._committed = set()
+
+ def terminate(self):
+ self._router.terminate()
+
+ #--------------------------------------------------------------------------
+ # Settings
+ #--------------------------------------------------------------------------
+
+ def set_settings(self, settings):
+ if settings is None:
+ return
+ self._settings.update(settings)
+
+ def get(self, setting):
+ return self._settings[setting]
+
+ def set(self, setting, value):
+ self._settings[setting] = value
+
+ #--------------------------------------------------------------------------
+ # Monitoring
+ #
+ # XXX This code should be deprecated / moved into a separate module.
+ # Planned for a future release.
+ #--------------------------------------------------------------------------
+
+ def _on_vicn_command(self, command):
+ if command == 'setup':
+ self.setup()
+ elif command == 'teardown':
+ self.teardown()
+ elif command == 'monitor':
+ self.monitor()
+ elif command == 'terminate':
+ loop = asyncio.get_event_loop()
+ loop.stop()
+ else:
+ # open_terminal, ...
+ raise NotImplementedError
+
+ def _broadcast(self, query):
+ if not self._ws:
+ return
+ self._ws.execute(query)
+
+ def _broadcast_packet(self, packet):
+ self._broadcast(packet.to_query())
+
+ def _on_ns_record(self, packet):
+ query = packet.to_query()
+
+ if not query.object_name == 'interface':
+ return
+ q = Query(ACTION_UPDATE, 'channel', filter = query.filter,
+ params = query.params)
+ q.reply = True
+
+ self._ws.execute(q)
+ return None
+
+ def _on_netmon_record(self, packet):
+ query = packet.to_query()
+
+ # Find channel related to query
+ # NOTE: we update the channel twice, once for each interface...
+ if query.object_name == 'interface':
+ device_names = [value for key, op, value in query.filter.to_list()
+ if key == 'device_name']
+ if not device_names:
+ log.error('No device name in packet=', packet)
+ return
+ device_name = device_names[0]
+ node_name = query.params['node']
+ node = ResourceManager().by_name(node_name)
+ if node is None:
+ return None
+ for interface in node.interfaces:
+ if interface.device_name == device_name:
+ if interface.channel:
+ f = Filter.from_list([['id', '==',
+ interface.channel._state.uuid._uuid]])
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
+ params = query.params)
+ q.reply = True
+ self._ws.execute(q)
+ return None
+ return None
+ return None
+ return None
+
+ def _on_netmon_channel_record(self, packet):
+ query = packet.to_query()
+ if query.object_name == 'interface':
+ device_names = [value for key, op, value in query.filter.to_list()
+ if key == 'device_name']
+ if not device_names:
+ log.error('No device name in packet=', packet)
+ return
+
+ device_name = device_names[0]
+
+ f = Filter.from_list([['id', '==', device_name]])
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
+ params = query.params)
+ q.reply = True
+ self._ws.execute(q)
+ return None
+
+ return None
+
+ def _on_vpp_record(self, packet, pylink_id):
+ query = packet.to_query()
+ if query.object_name == 'interface':
+ device_names = [value for key, op, value in query.filter.to_list()
+ if key == 'device_name']
+ if not device_names:
+ log.error('No device name in packet=', packet)
+ return
+
+ # We might want to check if the query has SUM(*)
+ f = Filter.from_list([['id', '==', pylink_id]])
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
+ params = query.params)
+ q.reply = True
+ self._ws.execute(q)
+ return None
+
+ print('discard packet in on_netmon_channel_record', query)
+ return None
+
+ #--------------------------------------------------------------------------
+ # Resource management
+ #--------------------------------------------------------------------------
+
+ def create_from_dict(self, **resource):
+ resource_type = resource.pop('type', None)
+ assert resource_type
+
+ return self.create(resource_type.lower(), **resource)
+
+ def create(self, resource_type, **attributes):
+ cls = self._available.get(resource_type)
+ if not cls:
+ raise Exception("Ignored resource with unknown type %s: %r" %
+ (resource_type, attributes))
+
+ resource = cls(**attributes)
+
+ name = attributes.get('name', None)
+ if name:
+ self._map_uuid_name[resource._state.uuid] = name
+ self._map_name_uuid[name] = resource._state.uuid
+
+ return resource
+
+ def get_resource_type_names(self):
+ return ResourceFactory().get_available_resources().keys()
+
+ def add_resource(self, instance, name = None):
+ instance._state = InstanceState(self, instance, name = name)
+
+ self._resources[instance._state.uuid] = instance
+ self._map_str_uuid[instance._state.uuid._uuid] = instance._state.uuid
+ self._deps = None
+
+ def commit_resource(self, resource):
+ """
+ Committing a resource creates an asyncio function implementing a state
+ management automaton.
+ """
+ asyncio.ensure_future(self._process_resource(resource))
+
+ def commit(self):
+ """
+ Commit all resource whose owner is not set, and mark unmanaged
+ resources as clean.
+
+ This function is used at initialization.
+ """
+
+ # Start FSM for all managed resources
+ for resource in self.get_resources():
+ if resource.owner is not None:
+ continue
+ if resource.managed == False:
+ asyncio.ensure_future(self._set_resource_state(resource,
+ ResourceState.CLEAN))
+ continue
+
+ self.commit_resource(resource)
+
+ def setup(self, commit=False):
+ """
+ This function is in charge of setting up all resources needed by the
+ experiment. Since it might be a long process, it should be asynchronous
+ at some point. So far, we let resources take care of this by themselves.
+ """
+ self._auto_commit = commit
+ if commit:
+ self.commit()
+
+ def get_resource_with_capabilities(self, cls, capabilities):
+ if '__type__' in cls.__dict__ and cls.__type__ == FactoryResource:
+ candidates = inheritors(cls)
+ if not candidates:
+ log.error('Abstract resource with no candidates: %s',
+ cls.__name__)
+ return None
+
+ for delegate in candidates:
+ if capabilities and (not '__capabilities__' in vars(delegate)
+ or not capabilities.issubset(delegate.__capabilities__)):
+ continue
+ log.info("Abstract resource %s, delegated %s among %r" % \
+ (cls.__name__, delegate.__name__, candidates))
+ return delegate
+ return None
+ else:
+ if capabilities and (not '__capabilities__' in vars(delegate) or
+ not capabilities.issubset(delegate.__capabilities__)):
+ log.error('Capabilities conflict for resource : %s',
+ cls.__name__)
+ raise VICNException
+ return cls
+
+ def find(self, resource_tuple):
+ cls, attr_dict = resource_tuple
+ for instance in self.by_type(cls):
+ cur_attr_dict = instance._get_attribute_dict()
+ common_keys = [k for k in cur_attr_dict.keys()
+ if k in attr_dict.keys()]
+ if all(attr_dict[k] == cur_attr_dict[k] for k in common_keys):
+ return instance
+ return None
+
+ def __iter__(self):
+ for resource in self._resources.values():
+ yield resource
+
+ def resources(self):
+ return list(self.__iter__())
+
+ def _sort_resources(self):
+ deps = {}
+ for instance in self.resources():
+ deps[instance._state.uuid] = \
+ instance.get_dependencies(allow_unresolved = True)
+
+ self._deps = toposort_flatten(deps)
+
+ def _sorted_resources(self):
+ """
+ Iterates on resources based on their dependencies
+ """
+ if not self._deps:
+ self._sort_resources()
+ for dep in self._deps:
+ try:
+ yield self._resources[dep]
+ except KeyError:
+ log.error('Dependency not found : {}'.format(dep))
+ raise InvalidResource
+
+ def sorted_resources(self):
+ return list(self._sorted_resources())
+
+ #--------------------------------------------------------------------------
+ # Queries
+ #--------------------------------------------------------------------------
+
+ def by_uuid(self, uuid):
+ return self._resources.get(uuid)
+
+ def by_uuid_str(self, uuid_str):
+ uuid = self._map_str_uuid.get(uuid_str)
+ return self._resources.get(uuid)
+
+ def by_name(self, name):
+ uuid = self._map_name_uuid.get(name)
+ return self.by_uuid(uuid)
+
+ def get_resources(self):
+ return self._resources.values()
+
+ def get_aggregates(self, resource_name, resource_cls):
+ """
+ Get aggregated object.
+ """
+ if not resource_name in self._aggregates:
+ return None
+ all_aggregates = self._aggregates[resource_name]
+
+ if not resource_cls in all_aggregates:
+ return None
+ aggregates = all_aggregates[resource_cls]
+
+ assert all(isinstance(x, resource_cls) for x in aggregates)
+ return aggregates
+
+ def get_aggregate(self, resource_name, resource_cls):
+ aggregates = self.get_aggregates(resource_name, resource_cls)
+ if not aggregates:
+ return None
+ assert len(aggregates) == 1
+ return next(aggregates)
+
+ def by_type(self, type):
+ return [r for r in self if isinstance(r, type)]
+
+ def by_type_str(self, typestr):
+ cls = self._available.get(typestr.lower())
+ if not cls:
+ return list()
+ return self.by_type(cls)
+
+ #--------------------------------------------------------------------------
+ # Requirements
+ #--------------------------------------------------------------------------
+
+ def add_instance_requirement(self, instance, requirement):
+ uuid = instance._state.uuid
+ if not uuid in self._instance_requirements:
+ self._instance_requirements[uuid] = list()
+ self._instance_requirements[uuid].append(requirement)
+
+ def get_instance_requirements(self, instance):
+ uuid = instance._state.uuid
+ return self._instance_requirements.get(uuid, dict())
+
+ def add_class_requirement(self, cls, requirement):
+ if not cls in self._class_requirements:
+ self._class_requirements[cls] = list()
+ self._class_requirements[cls].append(requirement)
+
+ #--------------------------------------------------------------------------
+ # Events
+ #--------------------------------------------------------------------------
+
+ def on(self, resource_name, event, action):
+ resource = self._resources.get(resource_name, None)
+ if not resource:
+ return
+
+ resource.on(event, action)
+
+ #--------------------------------------------------------------------------
+ # Task management
+ #--------------------------------------------------------------------------
+
+ def schedule(self, task):
+ if task is None or isinstance(task, EmptyTask):
+ return
+ self._task_mgr.schedule(task)
+
+ #--------------------------------------------------------------------------
+ # Asynchronous resource API
+ #
+ # The manager is the only one to submit tasks to the scheduler since it can
+ # store and share the results, manage concurrent access, etc.
+ # As many functions are not thread safe, we make sure that they are all
+ # executed in the manager's thread (=main thread).
+ #--------------------------------------------------------------------------
+
+ async def resource_exits(self, resource):
+ await self._resource_get()
+ await self.wait_resource_exists(resource)
+ return resource._state.exists
+
+ async def wait_attr_init(self, resource, attribute_name):
+ await resource._state.attr_init[attribute_name].wait()
+
+ async def wait_attr_clean(self, resource, attribute_name):
+ await resource._state.attr_clean[attribute_name].wait()
+
+ async def attribute_get(self, resource, attribute, value):
+ await self.wait_attr_init(resource, attribute)
+ return resource.get(attribute)
+
+ async def attribute_set(self, resource, attribute_name, value,
+ blocking=True):
+ with await resource._state.write_lock:
+ # Add the current operation to the pending list
+ # NOTE: collections are unordered and can be updated concurrently
+ #self._attribute_set_pending_value(resource, attribute_name)
+ resource._state.dirty[attribute_name].trigger(Operations.SET,
+ value)
+
+ attr_state = resource._state.attr_state[attribute_name]
+ if attr_state == AttributeState.CLEAN:
+ resource._state.attr_state[attribute_name] = \
+ AttributeState.DIRTY
+ elif attr_state in [
+ # Nothing to do since we know the attribute value will be
+ # processed later.
+ # If the attribute was not processed by default, we would have
+ # to change the state of the attribute so that it gets
+ # processed.
+ AttributeState.UNINITIALIZED,
+ AttributeState.INITIALIZED,
+ AttributeState.PENDING_INIT,
+ AttributeState.DIRTY]:
+ pass
+ else:
+ # We cannot have the lock for instance if the attribute is
+ # being updated.
+ raise RuntimeError
+
+ resource_state = resource._state.state
+ if resource_state == ResourceState.CLEAN:
+ resource._state.state = ResourceState.DIRTY
+ resource._state.change_event.set()
+ elif resource_state in [
+ ResourceState.UNINITIALIZED,
+ ResourceState.INITIALIZED,
+ ResourceState.PENDING_KEYS,
+ ResourceState.KEYS_OK,
+ ResourceState.PENDING_DEPS,
+ ResourceState.DEPS_OK,
+ ResourceState.PENDING_CREATE,
+ ResourceState.CREATED,
+ ResourceState.DIRTY,
+ ]:
+ pass # Nothing to do, the attribute will get processed
+ else:
+ # ResourceState.PENDING_UPDATE
+ # other
+ raise RuntimeError("Resource cannot be in state".format(
+ resource_state))
+
+ if blocking:
+ await self.wait_attr_clean(resource, attribute_name)
+
+ #---------------------------------------------------------------------------
+ # Resource dependency management
+ #---------------------------------------------------------------------------
+
+ async def _resource_wait_attributes(self, resource):
+ """Check dependencies and requirements
+
+ Inspect all attributes for referenced resources, and their eventual
+ requirements.
+ """
+ self.log(resource, ' - Waiting for attribute dependencies...')
+ for attr in resource.iter_attributes():
+ if issubclass(attr.type, Resource):
+ deps = resource.get(attr.name)
+ if deps is None:
+ # Not really a dependency, we expect mandatory to prevent
+ # us to continue if we should not
+ continue
+
+ if not attr.is_collection:
+ deps = [deps]
+
+ for dep in deps:
+ # XXX This could be done in parallel
+ if not dep.managed:
+ continue
+ dep_pfx = '{}:{}'.format(dep.get_type(), dep.get_uuid())
+ self.log(resource, S_WAIT_DEP. format(dep_pfx))
+ await wait_resource(dep)
+ self.log(resource, S_WAIT_DEP_OK. format(dep_pfx))
+
+ if not attr.requirements:
+ continue
+
+ for req in attr.requirements:
+ dep_attr_name = req.requirement_type
+ dep_attr = dep.get_attribute(dep_attr_name)
+ assert issubclass(dep_attr.type, Resource)
+ dep_attr_value = dep.get(dep_attr_name)
+
+ if not dep_attr_value:
+ dep_attr_value = dep.auto_instanciate(dep_attr)
+ setattr(dep, dep_attr_name, dep_attr_value)
+
+ dep_attr_value_pfx = '{}:{}'.format(
+ dep_attr_value.get_type(),
+ dep_attr_value.get_uuid())
+ self.log(resource,
+ S_WAIT_DEP.format(dep_attr_value_pfx))
+ await wait_resource(dep_attr_value)
+ self.log(resource,
+ S_WAIT_DEP_OK .format(dep_attr_value_pfx))
+
+ async def _resource_wait_predecessors(self, resource):
+ after = resource.__after__()
+ if after:
+ self.log(resource, ' - Waiting for predecessors...')
+ for resource_type in after:
+ self.log(resource, ' . AFTER TYPE={}'.format(resource_type))
+ befores = resource._state.manager.by_type_str(resource_type)
+ for before in befores:
+ if not before.managed:
+ continue
+ before_pfx = '{}:{}'.format(before.get_type(),
+ before.get_uuid())
+ self.log(resource, S_WAIT.format(before_pfx))
+ await wait_resource(before)
+ self.log(resource, S_WAIT_OK.format(before_pfx))
+
+ after_init = resource.__after_init__()
+ if after_init:
+ self.log(resource, S_WAIT_PRED)
+ for resource_type in after_init:
+ self.log(resource, S_AFTER.format(resource_type))
+ befores = resource._state.manager.by_type_str(resource_type)
+ for before in befores:
+ if not before.managed:
+ continue
+ before_pfx = '{}:{}'.format(before.get_type(),
+ before.get_uuid())
+ self.log(resource, S_WAIT.format(before_pfx))
+ await wait_resource_init(before)
+ self.log(resource, S_WAIT_OK.format(before_pfx))
+
+ async def _resource_wait_subresources(self, resource):
+ self.log(resource, S_WAIT_SRS)
+
+ # We should accumulate subresources through the hierarchy
+ sr = EmptyResource()
+ for base in reversed(resource.__class__.mro()):
+ if '__subresources__' not in vars(base):
+ continue
+ sr = sr > base.__subresources__(resource)
+
+ if sr is not None and not isinstance(sr, EmptyResource):
+ resource.set_subresources(sr)
+ pfx_sr = '{}:{}'.format(sr.get_type(), sr.get_uuid())
+ self.log(resource, S_REG_SR .format(pfx_sr))
+ await sr.async_commit_to_manager(self)
+ self.log(resource, S_WAIT_SR.format(pfx_sr))
+ await wait_resource(sr)
+ self.log(resource, S_WAIT_SR_OK.format(pfx_sr))
+
+ async def _resource_wait_dependencies(self, resource):
+ self.log(resource, 'Waiting for dependencies...')
+ await self._resource_wait_attributes(resource)
+ await self._resource_wait_predecessors(resource)
+ await self._resource_wait_subresources(resource)
+
+ def _task_resource_action(self, resource, action):
+ """Perform action: __get__, __create__, __delete__ on the full class
+ hierarchy.
+ """
+ task = EmptyTask()
+ for base in reversed(resource.__class__.mro()):
+ # To avoid adding several times the same task
+ if action not in vars(base):
+ continue
+ func = getattr(base, action, None)
+ if func is None:
+ continue
+ t = func(resource)
+
+ task = task > t
+
+ return task
+
+ #--------------------------------------------------------------------------
+ # Resource model
+ #--------------------------------------------------------------------------
+
+ def _task_attribute_op(self, resource, attribute, op):
+ return getattr(resource, '_{}_{}'.format(op, attribute.name))()
+
+ #--------------------------------------------------------------------------
+ # Attribute FSM
+ #--------------------------------------------------------------------------
+
+ def _attribute_is_dirty(self, resource, attribute):
+ """
+ Precondition:
+ Attribute has been retrieved
+ """
+ pending_value = resource._state.dirty[attribute.name]
+ return pending_value.value != NEVER_SET
+
+ async def __set_attribute_state(self, resource, attribute_name, state):
+ """Sets the resource state (no-lock version)
+
+ It is important to centralize state change since some states are
+ associated with Events().
+ """
+ resource._state.attr_state[attribute_name] = state
+ if state in [
+ AttributeState.INITIALIZED,
+ AttributeState.CLEAN,
+ AttributeState.DIRTY
+ ]:
+ resource._state.attr_init[attribute_name].set()
+ else:
+ raise RuntimeError("Inconsistent resource state {}".format(state))
+
+ if state in [AttributeState.CLEAN]:
+ resource._state.attr_clean[attribute_name].set()
+ elif state in [
+ AttributeState.INITIALIZED,
+ AttributeState.DIRTY
+ ]:
+ resource._state.attr_clean[attribute_name].clear()
+ else:
+ raise RuntimeError
+
+ async def _set_attribute_state(self, resource, attribute_name, state):
+ """Sets the attribute state (lock version)
+ """
+ with await resource._state.attr_lock[attribute_name]:
+ await self.__set_attribute_state(resource, attribute_name, state)
+
+ def _trigger_attr_state_change(self, resource, attribute, fut):
+ try:
+ ret = fut.result()
+ resource._state.attr_change_success[attribute.name] = True
+ resource._state.attr_change_value[attribute.name] = ret
+ except Exception as e:
+ resource._state.attr_change_success[attribute.name] = False
+ resource._state.attr_change_value[attribute.name] = e
+ resource._state.attr_change_event[attribute.name].set()
+
+ async def attribute_process(self, resource, attribute):
+ """
+ Temporary FSM executing in parallel for attribute management. Those FSM
+ are under the responsability of the main resource FSM.
+
+ Precondition:
+ Attribute state is initialized
+ """
+ self.attr_log(resource, attribute,
+ 'Starting attribute FSM for {}'.format(attribute.name))
+
+ new_state = None
+ while new_state != AttributeState.CLEAN:
+ #with await resource._state.attr_lock[attribute.name]:
+ state = resource._state.attr_state[attribute.name]
+ self.attr_log(resource, attribute,
+ 'Current state is {}'.format(state))
+
+ if resource._state.attr_change_success == False:
+ log.error('Attribute error')
+ e = resource._state.attr_change_value[attribute.name]
+ import traceback; traceback.print_tb(e.__traceback__)
+ raise NotImplementedError
+
+ # Signal update errors to the parent resource
+ resource._state.attr_change_event[attribute.name].set()
+
+ elif state == AttributeState.UNINITIALIZED:
+ pending_state = AttributeState.PENDING_INIT
+ elif state in AttributeState.INITIALIZED:
+ pending_state = AttributeState.PENDING_UPDATE
+ elif state == AttributeState.DIRTY:
+ pending_state = AttributeState.PENDING_UPDATE
+ elif state in [
+ AttributeState.PENDING_INIT,
+ AttributeState.PENDING_UPDATE
+ ]:
+ # Nothing to do
+ pending_state = None
+ elif state == AttributeState.CLEAN:
+ return
+ else:
+ raise RuntimeError
+
+ if pending_state is None:
+ self.attr_log(resource, attribute,
+ 'Nothing to do. Waiting for event...')
+ await resource._state.attr_change_event[attribute.name].wait()
+ resource._state.attr_change_event[attribute.name].clear()
+ self.attr_log(resource, attribute, 'Wake up from event')
+ continue
+
+ if pending_state == AttributeState.PENDING_INIT:
+ task = self._task_attribute_op(resource, attribute, 'get')
+ elif pending_state == AttributeState.PENDING_UPDATE:
+ pending_value = resource._state.dirty[attribute.name]
+
+ if pending_value.value == NEVER_SET:
+ assert len(pending_value.operations) == 0
+ task = EmptyTask()
+ else:
+ try:
+ task = self._task_attribute_op(resource, attribute,
+ Operations.SET)
+ except Exception as e:
+ log.warning('No attribute setter attribute {}'.format(
+ attribute))
+ task = EmptyTask()
+ else:
+ raise RuntimeError
+
+ if task is not None and not isinstance(task, EmptyTask):
+ state_change = functools.partial( \
+ self._trigger_attr_state_change, resource, attribute)
+ task.add_done_callback(state_change)
+ self.attr_log(resource, attribute,
+ 'Trigger {} -> {}. Waiting task completion'.format(
+ state, pending_state))
+ self.schedule(task)
+
+ await resource._state.attr_change_event[attribute.name].wait()
+ resource._state.attr_change_event[attribute.name].clear()
+ self.attr_log(resource, attribute,
+ 'Completed {} -> {}. Success = {}'.format(
+ state, pending_state,
+ resource._state.attr_change_success[attribute.name]))
+ else:
+ # If this value is not reset, attributes get updated many times
+ resource._state.attr_change_value[attribute.name] = NEVER_SET
+
+ if pending_state == AttributeState.PENDING_INIT:
+ if resource._state.attr_change_success[attribute.name] == True:
+ attrs = resource._state.attr_change_value[attribute.name]
+ self.attr_log(resource, attribute,
+ 'INIT success. Value = {}'.format(attrs))
+ found = self._process_attr_dict(resource, attribute, attrs)
+ if not found:
+ log.error('Attribute missing return attrs: {}'.format(
+ attrs))
+ found = self._process_attr_dict(resource, attribute,
+ attrs)
+ new_state = AttributeState.INITIALIZED
+ else:
+ attrs = resource._state.attr_change_value[attribute.name]
+ self.attr_log(resource, attribute,
+ 'INIT gave no value. Value = {}'.format(attrs))
+ new_state = AttributeState.INITIALIZED
+
+ elif pending_state == AttributeState.PENDING_UPDATE:
+ if resource._state.attr_change_success[attribute.name] == True:
+ attrs = resource._state.attr_change_value[attribute.name]
+ self.attr_log(resource, attribute,
+ 'UPDATE success. Value = {}. Attribute is CLEAN'.format(attrs))
+ if attrs != NEVER_SET:
+ # None could be interpreted as the return value. Also,
+ # we need not to overwrite the value from get
+ self._process_attr_dict(resource, attribute, attrs)
+
+ # We might do this for all returned attributes
+ cur_value = vars(resource)[attribute.name]
+ if attribute.is_collection:
+ tmp = InstrumentedList(pending_value.value)
+ tmp._attribute = cur_value._attribute
+ tmp._instance = cur_value._instance
+ else:
+ tmp = pending_value.value
+ vars(resource)[attribute.name] = tmp
+ pending_value.clear()
+
+ new_state = AttributeState.CLEAN
+ else:
+ log.error('Attribute error')
+ e = resource._state.attr_change_value[attribute.name]
+ import traceback; traceback.print_tb(e.__traceback__)
+ raise NotImplementedError
+
+ else:
+ raise RuntimeError
+
+ # Setting attribute state
+ await self._set_attribute_state(resource, attribute.name,
+ new_state)
+
+ #--------------------------------------------------------------------------
+ # Resource FSM
+ #--------------------------------------------------------------------------
+
+ def parse_query(self, line):
+ dic = SQLParser().parse(line)
+ if not dic:
+ raise RuntimeError("Can't parse input command: %s" % command)
+
+ return Query.from_dict(dic)
+
+ def _monitor_netmon(self, resource):
+ ip = resource.node.host_interface.ip_address
+ if not ip:
+ log.error('IP of monitored Node is None')
+ import os; os._exit(1)
+
+ ws = self._router.add_interface('websocketclient', address=ip,
+ hook=self._on_netmon_record)
+
+ node = resource.node
+ for interface in node.interfaces:
+ if not interface.monitored:
+ continue
+
+ if interface.get_type() == 'dpdkdevice' and hasattr(node,'vpp'):
+
+ # Check if vICN has already subscribed for one interface in
+ # the channel
+ if hasattr(interface.channel,'already_subscribed'):
+ continue
+
+ channel_id = interface.channel._state.uuid._uuid
+
+ update_vpp = functools.partial(self._on_vpp_record,
+ pylink_id = channel_id)
+ ws_vpp = self._router.add_interface('websocketclient',
+ address=ip, hook=update_vpp)
+
+ aggregate_interfaces = list()
+ for _interface in node.interfaces:
+ if not _interface.get_type() == 'dpdkdevice' and \
+ _interface.monitored:
+ aggregate_interfaces.append('"' +
+ _interface.device_name + '"')
+
+ q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces))
+ q = self.parse_query(q_str)
+ packet = Packet.from_query(q)
+ self._router._flow_table.add(packet, None, ws_vpp)
+ ws_vpp.send(packet)
+
+ # Prevent vICN to subscribe to other interfaces of the same
+ # channel
+ interface.channel.already_subscribed = True
+
+ else:
+ q_str = Q_SUB_IF.format(interface.device_name)
+ q = self.parse_query(q_str)
+ packet = Packet.from_query(q)
+ self._router._flow_table.add(packet, None, ws)
+ ws.send(packet)
+
+ def _monitor_emulator(self, resource):
+ ns = resource
+ ip = ns.node.bridge.ip_address # host_interface.ip_address
+
+ ws_ns = self._router.add_interface('websocketclient', address = ip,
+ port = ns.control_port,
+ hook = self._on_ns_record)
+ ws = self._router.add_interface('websocketclient', address = ip,
+ hook = self._on_netmon_channel_record)
+
+ for station in ns.stations:
+ if not station.managed:
+ interface = [i for i in station.interfaces if i.channel == ns]
+ assert len(interface) == 1
+ interface = interface[0]
+ identifier = interface.name
+ else:
+ iface = ns._sta_ifs[station]
+ identifier = iface._state.uuid._uuid
+
+ # Monitor the wireless channel for position and link rate
+ q_str = Q_SUB_EMULATOR_IF.format(identifier)
+ q = self.parse_query(q_str)
+ packet = Packet.from_query(q)
+ self._router._flow_table.add(packet, None, ws_ns)
+ ws_ns.send(packet)
+
+ # We also need to subscribe on the node for the tap interfaces
+ # for individual bandwidth monitoring
+ tap = ns._sta_taps[station]
+ q_str = Q_SUB_EMULATOR.format(tap.device_name)
+ q = self.parse_query(q_str)
+ packet = Packet.from_query(q)
+ self._router._flow_table.add(packet, None, ws)
+ ws.send(packet)
+
+ def _monitor(self, resource):
+ if resource.get_type() == 'centralip':
+ for uuid in self._pending_monitoring:
+ pending = self.by_uuid(uuid)
+ self._monitor(pending)
+ self._pending_monitoring.clear()
+ return
+
+ central_ip = self.by_type_str('centralip')
+ if not central_ip:
+ raise NotImplementedError('Missing CentralIP in experiment')
+ central_ip = central_ip[0]
+
+ uuid = resource.get_uuid()
+
+ if central_ip._state.state != ResourceState.CLEAN:
+ self._pending_monitoring.add(uuid)
+ return
+
+ if uuid in self._monitored:
+ return
+ self._monitored.add(uuid)
+
+ if resource.get_type() == 'netmon':
+ if resource.node.get_type() != 'lxccontainer':
+ return
+ self._monitor_netmon(resource)
+
+ elif resource.has_type('emulatedchannel'):
+ self._monitor_emulator(resource)
+
+ async def __set_resource_state(self, resource, state):
+ """Sets the resource state (no-lock version)
+
+ It is important to centralize state change since some states are
+ associated with Events().
+ """
+ resource._state.state = state
+ if state == ResourceState.CLEAN:
+ # Monitoring hook
+ self._monitor(resource)
+ resource._state.clean.set()
+ else:
+ resource._state.clean.clear()
+ if state == ResourceState.INITIALIZED:
+ resource._state.init.set()
+
+ async def _set_resource_state(self, resource, state):
+ """Sets the resource state (lock version)
+ """
+ with await resource._state.lock:
+ await self.__set_resource_state(resource, state)
+
+ def _trigger_state_change(self, resource, fut):
+ try:
+ ret = fut.result()
+ resource._state.change_success = True
+ resource._state.change_value = ret
+ except Exception as e:
+ resource._state.change_success = False
+ resource._state.change_value = e
+ resource._state.change_event.set()
+
+ def _process_attr_dict(self, resource, attribute, attrs):
+ if not isinstance(attrs, dict):
+ if attribute is None:
+ return False
+ attrs = {attribute.name: attrs}
+ resource.set_many(attrs, current=True)
+ return True
+
+ async def _task_resource_update(self, resource):
+ # Monitor all FSM one by one and inform about errors.
+ futs = list()
+ attrs = list()
+ for attr in resource.iter_attributes():
+ if resource.is_local_attribute(attr.name):
+ continue
+ if attr.key:
+ # Those attributes are already done
+ continue
+
+ attrs.append(attr)
+ fut = self.attribute_process(resource, attr)
+ futs.append(fut)
+
+ if not futs:
+ self.log(resource, 'No attribute to update')
+ return None
+
+ await asyncio.gather(*futs)
+
+ # Inform the resource about the outcome of the update process
+ # Error if at least one attribute failed.
+ resource._state.change_success = all(
+ resource._state.attr_change_success[attr.name]
+ for attr in attrs)
+ self.log(resource,
+ 'All attributes FSM terminated with success={}'.format(
+ resource._state.change_success))
+
+ if resource._state.change_success:
+ ret = [ resource._state.attr_change_value[attr.name]
+ for attr in attrs]
+ return ret
+ else:
+ raise NotImplementedError('At least one attribute failed')
+
+ async def _task_resource_keys(self, resource):
+ # Monitor all FSM one by one and inform about errors.
+ futs = list()
+ attrs = list()
+ for attr in resource.get_keys():
+ if resource.is_local_attribute(attr.name):
+ continue
+ attrs.append(attr)
+ fut = self.attribute_process(resource, attr)
+ futs.append(fut)
+
+ if not futs:
+ self.log(resource, 'No key attribute to update')
+ return None
+
+ await asyncio.gather(*futs)
+
+ # Inform the resource about the outcome of the update process
+ # Error if at least one attribute failed.
+ resource._state.change_success = all(
+ resource._state.attr_change_success[attr.name]
+ for attr in attrs)
+ self.log(resource,
+ 'KEY attributes FSM terminated with success={}'.format(
+ resource._state.change_success))
+
+ if resource._state.change_success:
+ ret = resource._state.attr_change_value
+ return ret
+ else:
+ raise NotImplementedError('At least one attribute failed')
+
+ #--------------------------------------------------------------------------
+ # Logging
+ #--------------------------------------------------------------------------
+
+ def log(self, resource, msg=None):
+ resource._state.log.append(msg)
+
+ # Display on screen
+ #pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid())
+ #print(pfx, msg)
+
+ def attr_log(self, resource, attribute, msg):
+ resource._state.attr_log[attribute.name].append(msg)
+
+ # Display on screen
+ #pfx = '[{}] {} / {}: '.format(resource.get_type(), resource.get_uuid(),
+ # attribute.name)
+ #print(pfx, msg)
+
+ #--------------------------------------------------------------------------
+
+ async def _process_resource(self, resource):
+ """
+ We need to schedule the first set of subresources, knowing others will
+ be orchestrated by the operators
+ - subresources need to enter the system in order
+ -> we just add them to the manager in time
+ - but they need to be managed by the system
+ - in particular, the owner waits for the system to complete
+ subresoruces: this is the implementation of __get__ __create__
+ __delete__ in the base resource
+ """
+ pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid())
+
+ self.log(resource, 'Starting FSM...')
+
+ # When a resource is managed, it will get automatically monitored by
+ # adding the netmon resource on it.
+ from vicn.resource.node import Node
+ if resource.get_type() == 'lxccontainer':
+ self.log(resource,
+ 'Associating monitoring to lxc container resource...')
+ instance = self.create('netmon', node=resource)
+ self.commit_resource(instance)
+
+ # FIXME
+ elif resource.get_type() == 'physical' and resource.managed and \
+ len(self.by_type_str('emulatedchannel')) > 0:
+ self.log(resource,
+ 'Associating monitoring to physical node resource...')
+ instance = self.create('netmon', node=resource)
+ self.commit_resource(instance)
+
+ state = None
+
+ while True:
+ with await resource._state.lock:
+
+ # FSM implementation
+ state = resource._state.state
+ self.log(resource, 'Current state is {}'.format(state))
+
+ if resource._state.change_success == False:
+ e = resource._state.change_value
+ import traceback; traceback.print_tb(e.__traceback__)
+ raise NotImplementedError
+
+ elif state == ResourceState.UNINITIALIZED:
+ pending_state = ResourceState.PENDING_DEPS
+ elif state == ResourceState.DEPS_OK:
+ pending_state = ResourceState.PENDING_INIT
+
+ elif state == ResourceState.INITIALIZED:
+ pending_state = ResourceState.PENDING_GET
+
+ elif state == ResourceState.GET_DONE:
+ if resource.get_keys():
+ pending_state = ResourceState.PENDING_KEYS
+ else:
+ pending_state = ResourceState.PENDING_CREATE
+
+ elif state == ResourceState.KEYS_OK:
+ pending_state = ResourceState.PENDING_CREATE
+
+ elif state in [ResourceState.CREATED, ResourceState.DIRTY]:
+ pending_state = ResourceState.PENDING_UPDATE
+
+ elif state == ResourceState.DELETED:
+ raise NotImplementedError
+ # Nothing to do unless explicitely requested
+ pending_state = None
+
+ elif state in [
+ ResourceState.PENDING_DEPS,
+ ResourceState.PENDING_INIT,
+ ResourceState.PENDING_CREATE,
+ ResourceState.PENDING_DELETE,
+ ResourceState.CLEAN
+ ]:
+ # Nothing to do
+ pending_state = None
+ else:
+ raise RuntimeError
+
+ # Implement state changes
+ #
+ # If a task is already pending, we simply wait for it to complete
+ if pending_state is None:
+ # Wait for an external change
+ self.log(resource, 'Nothing to do. Waiting for event...')
+ await resource._state.change_event.wait()
+ self.log(resource, 'Wake up from event')
+ resource._state.change_event.clear()
+ continue
+
+ if pending_state == ResourceState.PENDING_DEPS:
+ # XXX Maybe for any action, we need to wait for dependencies to
+ # be up to date
+ task = async_task(functools.partial(self._resource_wait_dependencies, resource))()
+
+ elif pending_state == ResourceState.PENDING_INIT:
+ task = self._task_resource_action(resource, '__initialize__')
+
+ elif pending_state == ResourceState.PENDING_GET:
+ task = self._task_resource_action(resource, '__get__')
+ if isinstance(task, BashTask):
+ task.set_default_parse_for_get()
+
+ elif pending_state == ResourceState.PENDING_KEYS:
+ task = async_task(functools.partial(self._task_resource_keys, resource))()
+
+ elif pending_state == ResourceState.PENDING_CREATE:
+ task = self._task_resource_action(resource, '__create__')
+
+ elif pending_state == ResourceState.PENDING_UPDATE:
+ # Instead of tasks, we wait for many smaller autoamtons to
+ # terminate
+ await resource._state.write_lock.acquire()
+ task = async_task(functools.partial(self._task_resource_update, resource))()
+
+ elif pending_state == ResourceState.PENDING_DELETE:
+ task = self._task_resource_action(resource, '__delete__')
+
+ else:
+ raise RuntimeError
+
+ if task is not None and not isinstance(task, EmptyTask):
+ state_change = functools.partial(self._trigger_state_change, resource)
+ task.add_done_callback(state_change)
+ self.schedule(task)
+
+ self.log(resource, 'Trigger {} -> {}. Waiting task completion'.format(
+ state, pending_state))
+ await resource._state.change_event.wait()
+ resource._state.change_event.clear()
+ self.log(resource, 'Completed {} -> {}. Success = {}'.format(
+ state, pending_state, resource._state.change_success))
+
+ # If no task, can assume there is an instant switch to the next step...
+
+ # Update state based on task results
+ if pending_state == ResourceState.PENDING_DEPS:
+ # XXX NO CHANGE SUCCESS TEST ??
+ new_state = ResourceState.DEPS_OK
+
+ elif pending_state == ResourceState.PENDING_INIT:
+ if resource._state.change_success == True:
+ attrs = resource._state.change_value
+ self.log(resource, 'INIT done.')
+ new_state = ResourceState.INITIALIZED
+ else:
+ e = resource._state.change_value
+ import traceback; traceback.print_tb(e.__traceback__)
+ raise NotImplementedError
+
+ elif pending_state == ResourceState.PENDING_GET:
+ if resource._state.change_success == True:
+ attrs = resource._state.change_value
+ self.log(resource, S_INIT_DONE.format(attrs))
+ self._process_attr_dict(resource, None, attrs)
+ new_state = ResourceState.CREATED
+ else:
+ e = resource._state.change_value
+ if ENABLE_LXD_WORKAROUND and \
+ resource.get_type() != 'lxccontainer' and \
+ isinstance(e, LxdNotFound):
+ # "not found" is the normal exception when the container
+ # does not exists. anyways the bug should only occur
+ # with container.execute(), not container.get()
+ log.error('LXD Fix (not found). Reset resource')
+ new_state = ResourceState.UNINITIALIZED
+ elif ENABLE_LXD_WORKAROUND and isinstance(e, LXDAPIException):
+ # "not found" is the normal exception when the container
+ # does not exists. anyways the bug should only occur
+ # with container.execute(), not container.get()
+ log.error('LXD Fix (API error). Reset resource')
+ new_state = ResourceState.UNINITIALIZED
+ elif isinstance(e, ResourceNotFound):
+ # The resource does not exist
+ self.log(resource, S_GET_DONE.format(
+ resource._state.change_value))
+ new_state = ResourceState.GET_DONE
+ resource._state.change_value = None
+ else:
+ e = resource._state.change_value
+ import traceback; traceback.print_tb(e.__traceback__)
+ raise NotImplementedError
+ resource._state.change_success = True
+
+ elif pending_state == ResourceState.PENDING_KEYS:
+ if resource._state.change_success == True:
+ new_state = ResourceState.KEYS_OK
+ self.log(resource, S_KEYS_OK)
+ else:
+ e = resource._state.change_value
+ self.log(resource, 'KEYS failed: {}'.format(e))
+
+ if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound):
+ log.error('LXD Fix (not found). Reset resource')
+ new_state = ResourceState.CREATED
+ resource._state.change_success = True
+ else:
+ e = resource._state.change_value
+ import traceback; traceback.print_tb(e.__traceback__)
+ raise NotImplementedError
+
+ elif pending_state == ResourceState.PENDING_CREATE:
+ if resource._state.change_success == True:
+ attrs = resource._state.change_value
+ self.log(resource, S_CREATE_OK.format(attrs))
+ self._process_attr_dict(resource, None, attrs)
+ new_state = ResourceState.CREATED
+ else:
+ e = resource._state.change_value
+
+ if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound):
+ log.error('LXD Fix (not found). Reset resource')
+ new_state = ResourceState.UNINITIALIZED
+ resource._state.change_success = True
+ elif ENABLE_LXD_WORKAROUND and \
+ isinstance(e, LXDAPIException):
+ log.error('LXD Fix (API error). Reset resource')
+ new_state = ResourceState.UNINITIALIZED
+ resource._state.change_success = True
+ elif 'File exists' in str(e):
+ new_state = ResourceState.CREATED
+ resource._state.change_success = True
+ elif 'dpkg --configure -a' in str(e):
+ resource._dpkg_configure_a = True
+ new_state = ResourceState.UNINITIALIZED
+ resource._state.change_success = True
+ else:
+ self.log(resource, 'CREATE failed: {}'.format(e))
+ e = resource._state.change_value
+ import traceback; traceback.print_tb(e.__traceback__)
+ raise NotImplementedError
+
+ elif pending_state == ResourceState.PENDING_UPDATE:
+ if resource._state.change_success == True:
+ self.log(resource, 'Update finished, resource is CLEAN.')
+ new_state = ResourceState.CLEAN
+ resource._state.write_lock.release()
+ else:
+ e = resource._state.change_value
+ self.log(resource, 'UPDATE failed: {}'.format(e))
+
+ if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound):
+ log.error('LXD Fix (not found). Reset resource')
+ new_state = ResourceState.CREATED
+ resource._state.change_success = True
+ resource._state.write_lock.release()
+ else:
+ e = resource._state.change_value
+ resource._state.write_lock.release()
+ import traceback; traceback.print_tb(e.__traceback__)
+ raise NotImplementedError
+
+ elif pending_state == ResourceState.PENDING_DELETE:
+ raise NotImplementedError
+ new_state = None
+ else:
+ raise RuntimeError
+
+ await self._set_resource_state(resource, new_state)
+
diff --git a/vicn/core/sa_collections.py b/vicn/core/sa_collections.py
new file mode 100644
index 00000000..e627caa5
--- /dev/null
+++ b/vicn/core/sa_collections.py
@@ -0,0 +1,249 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# This module is derived from code from SQLAlchemy
+#
+# orm/collections.py
+# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+#
+
+import logging
+
+from vicn.core.sa_compat import py2k
+from vicn.core.exception import VICNListException
+
+log = logging.getLogger(__name__)
+
+def _list_decorators():
+ """Tailored instrumentation wrappers for any list-like class."""
+
+ def _tidy(fn):
+ fn._sa_instrumented = True
+ fn.__doc__ = getattr(list, fn.__name__).__doc__
+
+ def append(fn):
+ def append(self, item):
+ try:
+ item = self._attribute.do_list_add(self._instance, item)
+ fn(self, item)
+ except VICNListException as e:
+ pass
+ _tidy(append)
+ return append
+
+ def remove(fn):
+ def remove(self, value):
+ # testlib.pragma exempt:__eq__
+ try:
+ self._attribute.do_list_remove(self._instance, value)
+ fn(self, value)
+ except : pass
+ _tidy(remove)
+ return remove
+
+ def insert(fn):
+ def insert(self, index, value):
+ try:
+ value = self._attribute.do_list_add(self._instance, item)
+ fn(self, index, value)
+ except : pass
+ _tidy(insert)
+ return insert
+
+ def __getitem__(fn):
+ def __getitem__(self, index):
+ item = fn(self, index)
+ return self._attribute.handle_getitem(self._instance, item)
+ _tidy(__getitem__)
+ return __getitem__
+
+ def __setitem__(fn):
+ def __setitem__(self, index, value):
+ if not isinstance(index, slice):
+ existing = self[index]
+ if existing is not None:
+ try:
+ self._attribute.do_list_remove(self._instance, existing)
+ except: pass
+ try:
+ value = self._attribute.do_list_add(self._instance, value)
+ fn(self, index, value)
+ except: pass
+ else:
+ # slice assignment requires __delitem__, insert, __len__
+ step = index.step or 1
+ start = index.start or 0
+ if start < 0:
+ start += len(self)
+ if index.stop is not None:
+ stop = index.stop
+ else:
+ stop = len(self)
+ if stop < 0:
+ stop += len(self)
+
+ if step == 1:
+ for i in range(start, stop, step):
+ if len(self) > start:
+ del self[start]
+
+ for i, item in enumerate(value):
+ self.insert(i + start, item)
+ else:
+ rng = list(range(start, stop, step))
+ if len(value) != len(rng):
+ raise ValueError(
+ "attempt to assign sequence of size %s to "
+ "extended slice of size %s" % (len(value),
+ len(rng)))
+ for i, item in zip(rng, value):
+ self.__setitem__(i, item)
+ _tidy(__setitem__)
+ return __setitem__
+
+ def __delitem__(fn):
+ def __delitem__(self, index):
+ if not isinstance(index, slice):
+ item = self[index]
+ try:
+ self._attribute.do_list_remove(self._instance, item)
+ fn(self, index)
+ except : pass
+ else:
+ # slice deletion requires __getslice__ and a slice-groking
+ # __getitem__ for stepped deletion
+ # note: not breaking this into atomic dels
+ has_except = False
+ for item in self[index]:
+ try:
+ self._attribute.do_list_remove(self._instance, item)
+ except : has_except = True
+ if not has_except:
+ fn(self, index)
+ _tidy(__delitem__)
+ return __delitem__
+
+ if py2k:
+ def __setslice__(fn):
+ def __setslice__(self, start, end, values):
+ has_except = False
+ for value in self[start:end]:
+ try:
+ self._attribute.do_list_remove(self._instance, value)
+ except : has_except = True
+ #values = [self._attribute.do_list_add(self._instance, value) for value in values]
+ _values = list()
+ for value in values:
+ try:
+ _values.append(self._attribute.do_list_add(self._instance, value))
+ except: has_except = True
+ if not has_except:
+ fn(self, start, end, _values)
+ _tidy(__setslice__)
+ return __setslice__
+
+ def __delslice__(fn):
+ def __delslice__(self, start, end):
+ has_except = False
+ for value in self[start:end]:
+ try:
+ self._attribute.do_list_remove(self._instance, value)
+ except : has_except = True
+ if not has_except:
+ fn(self, start, end)
+ _tidy(__delslice__)
+ return __delslice__
+
+ def extend(fn):
+ def extend(self, iterable):
+ for value in iterable:
+ self.append(value)
+ _tidy(extend)
+ return extend
+
+ def __iadd__(fn):
+ def __iadd__(self, iterable):
+ # list.__iadd__ takes any iterable and seems to let TypeError
+ # raise as-is instead of returning NotImplemented
+ for value in iterable:
+ self.append(value)
+ return self
+ _tidy(__iadd__)
+ return __iadd__
+
+ def pop(fn):
+ def pop(self, index=-1):
+ try:
+ self._attribute.do_list_remove(self._instance, item)
+ item = fn(self, index)
+ return item
+ except : return None
+ _tidy(pop)
+ return pop
+
+ def __iter__(fn):
+ def __iter__(self):
+ for item in fn(self):
+ yield self._attribute.handle_getitem(self._instance, item)
+ _tidy(__iter__)
+ return __iter__
+
+ def __repr__(fn):
+ def __repr__(self):
+ return '<Collection {} {}>'.format(id(self), list.__repr__(self))
+ _tidy(__repr__)
+ return __repr__
+
+ __str__ = __repr__
+ #def __str__(fn):
+ # def __str__(self):
+ # return str(list(self))
+ # _tidy(__str__)
+ # return __str__
+
+ if not py2k:
+ def clear(fn):
+ def clear(self, index=-1):
+ has_except = False
+ for item in self:
+ try:
+ self._attribute.do_list_remove(self._instance, item)
+ except : has_except = True
+ if not has_except:
+ fn(self)
+ _tidy(clear)
+ return clear
+
+ # __imul__ : not wrapping this. all members of the collection are already
+ # present, so no need to fire appends... wrapping it with an explicit
+ # decorator is still possible, so events on *= can be had if they're
+ # desired. hard to imagine a use case for __imul__, though.
+
+ l = locals().copy()
+ l.pop('_tidy')
+ return l
+
+def _instrument_list(cls):
+ # inspired by sqlalchemy
+ for method, decorator in _list_decorators().items():
+ fn = getattr(cls, method, None)
+ if fn:
+ #if (fn and method not in methods and
+ # not hasattr(fn, '_sa_instrumented')):
+ setattr(cls, method, decorator(fn))
+
+class InstrumentedList(list):
+
+ def __contains__(self, key):
+ from vicn.core.resource import Resource
+ if isinstance(key, Resource):
+ key = key.get_uuid()
+ return list.__contains__(self, key)
+
+ def __lshift__(self, item):
+ self.append(item)
+
+_instrument_list(InstrumentedList)
diff --git a/vicn/core/sa_compat.py b/vicn/core/sa_compat.py
new file mode 100644
index 00000000..34211455
--- /dev/null
+++ b/vicn/core/sa_compat.py
@@ -0,0 +1,270 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# This module originates from SQLAlchemy
+#
+# util/compat.py
+# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+#
+
+"""Handle Python version/platform incompatibilities."""
+
+import sys
+
+try:
+ import threading
+except ImportError:
+ import dummy_threading as threading
+
+py36 = sys.version_info >= (3, 6)
+py33 = sys.version_info >= (3, 3)
+py32 = sys.version_info >= (3, 2)
+py3k = sys.version_info >= (3, 0)
+py2k = sys.version_info < (3, 0)
+py265 = sys.version_info >= (2, 6, 5)
+jython = sys.platform.startswith('java')
+pypy = hasattr(sys, 'pypy_version_info')
+win32 = sys.platform.startswith('win')
+cpython = not pypy and not jython # TODO: something better for this ?
+
+import collections
+next = next
+
+if py3k:
+ import pickle
+else:
+ try:
+ import cPickle as pickle
+ except ImportError:
+ import pickle
+
+# work around http://bugs.python.org/issue2646
+if py265:
+ safe_kwarg = lambda arg: arg
+else:
+ safe_kwarg = str
+
+ArgSpec = collections.namedtuple("ArgSpec",
+ ["args", "varargs", "keywords", "defaults"])
+
+if py3k:
+ import builtins
+
+ from inspect import getfullargspec as inspect_getfullargspec
+ from urllib.parse import (quote_plus, unquote_plus,
+ parse_qsl, quote, unquote)
+ import configparser
+ from io import StringIO
+
+ from io import BytesIO as byte_buffer
+
+ def inspect_getargspec(func):
+ return ArgSpec(
+ *inspect_getfullargspec(func)[0:4]
+ )
+
+ string_types = str,
+ binary_types = bytes,
+ binary_type = bytes
+ text_type = str
+ int_types = int,
+ iterbytes = iter
+
+ def u(s):
+ return s
+
+ def ue(s):
+ return s
+
+ def b(s):
+ return s.encode("latin-1")
+
+ if py32:
+ callable = callable
+ else:
+ def callable(fn):
+ return hasattr(fn, '__call__')
+
+ def cmp(a, b):
+ return (a > b) - (a < b)
+
+ from functools import reduce
+
+ print_ = getattr(builtins, "print")
+
+ import_ = getattr(builtins, '__import__')
+
+ import itertools
+ itertools_filterfalse = itertools.filterfalse
+ itertools_filter = filter
+ itertools_imap = map
+ from itertools import zip_longest
+
+ import base64
+
+ def b64encode(x):
+ return base64.b64encode(x).decode('ascii')
+
+ def b64decode(x):
+ return base64.b64decode(x.encode('ascii'))
+
+else:
+ from inspect import getargspec as inspect_getfullargspec
+ inspect_getargspec = inspect_getfullargspec
+ from urllib import quote_plus, unquote_plus, quote, unquote
+ from urlparse import parse_qsl
+ import ConfigParser as configparser
+ from StringIO import StringIO
+ from cStringIO import StringIO as byte_buffer
+
+ string_types = basestring,
+ binary_types = bytes,
+ binary_type = str
+ text_type = unicode
+ int_types = int, long
+
+ def iterbytes(buf):
+ return (ord(byte) for byte in buf)
+
+ def u(s):
+ # this differs from what six does, which doesn't support non-ASCII
+ # strings - we only use u() with
+ # literal source strings, and all our source files with non-ascii
+ # in them (all are tests) are utf-8 encoded.
+ return unicode(s, "utf-8")
+
+ def ue(s):
+ return unicode(s, "unicode_escape")
+
+ def b(s):
+ return s
+
+ def import_(*args):
+ if len(args) == 4:
+ args = args[0:3] + ([str(arg) for arg in args[3]],)
+ return __import__(*args)
+
+ callable = callable
+ cmp = cmp
+ reduce = reduce
+
+ import base64
+ b64encode = base64.b64encode
+ b64decode = base64.b64decode
+
+ def print_(*args, **kwargs):
+ fp = kwargs.pop("file", sys.stdout)
+ if fp is None:
+ return
+ for arg in enumerate(args):
+ if not isinstance(arg, basestring):
+ arg = str(arg)
+ fp.write(arg)
+
+ import itertools
+ itertools_filterfalse = itertools.ifilterfalse
+ itertools_filter = itertools.ifilter
+ itertools_imap = itertools.imap
+ from itertools import izip_longest as zip_longest
+
+
+import time
+if win32 or jython:
+ time_func = time.clock
+else:
+ time_func = time.time
+
+from collections import namedtuple
+from operator import attrgetter as dottedgetter
+
+
+if py3k:
+ def reraise(tp, value, tb=None, cause=None):
+ if cause is not None:
+ assert cause is not value, "Same cause emitted"
+ value.__cause__ = cause
+ if value.__traceback__ is not tb:
+ raise value.with_traceback(tb)
+ raise value
+
+else:
+ # not as nice as that of Py3K, but at least preserves
+ # the code line where the issue occurred
+ exec("def reraise(tp, value, tb=None, cause=None):\n"
+ " if cause is not None:\n"
+ " assert cause is not value, 'Same cause emitted'\n"
+ " raise tp, value, tb\n")
+
+
+def raise_from_cause(exception, exc_info=None):
+ if exc_info is None:
+ exc_info = sys.exc_info()
+ exc_type, exc_value, exc_tb = exc_info
+ cause = exc_value if exc_value is not exception else None
+ reraise(type(exception), exception, tb=exc_tb, cause=cause)
+
+if py3k:
+ exec_ = getattr(builtins, 'exec')
+else:
+ def exec_(func_text, globals_, lcl=None):
+ if lcl is None:
+ exec('exec func_text in globals_')
+ else:
+ exec('exec func_text in globals_, lcl')
+
+
+def with_metaclass(meta, *bases):
+ """Create a base class with a metaclass.
+
+ Drops the middle class upon creation.
+
+ Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/
+
+ """
+
+ class metaclass(meta):
+ __call__ = type.__call__
+ __init__ = type.__init__
+
+ def __new__(cls, name, this_bases, d):
+ if this_bases is None:
+ return type.__new__(cls, name, (), d)
+ return meta(name, bases, d)
+ return metaclass('temporary_class', None, {})
+
+
+from contextlib import contextmanager
+
+try:
+ from contextlib import nested
+except ImportError:
+ # removed in py3k, credit to mitsuhiko for
+ # workaround
+
+ @contextmanager
+ def nested(*managers):
+ exits = []
+ vars = []
+ exc = (None, None, None)
+ try:
+ for mgr in managers:
+ exit = mgr.__exit__
+ enter = mgr.__enter__
+ vars.append(enter())
+ exits.append(exit)
+ yield vars
+ except:
+ exc = sys.exc_info()
+ finally:
+ while exits:
+ exit = exits.pop()
+ try:
+ if exit(*exc):
+ exc = (None, None, None)
+ except:
+ exc = sys.exc_info()
+ if exc != (None, None, None):
+ reraise(exc[0], exc[1], exc[2])
diff --git a/vicn/core/scheduling_algebra.py b/vicn/core/scheduling_algebra.py
new file mode 100644
index 00000000..207856c0
--- /dev/null
+++ b/vicn/core/scheduling_algebra.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+def SchedulingAlgebra(cls, concurrent_mixin=object, composition_mixin=object,
+ sequential_mixin=object): # allow_none = True
+
+ class BaseElement(cls):
+ def __default__(cls, *elements):
+ elts = [e for e in elements
+ if e is not None and not isinstance(e, Empty)]
+ if len(elts) == 0:
+ # The first is always Empty
+ assert len(elements) != 0
+ return elements[0]
+ elif len(elts) == 1:
+ return elts[0]
+ return cls(*elts)
+
+ def __concurrent__(*elements):
+ return BaseElement.__default__(Concurrent, *elements)
+
+ def __composition__(*elements):
+ return BaseElement.__default__(Composition, *elements)
+
+ def __sequential__(*elements):
+ return BaseElement.__default__(Sequential, *elements)
+
+ # Operator: |
+ __or__ = __concurrent__
+
+ # Operator: >
+ __gt__ = __sequential__
+
+ # Operator: @
+ __matmul__ = __composition__
+
+ class Element(BaseElement):
+ def __iter__(self):
+ yield self
+
+ class Operator(BaseElement):
+ def __init__(self, *elements):
+ super().__init__()
+ self._elements = list(elements)
+
+ def __iter__(self):
+ yield self
+ for element in self._elements:
+ for x in element:
+ yield x
+
+ class Concurrent(Operator, concurrent_mixin):
+ # Algebraic rule : ((A // B) // C) ~ (A // B // C)
+ def __concurrent__(self, other):
+ self._elements.append(other)
+ return self
+
+ def __repr__(self):
+ return '<Concurrent {}>'.format(self._elements)
+
+ class Composition(Operator, composition_mixin):
+ def __repr__(self):
+ return '<Composition {}>'.format(self._elements)
+
+ class Sequential(Operator, sequential_mixin):
+ def __repr__(self):
+ return '<Sequential {}>'.format(self._elements)
+
+ class Empty(Element):
+ def __concurrent__(self, other):
+ return other
+
+ def __composition__(self, other):
+ return other
+
+ def __sequential__(self, other):
+ return other
+
+ def __repr__(self):
+ return '<Empty>'
+
+ return Element, Empty
diff --git a/vicn/core/state.py b/vicn/core/state.py
new file mode 100644
index 00000000..d5069b24
--- /dev/null
+++ b/vicn/core/state.py
@@ -0,0 +1,177 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import asyncio
+import random
+import string
+
+class NEVER_SET:
+ pass
+
+# Separator for components of the UUID
+UUID_SEP = '-'
+
+# Length of the random component of the UUID
+UUID_LEN = 5
+
+class ResourceState:
+ UNINITIALIZED = 'UNINITIALIZED'
+ PENDING_DEPS = 'PENDING_DEPS'
+ DEPS_OK = 'DEPS_OK'
+ PENDING_INIT = 'PENDING_INIT'
+ INITIALIZED = 'INITIALIZED'
+ PENDING_GET = 'PENDING_GET'
+ GET_DONE = 'GET_DONE'
+ PENDING_KEYS = 'PENDING_KEYS'
+ KEYS_OK = 'KEYS_OK'
+ PENDING_CREATE = 'PENDING_CREATE'
+ CREATED = 'CREATED'
+ DIRTY = 'DIRTY'
+ CLEAN = 'CLEAN'
+ PENDING_UPDATE = 'PENDING_UPDATE'
+ PENDING_DELETE = 'PENDING_DELETE'
+ DELETED = 'DELETED'
+
+class AttributeState:
+ UNINITIALIZED = 'UNINITIALIZED'
+ INITIALIZED = 'INITIALIZED'
+ DIRTY = 'DIRTY'
+ PENDING_INIT = 'PENDING_INIT'
+ PENDING_UPDATE = 'PENDING_UPDATE'
+ CLEAN = 'CLEAN'
+
+class Operations:
+ SET = 'set'
+ LIST_ADD = 'add'
+ LIST_REMOVE = 'remove'
+ LIST_CLEAR = 'clear'
+
+class UUID:
+ def __init__(self, name, cls):
+ self._uuid = self._make_uuid(name, cls)
+
+ def _make_uuid(self, name, cls):
+ """Generate a unique resource identifier
+
+ The UUID consists in the type of the resource, to which is added a
+ random identifier of length UUID_LEN. Components of the UUID are
+ separated by UUID_SEP.
+ """
+ uuid = ''.join(random.choice(string.ascii_uppercase + string.digits)
+ for _ in range(UUID_LEN))
+ if name:
+ uuid = name # + UUID_SEP + uuid
+ return UUID_SEP.join([cls.__name__, uuid])
+
+ def __repr__(self):
+ return '<UUID {}>'.format(self._uuid)
+
+ def __lt__(self, other):
+ return self._uuid < other._uuid
+
+ __str__ = __repr__
+
+class PendingValue:
+ def __init__(self, value = None):
+ self.clear(value)
+
+ def clear(self, value=NEVER_SET):
+ self.value = NEVER_SET
+ self.operations = list()
+
+ def trigger(self, action, value, cur_value = None):
+
+ if self.value is NEVER_SET:
+ if cur_value is not None:
+ self.value = cur_value
+
+ if action == Operations.SET:
+ self.value = value
+ self.operations = [(Operations.SET, value)]
+ elif action == Operations.LIST_CLEAR:
+ self.value = list()
+ self.operations = [(Operations.LIST_CLEAR, None)]
+ else:
+ if action == Operations.LIST_ADD:
+ self.value.append(value)
+ elif action == Operations.LIST_REMOVE:
+ self.value.remove(value)
+ else:
+ raise RuntimeError
+ self.operations.append((action, value))
+
+class InstanceState:
+ def __init__(self, manager, instance, name = None):
+
+ # Unique identifier for the instance. This is useful for relation
+ # between resources
+ self.uuid = UUID(name, instance.__class__)
+ self.instance = instance
+
+ # Instance manager
+ self.manager = manager
+
+ # Events
+ self.events = dict()
+
+ # Stores the requested value : attribute_name -> requested operations =
+ # LIST set add remove clear
+ self.dirty = dict()
+
+
+ # Initialize resource state
+ self.lock = asyncio.Lock()
+ self.write_lock = asyncio.Lock()
+ self.state = ResourceState.UNINITIALIZED
+ self.clean = asyncio.Event()
+ self.clean.clear()
+ self.init = asyncio.Event()
+ self.init.clear()
+ self.change_event = asyncio.Event()
+ self.change_event.clear()
+ self.change_success = None
+ self.change_value = None
+ self.log = list()
+
+ self.attr_lock = dict()
+ self.attr_init = dict()
+ self.attr_clean = dict()
+ self.attr_state = dict()
+ self.attr_change_event = dict()
+ self.attr_change_success = dict()
+ self.attr_change_value= dict()
+ self.attr_log = dict()
+ # Initialize attribute state
+ for attribute in instance.iter_attributes():
+ self.attr_lock[attribute.name] = asyncio.Lock()
+ self.attr_init[attribute.name] = asyncio.Event()
+ self.attr_clean[attribute.name] = asyncio.Event()
+ self.attr_state[attribute.name] = AttributeState.UNINITIALIZED
+ self.attr_change_event[attribute.name] = asyncio.Event()
+ self.attr_change_event[attribute.name].clear()
+ self.attr_change_success[attribute.name] = None
+ self.attr_change_value[attribute.name] = None
+ self.dirty[attribute.name] = PendingValue(NEVER_SET)
+ self.attr_log[attribute.name] = list()
+
+ def set_dirty(self, attr_name):
+ self.attr_dirty.add(attr_name)
+ self.manager.set_dirty(self.uuid)
+
+ def trigger(self, attribute_name, action, value):
+ self.dirty[attribute_name].trigger(action, value)
diff --git a/vicn/core/task.py b/vicn/core/task.py
new file mode 100644
index 00000000..2e9bc275
--- /dev/null
+++ b/vicn/core/task.py
@@ -0,0 +1,352 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2017 Cisco and/or its affiliates.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import asyncio
+import concurrent.futures
+import functools
+import logging
+import shlex
+import subprocess
+
+from vicn.core.scheduling_algebra import SchedulingAlgebra
+from vicn.core.commands import ReturnValue
+from vicn.core.exception import ResourceNotFound
+from vicn.core.commands import Command, SequentialCommands
+from netmodel.util.process import execute_local
+
+log = logging.getLogger(__name__)
+
+EXECUTOR=concurrent.futures.ThreadPoolExecutor
+#EXECUTOR=concurrent.futures.ProcessPoolExecutor
+
+MAX_WORKERS=50 # None
+
+class BaseTask:
+ """Base class for all tasks
+ """
+
+ def __init__(self):
+ self._future = asyncio.Future()
+
+ def terminate(self):
+ pass
+
+ def start(self):
+ pass
+
+ def stop(self):
+ pass
+
+ def get_future(self):
+ return self._future
+
+ def add_done_callback(self, cb):
+ self._future.add_done_callback(cb)
+
+ def __repr__(self):
+ return '<BaseTask>'
+
+class ConcurrentMixin:
+ async def execute(self):
+ try:
+ for t in self._elements:
+ await t.execute()
+ rets = await asyncio.gather(*[t.get_future()
+ for t in self._elements])
+
+ # The result value is the "union" of all result values
+ # In case of tasks setting the same attributes, they are merged
+ # into a list
+
+ dic = dict()
+ for ret in rets:
+ # Ideally we should get all attribute names, and properly
+ # insert Nones. So far we assume all dicts are present and
+ # complete.
+ if not isinstance(ret, dict):
+ continue
+ for k, v in ret.items():
+ if k in dic:
+ if not isinstance(dic[k], list):
+ dic[k] = [dic[k]]
+ dic[k].append(v)
+ else:
+ dic[k] = [v]
+ self.get_future().set_result(dic)
+ except Exception as e:
+ self.get_future().set_exception(e)
+
+class SequentialMixin:
+ async def execute(self):
+ try:
+ for t in self._elements:
+ await t.execute()
+ await t.get_future()
+ self.get_future().set_result(None)
+ except Exception as e:
+ self.get_future().set_exception(e)
+
+class CompositionMixin:
+ async def execute(self):
+ try:
+ ret = None
+ for t in self._elements:
+ ret = (ret,) if ret is not None else tuple()
+ await t.execute(*ret)
+ ret = await t.get_future()
+ self.get_future().set_result(ret)
+ except Exception as e:
+ print('we need to cancel tasks not executed...')
+ self.get_future().set_exception(e)
+
+Task, EmptyTask = SchedulingAlgebra(BaseTask, ConcurrentMixin,
+ CompositionMixin, SequentialMixin)
+
+def task(fn):
+ def decorator(*args, **kwargs):
+ return PythonTask(fn, *args, **kwargs)
+ return decorator
+
+def async_task(fn, *t_args, **t_kwargs):
+ def decorator(*args, **kwargs):
+ all_args = tuple() + t_args + args
+ all_kwargs = dict()
+ all_kwargs.update(t_kwargs)
+ all_kwargs.update(kwargs)
+ return PythonAsyncTask(fn, *args, **kwargs)
+ return decorator
+
+def inline_task(fn):
+ def decorator(*args, **kwargs):
+ return PythonInlineTask(fn, *args, **kwargs)
+ return decorator
+
+async def wait_task(task):
+ return await task.get_future()
+
+async def run_task(task, manager):
+ manager.schedule(task)
+ ret = await wait_task(task)
+ return ret
+
+async def wait_concurrent_tasks(tasks):
+ await wait_task(Task.__concurrent__(*tasks))
+
+wait_task_task = async_task(wait_task)
+
+def get_attribute_task(resource, attribute_name):
+ @async_task
+ async def func():
+ return await resource.async_get(attribute_name)
+ return func()
+
+def set_attributes_task(resource, attribute_dict):
+ # The difficulty is in setting the pending value without triggering the
+ # manager, and executing the task by ourselves !
+ raise NotImplementedError
+
+def get_attributes_task(resource, attribute_names):
+ assert len(attribute_names) == 1
+ attribute_name = attribute_names[0]
+
+ @async_task
+ async def func():
+ await resource._state.manager.wait_attr_init(resource, attribute_name)
+ ret = await resource.async_get(attribute_name)
+ return {attribute_name: ret}
+ return func()
+
+class PythonTask(Task):
+ def __init__(self, func, *args, **kwargs):
+ super().__init__()
+ self._func = func
+ self._args = args
+ self._kwargs = kwargs
+
+ def _done_callback(self, fut):
+ try:
+ self._future.set_result(fut.result())
+ except Exception as e:
+ self._future.set_exception(e)
+
+ async def execute(self, *args, **kwargs):
+ all_args = self._args + args
+ all_kwargs = dict()
+ all_kwargs.update(self._kwargs)
+ all_kwargs.update(kwargs)
+
+ partial = functools.partial(self._func, *all_args, **all_kwargs)
+
+ loop = asyncio.get_event_loop()
+ fut = loop.run_in_executor(None, partial)
+ fut.add_done_callback(self._done_callback)
+
+ def __repr__(self):
+ return '<Task[py] {} / {} {}>'.format(self._func, self._args,
+ self._kwargs)
+
+class PythonAsyncTask(PythonTask):
+ async def execute(self, *args, **kwargs):
+ all_args = self._args + args
+ all_kwargs = dict()
+ all_kwargs.update(self._kwargs)
+ all_kwargs.update(kwargs)
+
+ partial = asyncio.coroutine(self._func)(*all_args, **all_kwargs)
+
+ fut = asyncio.ensure_future(partial)
+ fut.add_done_callback(self._done_callback)
+
+ def __repr__(self):
+ return '<Task[apy]>'
+
+class PythonInlineTask(PythonTask):
+ async def execute(self, *args, **kwargs):
+ all_args = self._args + args
+ all_kwargs = dict()
+ all_kwargs.update(self._kwargs)
+ all_kwargs.update(kwargs)
+
+ try:
+ ret = self._func(*all_args, **all_kwargs)
+ self._future.set_result(ret)
+ except Exception as e:
+ self._future.set_exception(e)
+ return self._future
+
+class BashTask(Task):
+ def __init__(self, node, cmd, parameters=None, parse=None, as_root=False,
+ output=False, pre=None, post=None, lock=None):
+ super().__init__()
+ self._node = node
+ self._cmd = cmd
+ self._params = parameters if parameters else dict()
+ self._parse = parse
+ self._pre = pre
+ self._post = post
+ self._lock = lock
+
+ self._output = output
+ self._as_root = as_root
+
+ def _default_parse_for_get(self, rv):
+ if not bool(rv):
+ raise ResourceNotFound
+
+ def set_default_parse_for_get(self):
+ if self._parse is None:
+ self._parse = self._default_parse_for_get
+
+ def _done_callback(self, fut):
+ """
+ Note: extends the functionality of the parent _done_callback
+ """
+ try:
+ rv = fut.result()
+ if self._parse is None and rv.return_value != 0:
+ raise Exception('Bash command failed', self.get_full_cmd(), rv)
+ if self._post:
+ self._post()
+ if self._parse:
+ rv = self._parse(rv)
+ self._future.set_result(rv)
+ except Exception as e:
+ self._future.set_exception(e)
+ if self._lock:
+ self._lock.release()
+
+ def get_full_cmd(self):
+ c = SequentialCommands()
+ desc = None
+ for line in self._cmd.splitlines():
+ line = line.strip()
+ if not line:
+ continue
+ if line.startswith('#'):
+ desc = line[1:].strip()
+ continue
+ c << Command(line, description = desc)
+ desc = None
+
+ c.parameters = self._params
+ return c.command.full_commandline
+
+ async def execute(self, *args, **kwargs):
+ """Execute the task, enforcing any eventual locking.
+
+ Returns:
+ asyncio.Future
+
+ Upon completion (eventually error), the Task's future is set.
+ """
+ if len(args) == 1:
+ dic, = args
+ if isinstance(dic, dict):
+ self._params.update(dic)
+ if self._pre:
+ self._pre()
+
+ func = self._node.execute if self._node else execute_local
+ # It is important that the command is contructed only just before it is
+ # executed, so that any object passed as parameters is deferenced right
+ # on time.
+ cmd = self.get_full_cmd()
+ partial = functools.partial(func, cmd, output = bool(self._parse))
+
+ node_str = self._node.name if self._node else '(LOCAL)'
+ cmd_str = cmd[:77] + '...' if len(cmd) > 80 else cmd
+ log.info('Execute: {} - {}'.format(node_str, cmd_str))
+
+ if self._lock:
+ # We need to do lock/unlock around the task execution
+ # Locking now will early block other tasks, but will at the same
+ # time delay them entering the executor queue; so this is
+ # equivalent
+ await self._lock.acquire()
+
+ loop = asyncio.get_event_loop()
+ fut = loop.run_in_executor(None, partial)
+ fut.add_done_callback(self._done_callback)
+
+ def execute_blocking(self):
+ rv = self._node.execute(self.get_full_cmd(), output=True)
+ if self._parse:
+ rv = self._parse(rv)
+ return rv
+
+ def __repr__(self):
+ return '<Task[bash] {} / {}>'.format(self._cmd, self._params)
+
+class TaskManager:
+ def __init__(self):
+ executor = EXECUTOR() if MAX_WORKERS is None \
+ else EXECUTOR(max_workers=MAX_WORKERS)
+ loop = asyncio.get_event_loop()
+ loop.set_default_executor(executor)
+
+ def schedule(self, task):
+ """All instances of BaseTask can be scheduled
+
+ Here we might decide to do more advanced scheduling, like merging bash
+ tasks, etc. thanks to the task algebra.
+ """
+ asyncio.ensure_future(task.execute())
+
+@task
+def ParseRegexTask(rv):
+ return [m.groupdict() for m in rx.finditer(rv.stdout)]