diff options
author | Jordan Augé <jordan.auge+fdio@email.com> | 2017-02-24 14:58:01 +0100 |
---|---|---|
committer | Jordan Augé <jordan.auge+fdio@cisco.com> | 2017-02-24 18:36:29 +0000 |
commit | 85a341d645b57b7cd88a26ed2ea0a314704240ea (patch) | |
tree | bdda2b35003aae20103a796f86daced160b8a730 /vicn/core | |
parent | 9b30fc10fb1cbebe651e5a107e8ca5b24de54675 (diff) |
Initial commit: vICN
Change-Id: I7ce66c4e84a6a1921c63442f858b49e083adc7a7
Signed-off-by: Jordan Augé <jordan.auge+fdio@cisco.com>
Diffstat (limited to 'vicn/core')
-rw-r--r-- | vicn/core/__init__.py | 0 | ||||
-rw-r--r-- | vicn/core/address_mgr.py | 181 | ||||
-rw-r--r-- | vicn/core/api.py | 118 | ||||
-rw-r--r-- | vicn/core/attribute.py | 270 | ||||
-rw-r--r-- | vicn/core/command_helpers.py | 48 | ||||
-rw-r--r-- | vicn/core/commands.py | 376 | ||||
-rw-r--r-- | vicn/core/event.py | 25 | ||||
-rw-r--r-- | vicn/core/exception.py | 39 | ||||
-rw-r--r-- | vicn/core/requirement.py | 229 | ||||
-rw-r--r-- | vicn/core/resource.py | 898 | ||||
-rw-r--r-- | vicn/core/resource_factory.py | 84 | ||||
-rw-r--r-- | vicn/core/resource_mgr.py | 1436 | ||||
-rw-r--r-- | vicn/core/sa_collections.py | 249 | ||||
-rw-r--r-- | vicn/core/sa_compat.py | 270 | ||||
-rw-r--r-- | vicn/core/scheduling_algebra.py | 97 | ||||
-rw-r--r-- | vicn/core/state.py | 177 | ||||
-rw-r--r-- | vicn/core/task.py | 352 |
17 files changed, 4849 insertions, 0 deletions
diff --git a/vicn/core/__init__.py b/vicn/core/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/vicn/core/__init__.py diff --git a/vicn/core/address_mgr.py b/vicn/core/address_mgr.py new file mode 100644 index 00000000..7df5e4ac --- /dev/null +++ b/vicn/core/address_mgr.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import ipaddress +import logging +import random +import struct +import socket + +from netmodel.util.deprecated import deprecated +from netmodel.util.singleton import Singleton +from netmodel.util.toposort import toposort, toposort_flatten + +log = logging.getLogger(__name__) + +#------------------------------------------------------------------------------ +# SharedResource +#------------------------------------------------------------------------------ + +class SharedResource: + """ + Base class for allocating shared resource + """ + def __init__(self): + self._counter = 0 + self._values = dict() + + def __next__(self): + ret = self._counter + self._counter += 1 + return ret + + def get(self, requestor, tag = None): + if requestor not in self._values: + self._values[requestor] = dict() + if tag not in self._values[requestor]: + self._values[requestor][tag] = next(self) + return self._values[requestor][tag] + +#------------------------------------------------------------------------------ + +class Vlan(SharedResource): + """ + SharedResource: Vlan + + Manages VLAN allocation + """ + + def get(self, requestor, tag = None): + if requestor not in self._values: + self._values[requestor] = dict() + if tag not in self._values[requestor]: + self._values[requestor][tag] = (next(self)+1) + return self._values[requestor][tag] + +#------------------------------------------------------------------------------ + +MAX_DEVICE_NAME_SIZE = 15 + +class DeviceName(SharedResource): + + def get(self, *args, prefix = None, **kwargs): + count = super().get(*args, **kwargs) + device_name = '{}{}'.format(prefix, count) + if len(device_name) > MAX_DEVICE_NAME_SIZE: + overflow = len(device_name) - MAX_DEVICE_NAME_SIZE + max_prefix_len = len(prefix) - overflow + device_name = '{}{}'.format(prefix[:max_prefix_len], count) + return device_name + +#------------------------------------------------------------------------------ + +class IpAddress(SharedResource): + pass + +class Ipv4Address(IpAddress): + pass + +class Ipv6Address(IpAddress): + pass + +#------------------------------------------------------------------------------ +# AddressManager +#------------------------------------------------------------------------------ + +class AddressManager(metaclass = Singleton): + """ + The purpose of this class is to generate sequential deterministic MAC/IP + addresses in order to assign them to the node in the network. + """ + + MAP_TYPE = { + 'vlan': Vlan, + 'device_name': DeviceName, + } + + def __init__(self): + self._ips = dict() + self._macs = dict() + + self._pools = dict() + + from vicn.core.resource_mgr import ResourceManager + + network = ResourceManager().get('network') + network = ipaddress.ip_network(network, strict=False) + self._next_ip = network[1] + + mac_address_base = ResourceManager().get('mac_address_base') + self._next_mac = int(mac_address_base, 0) + 1 + + def get(self, resource_type, requestor, *args, tag=None, scope=None, + **kwargs): + """ + Params: + type : the type of shared resource to be requested + requestor: name of the resource that requests the shared resource, in + order to reattribute the same if requested multiple times. + tag: use when a single resource request multiple times the same + resource. + scope: None = global scope by default. Ensure uniqueness of resource + at global scope + """ + if not scope in self._pools: + self._pools[scope] = dict() + if not resource_type in self._pools[scope]: + self._pools[scope][resource_type] = self.MAP_TYPE[resource_type]() + return self._pools[scope][resource_type].get(requestor, tag, + *args, **kwargs) + + #-------------------------------------------------------------------------- + # Attributes + #-------------------------------------------------------------------------- + + def get_mac(self, resource_name): + """ + Generate a new mac address to assign to the containers created. + + :return: The MAC address + """ + + if resource_name in self._macs: + return self._macs[resource_name] + + mac = ':'.join(map(''.join, + zip(*[iter(hex(self._next_mac)[2:].zfill(12))]*2))) + self._next_mac += 1 + + self._macs[resource_name] = mac + return mac + + def get_ip(self, resource): + """ + Generate a new ip address to assign to the containers created. + + :return: The IP address + """ + + if resource in self._ips: + return self._ips[resource] + + ip = str(self._next_ip) + self._next_ip += 1 + + self._ips[resource] = ip + return ip diff --git a/vicn/core/api.py b/vicn/core/api.py new file mode 100644 index 00000000..761177c9 --- /dev/null +++ b/vicn/core/api.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import json +import logging +import resource as ulimit +import sys + +from netmodel.model.query import Query +from netmodel.model.query import ACTION_SELECT, ACTION_INSERT +from netmodel.model.query import ACTION_UPDATE, ACTION_SUBSCRIBE +from netmodel.network.interface import InterfaceState +from netmodel.util.singleton import Singleton +from vicn.core.exception import NotConfigured +from vicn.core.resource_mgr import ResourceManager +from vicn.resource.node import Node + +DEFAULT_SETTINGS = { + 'network': '192.168.0.0/16', + 'mac_address_base': '0x00163e000000', + 'websocket_port': 9999 +} + +log = logging.getLogger(__name__) + +class Event_ts(asyncio.Event): + def set(self): + self._loop.call_soon_threadsafe(super().set) + +#------------------------------------------------------------------------------ + +class API(metaclass = Singleton): + + def terminate(self): + ResourceManager().terminate() + + def parse_topology_file(self, topology_fn): + log.debug("Parsing topology file %(topology_fn)s" % locals()) + try: + topology_fd = open(topology_fn, 'r') + except IOError: + self.error("Topology file '%(topology_fn)s not found" % locals()) + return None + + try: + topology = json.loads(topology_fd.read()) + + # SETTING + settings = DEFAULT_SETTINGS + settings.update(topology.get('settings')) + + # VICN process-related initializations + nofile = settings.get('ulimit-n', None) + if nofile is not None and nofile > 0: + if nofile < 1024: + log.error('Too few allowed open files for the process') + import os; os._exit(1) + + log.info('Setting open file descriptor limit to {}'.format( + nofile)) + ulimit.setrlimit( + ulimit.RLIMIT_NOFILE, + (nofile, nofile)) + + ResourceManager(base=topology_fn, settings=settings) + + # NODES + resources = topology.get('resources', list()) + for resource in resources: + try: + ResourceManager().create_from_dict(**resource) + except Exception as e: + log.warning("Could not create resource '%r': %r" % \ + (resource, e,)) + import traceback; traceback.print_exc() + continue + + except SyntaxError: + log.error("Error reading topology file '%s'" % (topology_fn,)) + sys.exit(1) + + log.debug("Done parsing topology file %(topology_fn)s" % locals()) + + def configure(self, name, setup=False): + log.info("Parsing configuration file", extra={'category': 'blue'}) + self.parse_topology_file(name) + self._configured = True + ResourceManager().setup(commit=setup) + + def setup(self): + if not self._configured: + raise NotConfigured + ResourceManager().setup() + + def teardown(self): + ResourceManager().teardown() + + def open_terminal(self, node_name): + node = ResourceManager().by_name(node_name) + assert isinstance(node, Node) + + node.open_terminal() diff --git a/vicn/core/attribute.py b/vicn/core/attribute.py new file mode 100644 index 00000000..f6ec7c70 --- /dev/null +++ b/vicn/core/attribute.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import abc +import copy +import logging +import operator +import types + +from netmodel.model.mapper import ObjectSpecification +from netmodel.model.type import Type, Self +from netmodel.util.meta import inheritors +from netmodel.util.misc import is_iterable +from vicn.core.exception import VICNListException +from vicn.core.requirement import Requirement, RequirementList +from vicn.core.sa_collections import InstrumentedList +from vicn.core.state import UUID, NEVER_SET, Operations + +log = logging.getLogger(__name__) + +#------------------------------------------------------------------------------ +# Attribute Multiplicity +#------------------------------------------------------------------------------ + +class Multiplicity: + OneToOne = '1_1' + OneToMany = '1_N' + ManyToOne = 'N_1' + ManyToMany = 'N_N' + + + @staticmethod + def reverse(value): + reverse_map = { + Multiplicity.OneToOne: Multiplicity.OneToOne, + Multiplicity.OneToMany: Multiplicity.ManyToOne, + Multiplicity.ManyToOne: Multiplicity.OneToMany, + Multiplicity.ManyToMany: Multiplicity.ManyToMany, + } + return reverse_map[value] + + +# Default attribute properties values (default to None) +DEFAULT = { + 'multiplicity' : Multiplicity.OneToOne, + 'mandatory' : False, +} + +#------------------------------------------------------------------------------ +# Attribute +#------------------------------------------------------------------------------ + +class Attribute(abc.ABC, ObjectSpecification): + properties = [ + 'name', + 'type', + 'key', + 'description', + 'default', + 'choices', + 'mandatory', + 'multiplicity', + 'ro', + 'auto', + 'func', + 'requirements', + 'reverse_name', + 'reverse_description', + 'reverse_auto' + ] + + def __init__(self, *args, **kwargs): + for key in Attribute.properties: + value = kwargs.pop(key, NEVER_SET) + setattr(self, key, value) + + if len(args) == 1: + self.type, = args + elif len(args) == 2: + self.name, self.type = args + + # self.type is optional since the type can be inherited. Although we + # will have to verify the attribute is complete at some point + if self.type: + if isinstance(self.type, str): + self.type = Type.from_string(self.type) + assert self.type is Self or Type.exists(self.type) + + # Post processing attribute properties + if self.requirements is not NEVER_SET: + self.requirements = RequirementList(self.requirements) + + self.is_aggregate = False + + self._reverse_attributes = list() + + #-------------------------------------------------------------------------- + # Display + #-------------------------------------------------------------------------- + + def __repr__(self): + return '<Attribute {}>'.format(self.name) + + __str__ = __repr__ + + # The following functions are required to allow comparing attributes, and + # using them as dict keys + + def __eq__(self, other): + return self.name == other.name + + def __hash__(self): + return hash(self.name) + + #-------------------------------------------------------------------------- + # Descriptor protocol + # + # see. https://docs.python.org/3/howto/descriptor.html + #-------------------------------------------------------------------------- + + def __get__(self, instance, owner=None): + if not instance: + return self + + return instance.get(self.name, blocking=False) + + def __set__(self, instance, value): + if not instance: + raise NotImplementedError('Setting default value not implemented') + + instance.set(self.name, value, blocking=False) + + def __delete__(self, instance): + raise NotImplementedError + + #-------------------------------------------------------------------------- + + def do_list_add(self, instance, value): + if instance.is_local_attribute(self.name): + from vicn.core.resource import Resource + if isinstance(value, Resource): + value = value.get_uuid() + return value + else: + try: + cur_value = vars(instance)[self.name] + if self.is_collection: + # copy the list + cur_value = list(cur_value) + except KeyError as e: + cur_value = None + if self.is_collection: + cur_value = list() + + instance._state.dirty[self.name].trigger(Operations.LIST_ADD, + value, cur_value) + + # prevent instrumented list to perform operation + raise VICNListException + + def do_list_remove(self, instance, value): + if instance.is_local_attribute(self.name): + from vicn.core.resource import Resource + if isinstance(value, Resource): + value = value.get_uuid() + return value + else: + cur_value = vars(instance)[self.name] + if self.is_collection: + # copy the list + cur_value = list(cur_value) + instance._state.dirty[self.name].trigger(Operations.LIST_REMOVE, + value, cur_value) + + # prevent instrumented list to perform operation + raise VICNListException + + def do_list_clear(self, instance): + if instance.is_local_attribute(self.name): + return + else: + cur_value = vars(instance)[self.name] + if self.is_collection: + # copy the list + cur_value = list(cur_value) + instance._state.dirty[self.name].trigger(Operations.LIST_CLEAR, + value, cur_value) + + # prevent instrumented list to perform operation + raise VICNListException + + def handle_getitem(self, instance, item): + if isinstance(item, UUID): + from vicn.core.resource_mgr import ResourceManager + return ResourceManager().by_uuid(item) + return item + + #-------------------------------------------------------------------------- + # Accessors + #-------------------------------------------------------------------------- + + def __getattribute__(self, name): + value = super().__getattribute__(name) + if value is NEVER_SET: + if name == 'default': + return list() if self.is_collection else None + return DEFAULT.get(name, None) + return value + + def has_reverse_attribute(self): + return self.reverse_name and self.multiplicity + + @property + def is_collection(self): + return self.multiplicity in (Multiplicity.OneToMany, + Multiplicity.ManyToMany) + + def is_set(self, instance): + return instance.is_set(self.name) + + #-------------------------------------------------------------------------- + # Operations + #-------------------------------------------------------------------------- + + def merge(self, parent): + for prop in Attribute.properties: + # NOTE: we cannot use getattr otherwise we get the default value, + # and we never override + value = vars(self).get(prop, NEVER_SET) + if value is not NEVER_SET and not is_iterable(value): + continue + + parent_value = vars(parent).get(prop, NEVER_SET) + if parent_value is NEVER_SET: + continue + + if parent_value: + if is_iterable(value): + value.extend(parent_value) + else: + setattr(self, prop, parent_value) + +#------------------------------------------------------------------------------ + +class Reference: + """ + Value reference. + + Attribute value refers to attribute value on a different resource. + Use resource = Self to point to another attribute of the same resource. + """ + + def __init__(self, resource, attribute=None): + self._resource = resource + self._attribute = attribute diff --git a/vicn/core/command_helpers.py b/vicn/core/command_helpers.py new file mode 100644 index 00000000..4732e7b5 --- /dev/null +++ b/vicn/core/command_helpers.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from vicn.core.commands import Command, Commands + +CMD_PRINT_TO_FILE = 'echo -n "{content}" > {filename}' + +class CommandHelper: + + @staticmethod + def if_cmd(cmd_condition, if_true = None, if_false = None): + cmd = cmd_condition + if if_true: + cmd = cmd & if_true + if if_false: + cmd = cmd | if_false + return cmd + + @staticmethod + def file_exists(filename, if_true = None, if_false = None): + cmd = Command('test -f {}'.format(filename)) + return CommandHelper.if_cmd(cmd, if_true, if_false) + + @staticmethod + def print_to_file(node, filename, content): + escaped_content = content.replace('{', '{{').replace('}', '}}') + return BashTask(self.node, CMD_PRINT_TO_FILE, + {'content': escaped_content, 'filename': filename}) + + @staticmethod + def print_to_file_no_escape(filename, content): + return BashTask(self.node, CMD_PRINT_TO_FILE, + {'content': content, 'filename': filename}) diff --git a/vicn/core/commands.py b/vicn/core/commands.py new file mode 100644 index 00000000..41c06bf5 --- /dev/null +++ b/vicn/core/commands.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import inspect +import logging +import shlex + +from vicn.core.exception import CommandException + +log = logging.getLogger(__name__) + +#------------------------------------------------------------------------------ +# Helper functions +#------------------------------------------------------------------------------ + +def bashize(command): + ret = "bash -c " + shlex.quote(command) + return ret + +def parenthesize(command): + return '({})'.format(command) + +def do_parenthesize(command): + if '&&' in command or '||' in command: + if command[0] == '(': + return command + else: + return parenthesize(command) + else: + return command + +#------------------------------------------------------------------------------ + +class ReturnValue: + def __init__(self, return_value = None, stdout = None, stderr = None): + self._return_value = return_value + + # We use accessors since it seems impossible to trigger properties from + # __init__ + self._set_stdout(stdout) + self._set_stderr(stderr) + + def __repr__(self): + return '<ReturnValue ({}) - OUT [{}] - ERR [{}]>'.format( + self._return_value, self._stdout, self._stderr) + + def __str__(self): + return self.__repr__() + + def _clean(self, value): + if value is None or isinstance(value, str): + return value + return value.decode('utf-8') + + def _set_stdout(self, value): + self._stdout = self._clean(value) + + def _set_stderr(self, value): + self._stderr = self._clean(value) + + @property + def stdout(self): + return self._stdout + + @stdout.setter + def stdout(self, value): + return self._set_stdout(value) + + @property + def stderr(self): + return self._stderr + + @stderr.setter + def stderr(self, value): + return self._set_stderr(value) + + @property + def return_value(self): + return self._return_value + + @return_value.setter + def return_value(self, value): + self._return_value = value + + def __bool__(self): + return self._return_value == 0 + +#------------------------------------------------------------------------------ + +class Command: + """ + Bash command + + Todo: + - Commands with ; should be "bashized" + """ + def __init__(self, commandline, node=None, parameters = None, + description = None, callback = None, blocking=True, lock=None): + self._commandline = commandline + self._node = node + self._parameters = parameters if parameters else dict() + self._description = description + self._callback = callback + self._blocking = blocking + self._lock = lock + + def __str__(self): + try: + return self.full_commandline + except: + return self.commandline + ' -- ' + str(self.parameters) + + def __repr__(self): + return '<Command {} -- {}'.format(self.commandline, self.parameters) + + @property + def commandline(self): + return self._commandline + + @property + def full_commandline(self): + cmd = self._commandline.format(**self._parameters) + if ('||' in cmd or '&&' in cmd or '|' in cmd or '<' in cmd or + '>' in cmd): + return bashize(cmd) + return cmd + + @property + def full_commandline_nobashize(self): + """ + TMP to fix issue with bashize heuristic above... + """ + cmd = self._commandline.format(**self.parameters) + return cmd + + @property + def command(self): + return self + + @property + def node(self): + return self._node + + @node.setter + def node(self, node): + self._node = node + + @property + def parameters(self): + return self._parameters + + @parameters.setter + def parameters(self, parameters): + self._parameters = parameters + + @property + def description(self): + if not self._description: + return self._description + return self._description.format(**self.parameters) + + @property + def success_callback(self): + return self._on_success + + @property + def failure_callback(self): + return self._on_failure + + @property + def blocking(self): + return self._blocking + + @property + def lock(self): + return self._lock + + def apply(self, params): + self._parameters.update(params) + return self + + def __and__(self, other): + commandline = self.commandline + ' && ' + other.commandline + all_params = dict(i for c in (self, other) + for i in c.parameters.items()) + return Command(commandline, parameters = all_params) + + def __or__(self, other): + commandline = self.commandline + ' || ' + other.commandline + all_params = dict(i for c in (self, other) + for i in c.parameters.items()) + return Command(commandline, parameters = all_params) + + def __bool__(self): + return bool(self._commandline) + + def submit(self): + CMD_MGR.execute([self]) + + def execute(self): + cmd = self.full_commandline + cmd_str = cmd[:77]+'...' if len(cmd) > 80 else cmd + log.debug('Node {}: {} ({})'.format(self.node.name, cmd_str, + self._description)) + + rv = self.node.execute(cmd) + + if not rv: + raise CommandException + if self._callback: + if self._lock: + self._lock.acquire() + self._callback(rv) + if self._lock: + self._lock.release() + return rv + +#------------------------------------------------------------------------------ + +class BackgroundCommand(Command): + pass + +#------------------------------------------------------------------------------ + +class Commands(Command): + + def __init__(self): + self._commands = list() + self._node = None + + def __repr__(self): + return '<Commands {}>'.format(str(self)) + + def __str__(self): + return self.commandline + + def add(self, command): + if not command: + return + if not isinstance(command, Command): + command = Command(command) + self._commands.append(command) + + def _do_command(self, sep): + if len(self.commands) == 1: + full_cmd = sep.join(c.commandline for c in self.commands) + else: + full_cmd = sep.join(do_parenthesize(c.commandline) + for c in self.commands) + all_params = dict(i for c in self.commands + for i in c.parameters.items()) + + return Command(full_cmd, parameters = all_params) + + @property + def command(self): + raise NotImplementedError('Not implemented') + + @property + def commandline(self): + return self.command.commandline + + @property + def parameters(self): + parameters = dict() + for command in self.commands: + parameters.update(command.parameters) + return parameters + + @parameters.setter + def parameters(self, parameters): + for command in self.commands: + command.parameters = parameters + + @property + def commands(self): + return self._commands + + def apply(self, params): + self._commands = [c.apply(params) for c in self._commands] + return self._commands + + __lshift__ = add + + def __bool__(self): + return any(bool(c) for c in self._commands) + +#------------------------------------------------------------------------------ + +class ParallelCommands(Commands): + @property + def command(self): + log.warning('Commands executed sequentially') + return self._do_command(';') + +#------------------------------------------------------------------------------ + +class SequentialCommands(Commands): + @property + def command(self, fatal = True): + SEP = ' && ' if fatal else '; ' + return self._do_command(SEP) + +#------------------------------------------------------------------------------ + +def sequential_bash_from_docstring(fn): + def decorator(*args, **kwargs): + c = SequentialCommands() + + desc = None + for line in fn.__doc__.splitlines(): + line = line.strip() + if not line: + continue + if line.startswith('#'): + desc = line[1:].strip() + continue + c << Command(line, description = desc) + desc = None + + # XXX we don't support keyword args + arg_info = inspect.getargspec(fn) + assert not arg_info.varargs + c.parameters = dict(zip(arg_info.args, args)) + log.debug('sequential_bash_from_docstring: {}'.format(c)) + + # Execute the code in the function + fn(*args, **kwargs) + + return c + + return decorator + +bash_from_docstring = sequential_bash_from_docstring + +def execute_on_node(fn): + """ + Decorator: execute the command returned by the function on the node found + in attributes. Note that such an attribute should be available. + This assumes the function returns a command + + We need output in case apply_rx is used. This should be made an option + """ + + def wrapper(self, *args, **kwargs): + return self.node.execute(fn(self, *args, **kwargs), output = True) + return wrapper + +def apply_rx(rx): + """ + Apply a compiled regular expression to the result of the decorated function. + Returns a dict (whose keys should be attributes of the resource that need + to be updated). + """ + + def decorator(fn): + def wrapper(*args, **kwargs): + ret = fn(*args, **kwargs) + return [m.groupdict() for m in rx.finditer(ret.stdout)] + return wrapper + return decorator diff --git a/vicn/core/event.py b/vicn/core/event.py new file mode 100644 index 00000000..ee418257 --- /dev/null +++ b/vicn/core/event.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class Event: + pass + +class AttributeChangedEvent(Event): + def __init__(self, condition=None): + self._condition = condition + diff --git a/vicn/core/exception.py b/vicn/core/exception.py new file mode 100644 index 00000000..d7422723 --- /dev/null +++ b/vicn/core/exception.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class VICNException(Exception): pass + +class CommandException(VICNException): pass + +class ProcessException(VICNException): pass +class NotConfigured(ProcessException): pass + +class ParameterException(VICNException): pass +class InvalidResource(ParameterException): pass + +class ABCException(VICNException): pass +class NotImplemented(VICNException): pass + +class DependencyException(VICNException): pass +class InitializeException(VICNException): pass +class CheckException(VICNException): pass +class SetupException(VICNException): pass + +class VICNListException(VICNException): pass + +class ResourceNotFound(VICNException): pass diff --git a/vicn/core/requirement.py b/vicn/core/requirement.py new file mode 100644 index 00000000..c42cecd9 --- /dev/null +++ b/vicn/core/requirement.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import enum + +from netmodel.model.mapper import ObjectSpecification +from vicn.core.exception import VICNException + +#------------------------------------------------------------------------------ +# Enums +#------------------------------------------------------------------------------ + +class RequirementScope(enum.Enum): + INSTANCE = 'Instance' + CLASS = 'Class' + +#------------------------------------------------------------------------------ +# Exceptions +#------------------------------------------------------------------------------ + +class RequirementError(VICNException): + + def __init__(self, instance, attr): + super().__init__() + self._instance = instance + self._attr = attr + + def __str__(self): + return "Requirement on {}.{} could not be satisfied:".format( + self._instance, self._attr) + +class RequiredAttributeError(RequirementError): + + def __str__(self): + return super().__str__() + "could not find attribute {}".format( + self._attr) + +class RequiredPropertyError(RequirementError): + def __init__(self, instance, attr, prop): + super().__init__(instance, attr) + self._prop = prop + + def __str__(self): + return super().__str__()+ "property {} is not verified".format( + self._prop) + +#------------------------------------------------------------------------------ +# Class: Property +#------------------------------------------------------------------------------ + +class Property: + + TYPE_ANY_OF = 0 + #XXX cant think of a good use case for that + #TYPE_ALL_OF = 1 + + def __init__(self, value, property_type=TYPE_ANY_OF): + self._type = property_type + try: + self._value = set(value) + except TypeError: #value is not iterable, it is a single value + self._value = set() + self._value.add(value) + + + @property + def property_type(self): + return self._type + + @property + def value(self): + return self._value + + def check(self, value): + return value in self._value + + def __str__(self): + return str(self._value) + + def merge(self, other): + assert self._type is other.property_type, \ + "Properties must be of same type to be merged" + + #if self._type is TYPE_ANY_OF: + self._value.intersection_update(other.value) + #elif self._type is TYPE_ALL_OF: + # self._value.union_update(other.value) + +#------------------------------------------------------------------------------ +# Class: Requirement +#------------------------------------------------------------------------------ + +class Requirement(ObjectSpecification): + """Resource requirement + + This class allows to specify a requirement on a given resource, or on a + class of resources (all instances of a given class). + """ + + #-------------------------------------------------------------------------- + # Constructor + #-------------------------------------------------------------------------- + + def __init__(self, requirement_type, properties = None, + capabilities = None, scope = RequirementScope.INSTANCE, + fatal = True, must_be_setup = False): + """ + Args: + requirement_type (): the attribute on which the requirement is made + properties (Optional[XXX]): XXX (defaults to None) + scope (Optional[enum RequirementScope]): Is the requirement dealing + with an instance, or a class (all instance of the class) + (defaults to RequirementScope.INSTANCE) + fatal (Optional[bool]): is the failure of the requirement fatal + (raises an error), or not (raises a warning) (defaults to True) + must_be_setup (Optional[bool]): defaults to False + """ + self._type = requirement_type + self._properties = {} + if properties: + for prop in properties: + self._properties[prop] = Property(properties[prop]) + self._capabilities = capabilities if capabilities else set() + self._scope = scope + self._fatal = fatal + self._must_be_up = must_be_setup + + #-------------------------------------------------------------------------- + # Accessors and properties + #-------------------------------------------------------------------------- + + @property + def properties(self): + return self._properties + + @property + def requirement_type(self): + return self._type + + @property + def must_be_up(self): + return self._must_be_up + + #-------------------------------------------------------------------------- + # Display + #-------------------------------------------------------------------------- + + def __str__(self): + prop_str = "{" + ",".join(map(lambda x: "'{}': {}".format(x, + self._properties[x]), self._properties.keys())) +"}" + return "<type={}, properties={}, must_be_up={}>".format(self._type, + prop_str, self._must_be_up) + + #-------------------------------------------------------------------------- + # Requirement operators + #-------------------------------------------------------------------------- + + def check(self, instance): + if not hasattr(instance, self._type): + raise RequiredAttributeError(instance, self._type) + + instance_attr = getattr(instance, self._type) + if not instance_attr: + raise TypeError("instance_attr is none") + + for prop in self.properties: + if not hasattr(instance_attr, prop): + raise RequiredAttributeError(instance, self._type) + if not self._properties[prop].check(getattr(instance_attr, prop)): + raise RequiredPropertyError(instance, self._type, prop) + + return True + + #-------------------------------------------------------------------------- + # Requirement logic + #-------------------------------------------------------------------------- + + def merge(self, other): + assert other.requirement_type == self._type, \ + "Cannot merge Requirements with different types" + + for prop in other.properties: + if prop in self._properties: + self._properties[prop].merge(other.properties[prop]) + else: + self._properties[prop] = other.properties[prop] + + if other._capabilities: + self._capabilities |= other._capabilities + +#------------------------------------------------------------------------------ +# Class: Requirement list +#------------------------------------------------------------------------------ + +class RequirementList(list): + + def __init__(self,x=None): + super().__init__() + if x: + self.extend(x) + + def append(self,x): + assert isinstance(x,Requirement) + # XXX O(n) right now, might be able to do better + for req in self: + if req.requirement_type == x.requirement_type: + req.merge(x) + return + + super().append(x) + + def extend(self, x): + for y in x: + self.append(y) diff --git a/vicn/core/resource.py b/vicn/core/resource.py new file mode 100644 index 00000000..53ad2181 --- /dev/null +++ b/vicn/core/resource.py @@ -0,0 +1,898 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import copy +import logging +import operator +import random +import string +import sys +import traceback +import types + +from abc import ABC, ABCMeta + +from netmodel.model.mapper import ObjectSpecification +from netmodel.model.type import String, Bool, Integer, Dict +from netmodel.model.type import BaseType, Self +from netmodel.util.deprecated import deprecated +from netmodel.util.singleton import Singleton +from vicn.core.attribute import Attribute, Multiplicity, Reference +from vicn.core.attribute import NEVER_SET +from vicn.core.commands import ReturnValue +from vicn.core.event import Event, AttributeChangedEvent +from vicn.core.exception import VICNException, ResourceNotFound +from vicn.core.resource_factory import ResourceFactory +from vicn.core.requirement import Requirement, Property +from vicn.core.sa_collections import InstrumentedList, _list_decorators +from vicn.core.scheduling_algebra import SchedulingAlgebra +from vicn.core.state import ResourceState, UUID +from vicn.core.state import Operations, InstanceState +from vicn.core.task import run_task, BashTask + +log = logging.getLogger(__name__) + +NAME_SEP = '-' + +# Warning and error messages + +W_UNK_ATTR = 'Ignored unknown attribute {} for resource {}' +E_UNK_RES_NAME = 'Unknown resource name for attribute {} in {} ({}) : {}' +E_GET_NON_LOCAL = 'Cannot get non-local attribute {} for resource {}' +E_AUTO_UNM = 'Trying to auto-instanciate attribute {} on unmanaged resource {}' + +#------------------------------------------------------------------------------ +# Resource category +#------------------------------------------------------------------------------ + +# A base resource is not instanciated itself but uses delegates. Which one to +# use is resolved during initialization +class TopLevelResource: pass + +class FactoryResource(TopLevelResource): pass +class CategoryResource(TopLevelResource): pass + +#------------------------------------------------------------------------------ + +class ResourceMetaclass(ABCMeta): + def __init__(cls, class_name, parents, attrs): + """ + Args: + cls: The class type we're registering. + class_name: A String containing the class_name. + parents: The parent class types of 'cls'. + attrs: The attribute (members) of 'cls'. + """ + super().__init__(class_name, parents, attrs) + + # We use the metaclass to create attributes for instance, even before + # the Resource Factory is called. They are needed both for initializing + # attributes and reverse attributes, in whatever order. Only class + # creation allow us to clear _attributes, otherwise, we will just add + # those from the parent, siblings, etc... + cls._sanitize() + +#------------------------------------------------------------------------------ + +class BaseResource(BaseType, ABC, metaclass=ResourceMetaclass): + """Base Resource class + + The base Resource class implements all the logic related to resource + instances. + + See also : + * ResourceManager : logic related to class instanciation + * Resource metaclass : logic related to class construction + * ResourceFactory : logic related to available classes and mapping from + name to type + """ + + __type__ = TopLevelResource + + name = Attribute(String, description = 'Alias name for the resource') + managed = Attribute(Bool, description = 'Flag: resource is managed', + default = True) + owner = Attribute(Self, description = 'Owning resource', default = None) + data = Attribute(Dict, description = 'User data') + + #--------------------------------------------------------------------------- + # Constructor + #--------------------------------------------------------------------------- + + def __new__(cls, *args, **kwargs): + """ + We implement a "factory method" design pattern in the constructor... + """ + # Ensure the resource factory exists and has been initialized, and thus + # that Resource objects are fully created + from vicn.core.resource_mgr import ResourceManager + + ResourceFactory() + + delegate = ResourceManager().get_resource_with_capabilities(cls, set()) + if not delegate: + log.error('No delegate for abstract resource : %s', cls.__name__) + raise VICNException + + instance = super().__new__(delegate) + + return instance + + def __init__(self, *args, **kwargs): + from vicn.core.resource_mgr import ResourceManager + + # Cache dependencies + self._deps = None + + # Internal data tag for resources + self._internal_data = dict() + + mandatory = { a.name for a in self.iter_attributes() if a.mandatory } + + for key, value in kwargs.items(): + attribute = self.get_attribute(key) + if attribute is None: + log.warning(W_UNK_ATTR.format(key, self.get_type())) + continue + + if isinstance(value, Reference): + if value._resource is Self: + value = getattr(self, value._attribute) + else: + value = getattr(value._resource, value._attribute) + + if value and issubclass(attribute.type, Resource): + if attribute.is_collection: + new_value = list() + for x in value: + if isinstance(x, str): + resource = ResourceManager().by_name(x) + elif isinstance(x, UUID): + resource = ResourceManager().by_uuid(x) + else: + resource = x + if not resource: + raise VICNException(E_UNK_RES_NAME.format(key, + self.name, self.__class__.__name__, x)) + element = resource if isinstance(resource, Reference) \ + else resource._state.uuid + new_value.append(element) + value = new_value + else: + if isinstance(value, str): + resource = ResourceManager().by_name(value) + elif isinstance(value, UUID): + resource = ResourceManager().by_uuid(value) + else: + resource = value + if not resource: + raise VICNException(E_UNK_RES_NAME.format(key, + self.name, self.__class__.__name__, value)) + value = value if isinstance(resource, Reference) \ + else resource._state.uuid + self.set(key, value, blocking=False) + mandatory -= { key } + + # Check that all mandatory atttributes have been set + # Mandatory resource attributes will be marked as pending since they + # might be discovered + # Eventually, their absence will be discovered at runtime + if mandatory: + raise VICNException('Mandatory attributes not set: %r' % (mandatory,)) + + # Check requirements + for attr in self.iter_attributes(): + if issubclass(attr.type, Resource) and attr.requirements: + for req in attr.requirements: + instance = self.get(attr.name) + if instance is None: + continue + ResourceManager().add_instance_requirement(instance, req) + + self._subresources = None + + def __after__(self): + return tuple() + + def __after_init__(self): + return tuple() + + def __subresources__(self): + return None + + def set_subresources(self, subresources): + if not subresources: + return + + # Add state to operators + for sr in subresources: + if not hasattr(sr, '_state'): + sr._state = InstanceState(self._state.manager, sr) + + self._subresources = subresources + + def get_uuid(self): + return self._state.uuid + + def from_uuid(self, uuid): + return self._state.manager.by_uuid(uuid) + + #-------------------------------------------------------------------------- + # Object model + #-------------------------------------------------------------------------- + + def get(self, attribute_name, default=NEVER_SET, unref=True, resolve=True, + allow_never_set=True, blocking=True): + + attribute = self.get_attribute(attribute_name) + + # Handling Lambda attributes + if hasattr(attribute, 'func') and attribute.func: + value = attribute.func(self) + else: + if self.is_local_attribute(attribute.name): + value = vars(self).get(attribute.name, NEVER_SET) + else: + # A pending value has priority + value = self._state.dirty.get(attribute.name, NEVER_SET) + if value.value is not NEVER_SET: + value = value.value + else: + # otherwise, let's use a previously fetched value if it + # exists + value = vars(self).get(attribute.name, NEVER_SET) + + if value is NEVER_SET: + if not allow_never_set: + log.error(E_GET_NON_LOCAL.format(attribute_name, + self._state.uuid)) + raise NotImplementedError + + if attribute.is_collection: + value = self.get_default_collection(attribute) + else: + if attribute.auto: + # Automatic instanciation + if attribute.requirements: + log.warning('Ignored requirements {}'.format( + attribute.requirements)) + value = self.auto_instanciate(attribute) + + if value is NEVER_SET: + value = self.get_default(attribute) + + if self.is_local_attribute(attribute.name): + self.set(attribute.name, value) + + if unref and isinstance(value, UUID): + value = self.from_uuid(value) + + if resolve and isinstance(value, Reference): + if value._resource is Self: + value = getattr(self, value._attribute) + else: + value = getattr(value._resource, value._attribute) + + return value + + async def async_get(self, attribute_name, default=NEVER_SET, unref=True, + resolve=True, allow_never_set=False, blocking=True): + attribute = self.get_attribute(attribute_name) + + # Handling Lambda attributes + if hasattr(attribute, 'func') and attribute.func: + value = self.func(self) + else: + if self.is_local_attribute(attribute.name): + value = vars(self).get(attribute.name, NEVER_SET) + else: + + # A pending value has priority + value = self._state.dirty.get(attribute.name, NEVER_SET) + if value.value is not NEVER_SET: + value = value.value + else: + # otherwise, let's use a previously fetched value if it + # exists + value = vars(self).get(attribute.name, NEVER_SET) + if value is NEVER_SET: + await self._state.manager.attribute_get(self, + attribute_name, value) + value = vars(self).get(attribute.name, NEVER_SET) + + # Handling NEVER_SET + if value is NEVER_SET: + if not allow_never_set: + log.error(E_GET_NON_LOCAL.format(attribute_name, + self._state.uuid)) + raise NotImplementedError + + if attribute.is_collection: + value = self.get_default_collection(attribute) + else: + if attribute.auto: + # Automatic instanciation + if attribute.requirements: + log.warning('Ignored requirements {}'.format( + attribute.requirements)) + value = self.auto_instanciate(attribute) + + if value is NEVER_SET: + value = self.get_default(attribute) + + if value is self.is_local_attribute(attribute.name): + self.set(attribute.name, value) + + if unref and isinstance(value, UUID): + value = self.from_uuid(value) + + if resolve and isinstance(value, Reference): + if value._resource is Self: + value = getattr(self, value._attribute) + else: + value = getattr(value._resource, value._attribute) + + return value + + def _set(self, attribute_name, value, current=False, set_reverse=True): + """ + Note that set does not automatically mark a resource dirty. + We might need a flag to avoid dirty by default, which will be useful + when a resource is modified by another resource: eg x.up, or + x.ip_address = y, ... + Returns : task that can be monitored (note that it is not scheduled) + """ + attribute = self.get_attribute(attribute_name) + + if set_reverse and attribute.reverse_name: + for base in self.__class__.mro(): + if not hasattr(base, '_reverse_attributes'): + continue + + for ra in base._reverse_attributes.get(attribute, list()): + # Value information : we need resources, not uuids + if attribute.is_collection: + lst = list() + if value: + for x in value: + if isinstance(x, UUID): + x = self.from_uuid(x) + lst.append(x) + value = InstrumentedList(lst) + value._attribute = attribute + value._instance = self + else: + if isinstance(value, UUID): + value = self.from_uuid(value) + + if ra.multiplicity == Multiplicity.OneToOne: + if value is not None: + value.set(ra.name, self, set_reverse = False) + elif ra.multiplicity == Multiplicity.ManyToOne: + for element in value: + value.set(ra.name, self, set_reverse = False) + elif ra.multiplicity == Multiplicity.OneToMany: + if value is not None: + collection = value.get(ra.name) + collection.append(self) + else: + value is None + elif ra.multiplicity == Multiplicity.ManyToMany: + collection = value.get(ra.name) + value.extend(self) + + # Handling value : we need uuids, not resources + if attribute.is_collection: + if not isinstance(value, InstrumentedList): + lst = list() + if value: + for x in value: + if isinstance(x, Resource): + x = x.get_uuid() + lst.append(x) + + value = InstrumentedList(lst) + else: + value = InstrumentedList([]) + value._attribute = attribute + value._instance = self + else: + if isinstance(value, Resource): + value = value.get_uuid() + return value + + def set(self, attribute_name, value, current=False, set_reverse=True, + blocking = True): + value = self._set(attribute_name, value, current=current, + set_reverse=set_reverse) + if self.is_local_attribute(attribute_name) or current: + # super() + if value is None: + attribute = self.get_attribute(attribute_name) + vars(self)[attribute_name] = value + + else: + fut = self._state.manager.attribute_set(self, attribute_name, value) + asyncio.ensure_future(fut) + + async def async_set(self, attribute_name, value, current=False, + set_reverse=True, blocking=True): + """ + Example: + - setting the ip address on a node's interface + + We need to communicate our intention to the resource manager, which will + process our request in a centralized fashion, and do the necessary + steps for us to set the value properly. + """ + value = self._set(attribute_name, value, current=current, + set_reverse=set_reverse) + await self._state.manager.attribute_set(self, attribute_name, value, + blocking=blocking) + + def set_many(self, attribute_dict, current=False): + if not attribute_dict: + return + for k, v in attribute_dict.items(): + self.set(k, v, current=current) + + def is_set(self, attribute_name): + return attribute_name in vars(self) + +# def clean(self, attribute_name): +# return self._state.manager.attribute_clean(self, attribute_name) + + def is_local_attribute(self, attribute_name): + ACTIONS = ['get', 'set', 'add', 'remove'] + for action in ACTIONS: + method = '_{}_{}'.format(action, attribute_name) + if hasattr(self, method) and getattr(self, method) is not None: + return False + return True + + def get_default_collection(self, attribute): + if isinstance(attribute.default, types.FunctionType): + default = attribute.default(self) + elif isinstance(attribute.default, Reference): + if attribute.default._resource is Self: + default = getattr(self, attribute.default._attribute) + else: + default = getattr(attribute.default._resource, + attribute.default._attribute) + else: + default = attribute.default + value = InstrumentedList(default) + value._attribute = attribute + value._instance = self + return value + + def get_default(self, attribute): + if isinstance(attribute.default, types.FunctionType): + value = attribute.default(self) + elif isinstance(attribute.default, Reference): + if attribute.default._resource is Self: + value = getattr(self, attribute.default._attribute) + else: + value = getattr(attribute.default._resource, + attribute.default._attribute) + else: + value = copy.deepcopy(attribute.default) + return value + + def async_get_task(self, attribute_name): + task = getattr(self, '_get_{}'.format(attribute_name))() + assert not isinstance(task, tuple) + return task + + + def async_set_task(self, attribute_name, value): + raise NotImplementedError + return async_task(async_set_task, attribute_name, value) + + @classmethod + def get_attribute(cls, key): + # Searchs if it is a recursive attribute + try: + pos = key.find('.') + if pos >= 0: + attr, subattr = key[0:pos], key[pos+1: len(key)] + return getattr(cls,attr).type.get_attribute(subattr) + return getattr(cls, key) + except AttributeError: + return None + + @classmethod + def _sanitize(cls): + """Sanitize the object model to accomodate for multiple declaration + styles + + In particular, this method: + - set names to all attributes + """ + cls._reverse_attributes = dict() + cur_reverse_attributes = dict() + for name, obj in vars(cls).items(): + if not isinstance(obj, ObjectSpecification): + continue + if isinstance(obj, Attribute): + obj.name = name + + # Remember whether a reverse_name is defined before loading + # inherited properties from parent + has_reverse = bool(obj.reverse_name) + + # Handle overloaded attributes + # By recursion, it is sufficient to look into the parent + for base in cls.__bases__: + if hasattr(base, name): + parent_attribute = getattr(base, name) + obj.merge(parent_attribute) + assert obj.type + + # Handle reverse attribute + # + # NOTE: we need to do this after merging to be sure we get all + # properties inherited from parent (eg. multiplicity) + if has_reverse: + a = { + 'name' : obj.reverse_name, + 'description' : obj.reverse_description, + 'multiplicity' : Multiplicity.reverse(obj.multiplicity), + 'auto' : obj.reverse_auto, + } + reverse_attribute = Attribute(cls, **a) + reverse_attribute.is_aggregate = True + + cur_reverse_attributes[obj.type] = reverse_attribute + + #print('*** class backref ***', cls, obj, reverse_attribute) + if not obj in cls._reverse_attributes: + cls._reverse_attributes[obj] = list() + cls._reverse_attributes[obj].append(reverse_attribute) + + for kls, a in cur_reverse_attributes.items(): + setattr(kls, a.name, a) + + @classmethod + def iter_attributes(cls, aggregates = False): + for name in dir(cls): + attribute = getattr(cls, name) + if not isinstance(attribute, Attribute): + continue + if attribute.is_aggregate and not aggregates: + continue + + yield attribute + + def iter_keys(self): + for attribute in self.iter_attributes(): + if attribute.key == True: + yield attribute + + def get_keys(self): + return list(self.iter_keys()) + + def auto_instanciate(self, attribute): + if self.managed is False: + raise ResourceNotFound(E_AUTO_UNM.format(attribute, self)) + cstr_attributes = dict() + + for a in attribute.type.iter_attributes(): + if not a.mandatory: + continue + + # Let's find attributes in the remote class that are of my + # class, and let's setup them to me + if issubclass(a.type, Resource) and isinstance(self, a.type): + cstr_attributes[a.name] = self + continue + + if hasattr(self, a.name): + cstr_attributes[a.name] = getattr(self, a.name) + + capabilities = set() + reqs = self._state.manager.get_instance_requirements(self) + for req in reqs: + if req._type != attribute.name: + continue + + for attr_name, prop in req.properties.items(): + value = next(iter(prop.value)) + capabilities |= req._capabilities + + # We need to find a subclass of self._resource with proper capabilities + cls = self._state.manager.get_resource_with_capabilities( + attribute.type, capabilities) + + # Before creating a new instance of a class, let's check + resource = cls(**cstr_attributes) + + self._state.manager.commit_resource(resource) + return resource + + def get_attributes(self, aggregates = False): + return list(self.iter_attributes(aggregates = aggregates)) + + def get_attribute_names(self, aggregates = False): + return set(a.name + for a in self.iter_attributes(aggregates = aggregates)) + + def get_attribute_dict(self, field_names = None, aggregates = False, + uuid = True): + assert not field_names or field_names.is_star() + attributes = self.get_attributes(aggregates = aggregates) + + ret = dict() + for a in attributes: + if not a.is_set(self): + continue + value = getattr(self, a.name) + if a.is_collection: + ret[a.name] = list() + for x in value: + if uuid and isinstance(x, Resource): + x = x._state.uuid._uuid + ret[a.name].append(x) + else: + if uuid and isinstance(value, Resource): + value = value._state.uuid._uuid + ret[a.name] = value + return ret + + def get_tuple(self): + return (self.__class__, self._get_attribute_dict()) + + @property + def state(self): + return self._state.state + + @state.setter + def state(self, state): + self._state.state = state + + def get_types(self): + return [cls.__name__.lower() for cls in self.__class__.mro() + if cls.__name__ not in ('ABC', 'BaseType', 'object')] + + def get_type(self): + return self.__class__.__name__.lower() + + def has_type(self, typ): + return typ in self.get_types() + + def __repr__(self): + # Showing aggregate attributes can cause infinite loops + name = self._state.uuid if self.name in (None, NEVER_SET) else self.name + return '<{}: {} {}>'.format(self.__class__.__name__, name, + ', '.join('{}={}'.format(k,v) + for k, v in self.get_attribute_dict().items())) + + def __str__(self): + return self.__repr__() + + #--------------------------------------------------------------------------- + # Resource helpers + #--------------------------------------------------------------------------- + + def get_dependencies(self, allow_unresolved = False): + if not self._deps: + deps = set() + for a in self.iter_attributes(): + if not issubclass(a.type, Resource): + continue + if a.is_aggregate: + continue + + value = getattr(self, a.name) + if not value: + continue + + if a.multiplicity in (Multiplicity.OneToOne, + Multiplicity.ManyToOne): + resource = value + if not resource: + log.warning('Null resource') + continue + if not resource.managed: + continue + uuid = resource._state.uuid + # Avoid considering oneself as a dependency due to + # ResourceAttribute(Self) + if uuid != self._state.uuid: + deps.add(uuid) + else: + resources = value + for cpt, resource in enumerate(resources): + if not resource: + log.warning('Null resource in collection') + continue + if not resource.managed: + continue + uuid = resource._state.uuid + deps.add(uuid) + self._deps = deps + return self._deps + + def make_name(self, *args, type=True, id=True): + l = list() + if type: + l.append(self.__class__.__name__) + l.extend(list(args)) + if id: + N = 3 + uuid = ''.join(random.choice(string.ascii_uppercase + + string.digits) for _ in range(N)) + l.append(uuid) + name = NAME_SEP.join(str(x) for x in l) + return name + + + def check_requirements(self): + for attr in self.iter_attributes(): + if issubclass(attr.type, Resource) and attr.requirements: + for req in attr.requirements: + instance = getattr(self, attr.name) + req.check(instance) + + #-------------------------------------------------------------------------- + # Triggers + #-------------------------------------------------------------------------- + + @deprecated + def trigger(self, action, attribute_name, *args, **kwargs): + self._state.manager.trigger(self, action, attribute_name, + *args, **kwargs) + + #-------------------------------------------------------------------------- + # Object model + # + # Only assignment is implemented here, other operators are overloaded in + # the Attribute class (core.attribute.Attribute) + #-------------------------------------------------------------------------- + + def format(self, fmt): + return fmt.format(**self.get_attribute_dict(uuid = False)) + + def get_tag(self, tag_name, default = NEVER_SET): + """ + A tag corresponds to a propery that is required by a class in all of + its inheritors. For instance, a service requires than a subclass + informs about the 'service_name' tag, which is a class member named + according to the following convention : __service_name__. + """ + tag = '__{}__'.format(tag_name) + if not tag in vars(self.__class__): + if default is NEVER_SET: + return default + raise NotImplementedError('Missing tag {} in class {}'.format(tag, + self.__class__.__name__)) + return getattr(self.__class__, tag) + + def iter_backrefs(self): + for base in self.__class__.mro(): + if not hasattr(base, '_reverse_attributes'): + continue + for attr, rattrs in base._reverse_attributes.items(): + instances = self.get(attr.name, allow_never_set = True) + if instances in (None, NEVER_SET): + continue + if not attr.is_collection: + instances = [instances] + for instance in instances: + for rattr in rattrs: + yield instance, rattr + + #--------------------------------------------------------------------------- + # Accessors + #--------------------------------------------------------------------------- + + @classmethod + def has_attribute(cls, name): + return name in [a.name for a in cls.attributes()] + + def has_callback(self, action, attribute): + return hasattr(self, '_{}_{}'.format(action, attribute.name)) + + def is_setup(self): + return self.state in (ResourceState.SETUP_PENDING, + ResourceState.SETUP, ResourceState.DIRTY) + + __get__ = None + __create__ = None + __delete__ = None + +#------------------------------------------------------------------------------- +# Helper functions +#------------------------------------------------------------------------------- + +# The following Mixin are useful to convert an expresson of subresources into +# an expression of tasks. + +class ConcurrentMixin: + async def async_commit_to_manager(self, manager): + await asyncio.gather(*[element.async_commit_to_manager(manager) + for element in self._elements]) + await asyncio.gather(*[e._state.clean.wait() for e in self._elements]) + self._state.clean.set() + +class SequentialMixin: + async def async_commit_to_manager(self, manager): + for element in self._elements: + await element.async_commit_to_manager(manager) + await element._state.clean.wait() + self._state.clean.set() + +class CompositionMixin: + async def async_commit_to_manager(self, manager): + for element in self._elements: + await element.async_commit_to_manager(manager) + await element._state.clean.wait() + self._state.clean.set() + +_Resource, EmptyResource = SchedulingAlgebra(BaseResource, ConcurrentMixin, + CompositionMixin, SequentialMixin) + +class ManagedResource(_Resource): + def __init__(self, *args, **kwargs): + from vicn.core.resource_mgr import ResourceManager + owner = kwargs.get('owner', None) + name = kwargs.get('name', None) + + manager = ResourceManager() + self.register_to_manager(manager, name=name) + + # Manager is needed for reference and reverse attributes + super().__init__(*args, **kwargs) + + async def async_commit_to_manager(self, manager): + if not self.managed: + return + self._state.manager.commit_resource(self) + + def register_to_manager(self, manager, name = None): + if not self.managed: + return + manager.add_resource(self, name = name) + +Resource = ManagedResource + +class BashResource(Resource): + """ + __get__ : use return code of the bash command + + Intermediate values and attributes: should be dict-like + Actually, we should collect attributes: dict update/remove, map/reduce + """ + __node__ = None + __cmd_get__ = None + __cmd_create__ = None + __cmd_delete__ = None + + def __get__(self): + assert self.__cmd_get__ + return BashTask(self.node, self.__cmd_get__, {'self': self}) + + def __create__(self): + assert self.__cmd_create__ + return BashTask(self.node, self.__cmd_create__, {'self': self}) + + def __delete__(self): + assert self.__cmd_delete__ + return BashTask(self.node, self.__cmd_delete__, {'self': self}) + diff --git a/vicn/core/resource_factory.py b/vicn/core/resource_factory.py new file mode 100644 index 00000000..15504729 --- /dev/null +++ b/vicn/core/resource_factory.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import copy +import inspect +import logging +import pkgutil +import traceback + +from netmodel.model.type import Type +from netmodel.util.singleton import Singleton + +log = logging.getLogger(__name__) + +# Blacklist : resource that are temporarily disabled loaded +RESOURCE_BLACKLIST = ['LinuxBridge'] + +class ResourceFactory(metaclass=Singleton): + """ + This manages classes, not instances + """ + + def __init__(self): + self._registry = dict() + self._register_all() + + def _register_all(self): + log.info("Registering resources") + from vicn.core.resource import Resource + + from vicn import resource as package + prefix = package.__name__ + "." + + # Because aggregates might not be instanciated in order, we accumulate + # them and register them at the end + delayed_aggregates = dict() + + # Explored modules are automatically imported by walk_modules + it + # allows to explore recursively resources/ + # http://docs.python.org/2/library/pkgutil.html + for importer, modname, ispkg in pkgutil.walk_packages(package.__path__, + prefix, onerror = None): + try: + module = __import__(modname, fromlist = "dummy") + + classes = [m[1] for m in inspect.getmembers(module, + inspect.isclass) if m[1].__module__ == modname] + for cls in classes: + if not issubclass(cls, Resource): + continue + + if cls.__name__ in RESOURCE_BLACKLIST: + print('Skipped blacklisted resource ' + cls.__name__) + continue + + # Register module to resource factory + self._registry[cls.__qualname__] = cls + Type._registry[cls.__qualname__.lower()] = cls + + except Exception as e: + log.warning("Cannot load %s : %s: %s" % (modname, e, + traceback.format_exc())) + + log.info("Registered resources are: {%s}" % ", ".join(sorted( + self._registry.keys()))) + + def get_available_resources(self): + return self._registry + diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py new file mode 100644 index 00000000..f6082488 --- /dev/null +++ b/vicn/core/resource_mgr.py @@ -0,0 +1,1436 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys, logging, asyncio, socket, functools +import time + +# LXD workaround +from pylxd.exceptions import NotFound as LxdNotFound, LXDAPIException + +from netmodel.model.filter import Filter +from netmodel.model.query import Query, ACTION_SELECT, ACTION_INSERT +from netmodel.model.query import ACTION_UPDATE, ACTION_SUBSCRIBE +from netmodel.model.sql_parser import SQLParser +from netmodel.network.packet import Packet +from netmodel.network.router import Router +from netmodel.util.toposort import toposort, toposort_flatten +from netmodel.util.meta import inheritors +from netmodel.util.singleton import Singleton +from netmodel.util.misc import is_iterable +from vicn.core.attribute import NEVER_SET +from vicn.core.exception import VICNException, ResourceNotFound +from vicn.core.resource_factory import ResourceFactory +from vicn.core.resource import Resource, FactoryResource, EmptyResource +from vicn.core.sa_collections import InstrumentedList +from vicn.core.state import InstanceState, ResourceState +from vicn.core.state import AttributeState, Operations, PendingValue +from vicn.core.task import TaskManager, wait_task, task, async_task +from vicn.core.task import EmptyTask, BashTask + +log = logging.getLogger(__name__) + +ENABLE_LXD_WORKAROUND = True + +# Monitoring queries + +Q_SUB_VPP = 'SUBSCRIBE SUM(*) FROM interface WHERE device_name INCLUDED [{}]' +Q_SUB_IF = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"' +Q_SUB_STATS = 'SUBSCRIBE * FROM stats' +Q_SUB_EMULATOR_IF = 'SUBSCRIBE * FROM interface WHERE id == "{}"' +Q_SUB_EMULATOR = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"' + +# Log messages + +S_WAIT_DEP = ' .. Waiting for dependency {}' +S_WAIT_DEP_OK = ' .. Done waiting for dependency {}' +S_WAIT = ' .. Waiting for {}' +S_WAIT_OK = ' .. Done waiting for {}' +S_WAIT_PRED = ' - Waiting for initialization of predecessors...' +S_WAIT_SRS = ' - Waiting for subresources...' +S_REG_SR = ' . Registering subresource to manager {}...' +S_WAIT_SR = ' . Waiting for subresource: {}' +S_WAIT_SR_OK = ' . Subresource is ready: {}' +S_AFTER = ' . AFTER TYPE={}' + +S_INIT_DONE = 'INIT done. Resource exists. Process attribute dict {}' +S_GET_DONE = 'GET done. Resource does not exist (exception was: {})' +S_KEYS_OK = 'Keys initialized, resource can now be created.' +S_CREATE_OK = 'CREATE success. Process attribute dict {}' + +#------------------------------------------------------------------------------ +# Helpers +#------------------------------------------------------------------------------ + +async def wait_resource(resource): + await resource._state.clean.wait() + +async def wait_resource_init(resource): + await resource._state.init.wait() + +async def wait_resources(resources): + await asyncio.gather(*[wait_resource(r) for r in resources]) + +wait_resource_task = async_task(wait_resource) +wait_resources_task = async_task(wait_resources) + +#------------------------------------------------------------------------------ + +class ResourceManager(metaclass=Singleton): + """ + A ResourceManager is in charge of managing resources, their lifecycle, and + interfaces to them. + """ + + def __init__(self, base, settings): + + # Base directory for scenario + self._base = base + + # Resources sorted via dependency (instances) + self._resources = dict() + + self._deps = None + + # Store resource requirements used for automatic instanciation + # instance -> attribute -> requirements + self._instance_requirements = dict() + + self._dirty = set() + self._auto_commit = False + + # class -> Requirements + self._class_requirements = dict() + + self._map_uuid_name = dict() + self._map_name_uuid = dict() + self._map_str_uuid = dict() + + # The task manager is used to schedule tasks used for resource + # synchronization + self._task_mgr = TaskManager() + + # Store experiment settings + self._settings = settings + + # Cache available resource types + _available = ResourceFactory().get_available_resources() + self._available = { k.lower(): v for k, v in _available.items() } + + # API / user interface + self._router = Router(vicn_callback = self._on_vicn_command) + self._router.add_interface('unixserver') + self._router.add_interface('local', router = self._router) + self._router.add_interface('vicn', manager = self) + + ws_port = self.get('websocket_port') + self._ws = self._router.add_interface('websocketserver', + port = ws_port) + + # Monitoring + self._monitored = set() + self._pending_monitoring = set() + + # For debug + self._committed = set() + + def terminate(self): + self._router.terminate() + + #-------------------------------------------------------------------------- + # Settings + #-------------------------------------------------------------------------- + + def set_settings(self, settings): + if settings is None: + return + self._settings.update(settings) + + def get(self, setting): + return self._settings[setting] + + def set(self, setting, value): + self._settings[setting] = value + + #-------------------------------------------------------------------------- + # Monitoring + # + # XXX This code should be deprecated / moved into a separate module. + # Planned for a future release. + #-------------------------------------------------------------------------- + + def _on_vicn_command(self, command): + if command == 'setup': + self.setup() + elif command == 'teardown': + self.teardown() + elif command == 'monitor': + self.monitor() + elif command == 'terminate': + loop = asyncio.get_event_loop() + loop.stop() + else: + # open_terminal, ... + raise NotImplementedError + + def _broadcast(self, query): + if not self._ws: + return + self._ws.execute(query) + + def _broadcast_packet(self, packet): + self._broadcast(packet.to_query()) + + def _on_ns_record(self, packet): + query = packet.to_query() + + if not query.object_name == 'interface': + return + q = Query(ACTION_UPDATE, 'channel', filter = query.filter, + params = query.params) + q.reply = True + + self._ws.execute(q) + return None + + def _on_netmon_record(self, packet): + query = packet.to_query() + + # Find channel related to query + # NOTE: we update the channel twice, once for each interface... + if query.object_name == 'interface': + device_names = [value for key, op, value in query.filter.to_list() + if key == 'device_name'] + if not device_names: + log.error('No device name in packet=', packet) + return + device_name = device_names[0] + node_name = query.params['node'] + node = ResourceManager().by_name(node_name) + if node is None: + return None + for interface in node.interfaces: + if interface.device_name == device_name: + if interface.channel: + f = Filter.from_list([['id', '==', + interface.channel._state.uuid._uuid]]) + q = Query(ACTION_UPDATE, 'channel', filter = f, + params = query.params) + q.reply = True + self._ws.execute(q) + return None + return None + return None + return None + + def _on_netmon_channel_record(self, packet): + query = packet.to_query() + if query.object_name == 'interface': + device_names = [value for key, op, value in query.filter.to_list() + if key == 'device_name'] + if not device_names: + log.error('No device name in packet=', packet) + return + + device_name = device_names[0] + + f = Filter.from_list([['id', '==', device_name]]) + q = Query(ACTION_UPDATE, 'channel', filter = f, + params = query.params) + q.reply = True + self._ws.execute(q) + return None + + return None + + def _on_vpp_record(self, packet, pylink_id): + query = packet.to_query() + if query.object_name == 'interface': + device_names = [value for key, op, value in query.filter.to_list() + if key == 'device_name'] + if not device_names: + log.error('No device name in packet=', packet) + return + + # We might want to check if the query has SUM(*) + f = Filter.from_list([['id', '==', pylink_id]]) + q = Query(ACTION_UPDATE, 'channel', filter = f, + params = query.params) + q.reply = True + self._ws.execute(q) + return None + + print('discard packet in on_netmon_channel_record', query) + return None + + #-------------------------------------------------------------------------- + # Resource management + #-------------------------------------------------------------------------- + + def create_from_dict(self, **resource): + resource_type = resource.pop('type', None) + assert resource_type + + return self.create(resource_type.lower(), **resource) + + def create(self, resource_type, **attributes): + cls = self._available.get(resource_type) + if not cls: + raise Exception("Ignored resource with unknown type %s: %r" % + (resource_type, attributes)) + + resource = cls(**attributes) + + name = attributes.get('name', None) + if name: + self._map_uuid_name[resource._state.uuid] = name + self._map_name_uuid[name] = resource._state.uuid + + return resource + + def get_resource_type_names(self): + return ResourceFactory().get_available_resources().keys() + + def add_resource(self, instance, name = None): + instance._state = InstanceState(self, instance, name = name) + + self._resources[instance._state.uuid] = instance + self._map_str_uuid[instance._state.uuid._uuid] = instance._state.uuid + self._deps = None + + def commit_resource(self, resource): + """ + Committing a resource creates an asyncio function implementing a state + management automaton. + """ + asyncio.ensure_future(self._process_resource(resource)) + + def commit(self): + """ + Commit all resource whose owner is not set, and mark unmanaged + resources as clean. + + This function is used at initialization. + """ + + # Start FSM for all managed resources + for resource in self.get_resources(): + if resource.owner is not None: + continue + if resource.managed == False: + asyncio.ensure_future(self._set_resource_state(resource, + ResourceState.CLEAN)) + continue + + self.commit_resource(resource) + + def setup(self, commit=False): + """ + This function is in charge of setting up all resources needed by the + experiment. Since it might be a long process, it should be asynchronous + at some point. So far, we let resources take care of this by themselves. + """ + self._auto_commit = commit + if commit: + self.commit() + + def get_resource_with_capabilities(self, cls, capabilities): + if '__type__' in cls.__dict__ and cls.__type__ == FactoryResource: + candidates = inheritors(cls) + if not candidates: + log.error('Abstract resource with no candidates: %s', + cls.__name__) + return None + + for delegate in candidates: + if capabilities and (not '__capabilities__' in vars(delegate) + or not capabilities.issubset(delegate.__capabilities__)): + continue + log.info("Abstract resource %s, delegated %s among %r" % \ + (cls.__name__, delegate.__name__, candidates)) + return delegate + return None + else: + if capabilities and (not '__capabilities__' in vars(delegate) or + not capabilities.issubset(delegate.__capabilities__)): + log.error('Capabilities conflict for resource : %s', + cls.__name__) + raise VICNException + return cls + + def find(self, resource_tuple): + cls, attr_dict = resource_tuple + for instance in self.by_type(cls): + cur_attr_dict = instance._get_attribute_dict() + common_keys = [k for k in cur_attr_dict.keys() + if k in attr_dict.keys()] + if all(attr_dict[k] == cur_attr_dict[k] for k in common_keys): + return instance + return None + + def __iter__(self): + for resource in self._resources.values(): + yield resource + + def resources(self): + return list(self.__iter__()) + + def _sort_resources(self): + deps = {} + for instance in self.resources(): + deps[instance._state.uuid] = \ + instance.get_dependencies(allow_unresolved = True) + + self._deps = toposort_flatten(deps) + + def _sorted_resources(self): + """ + Iterates on resources based on their dependencies + """ + if not self._deps: + self._sort_resources() + for dep in self._deps: + try: + yield self._resources[dep] + except KeyError: + log.error('Dependency not found : {}'.format(dep)) + raise InvalidResource + + def sorted_resources(self): + return list(self._sorted_resources()) + + #-------------------------------------------------------------------------- + # Queries + #-------------------------------------------------------------------------- + + def by_uuid(self, uuid): + return self._resources.get(uuid) + + def by_uuid_str(self, uuid_str): + uuid = self._map_str_uuid.get(uuid_str) + return self._resources.get(uuid) + + def by_name(self, name): + uuid = self._map_name_uuid.get(name) + return self.by_uuid(uuid) + + def get_resources(self): + return self._resources.values() + + def get_aggregates(self, resource_name, resource_cls): + """ + Get aggregated object. + """ + if not resource_name in self._aggregates: + return None + all_aggregates = self._aggregates[resource_name] + + if not resource_cls in all_aggregates: + return None + aggregates = all_aggregates[resource_cls] + + assert all(isinstance(x, resource_cls) for x in aggregates) + return aggregates + + def get_aggregate(self, resource_name, resource_cls): + aggregates = self.get_aggregates(resource_name, resource_cls) + if not aggregates: + return None + assert len(aggregates) == 1 + return next(aggregates) + + def by_type(self, type): + return [r for r in self if isinstance(r, type)] + + def by_type_str(self, typestr): + cls = self._available.get(typestr.lower()) + if not cls: + return list() + return self.by_type(cls) + + #-------------------------------------------------------------------------- + # Requirements + #-------------------------------------------------------------------------- + + def add_instance_requirement(self, instance, requirement): + uuid = instance._state.uuid + if not uuid in self._instance_requirements: + self._instance_requirements[uuid] = list() + self._instance_requirements[uuid].append(requirement) + + def get_instance_requirements(self, instance): + uuid = instance._state.uuid + return self._instance_requirements.get(uuid, dict()) + + def add_class_requirement(self, cls, requirement): + if not cls in self._class_requirements: + self._class_requirements[cls] = list() + self._class_requirements[cls].append(requirement) + + #-------------------------------------------------------------------------- + # Events + #-------------------------------------------------------------------------- + + def on(self, resource_name, event, action): + resource = self._resources.get(resource_name, None) + if not resource: + return + + resource.on(event, action) + + #-------------------------------------------------------------------------- + # Task management + #-------------------------------------------------------------------------- + + def schedule(self, task): + if task is None or isinstance(task, EmptyTask): + return + self._task_mgr.schedule(task) + + #-------------------------------------------------------------------------- + # Asynchronous resource API + # + # The manager is the only one to submit tasks to the scheduler since it can + # store and share the results, manage concurrent access, etc. + # As many functions are not thread safe, we make sure that they are all + # executed in the manager's thread (=main thread). + #-------------------------------------------------------------------------- + + async def resource_exits(self, resource): + await self._resource_get() + await self.wait_resource_exists(resource) + return resource._state.exists + + async def wait_attr_init(self, resource, attribute_name): + await resource._state.attr_init[attribute_name].wait() + + async def wait_attr_clean(self, resource, attribute_name): + await resource._state.attr_clean[attribute_name].wait() + + async def attribute_get(self, resource, attribute, value): + await self.wait_attr_init(resource, attribute) + return resource.get(attribute) + + async def attribute_set(self, resource, attribute_name, value, + blocking=True): + with await resource._state.write_lock: + # Add the current operation to the pending list + # NOTE: collections are unordered and can be updated concurrently + #self._attribute_set_pending_value(resource, attribute_name) + resource._state.dirty[attribute_name].trigger(Operations.SET, + value) + + attr_state = resource._state.attr_state[attribute_name] + if attr_state == AttributeState.CLEAN: + resource._state.attr_state[attribute_name] = \ + AttributeState.DIRTY + elif attr_state in [ + # Nothing to do since we know the attribute value will be + # processed later. + # If the attribute was not processed by default, we would have + # to change the state of the attribute so that it gets + # processed. + AttributeState.UNINITIALIZED, + AttributeState.INITIALIZED, + AttributeState.PENDING_INIT, + AttributeState.DIRTY]: + pass + else: + # We cannot have the lock for instance if the attribute is + # being updated. + raise RuntimeError + + resource_state = resource._state.state + if resource_state == ResourceState.CLEAN: + resource._state.state = ResourceState.DIRTY + resource._state.change_event.set() + elif resource_state in [ + ResourceState.UNINITIALIZED, + ResourceState.INITIALIZED, + ResourceState.PENDING_KEYS, + ResourceState.KEYS_OK, + ResourceState.PENDING_DEPS, + ResourceState.DEPS_OK, + ResourceState.PENDING_CREATE, + ResourceState.CREATED, + ResourceState.DIRTY, + ]: + pass # Nothing to do, the attribute will get processed + else: + # ResourceState.PENDING_UPDATE + # other + raise RuntimeError("Resource cannot be in state".format( + resource_state)) + + if blocking: + await self.wait_attr_clean(resource, attribute_name) + + #--------------------------------------------------------------------------- + # Resource dependency management + #--------------------------------------------------------------------------- + + async def _resource_wait_attributes(self, resource): + """Check dependencies and requirements + + Inspect all attributes for referenced resources, and their eventual + requirements. + """ + self.log(resource, ' - Waiting for attribute dependencies...') + for attr in resource.iter_attributes(): + if issubclass(attr.type, Resource): + deps = resource.get(attr.name) + if deps is None: + # Not really a dependency, we expect mandatory to prevent + # us to continue if we should not + continue + + if not attr.is_collection: + deps = [deps] + + for dep in deps: + # XXX This could be done in parallel + if not dep.managed: + continue + dep_pfx = '{}:{}'.format(dep.get_type(), dep.get_uuid()) + self.log(resource, S_WAIT_DEP. format(dep_pfx)) + await wait_resource(dep) + self.log(resource, S_WAIT_DEP_OK. format(dep_pfx)) + + if not attr.requirements: + continue + + for req in attr.requirements: + dep_attr_name = req.requirement_type + dep_attr = dep.get_attribute(dep_attr_name) + assert issubclass(dep_attr.type, Resource) + dep_attr_value = dep.get(dep_attr_name) + + if not dep_attr_value: + dep_attr_value = dep.auto_instanciate(dep_attr) + setattr(dep, dep_attr_name, dep_attr_value) + + dep_attr_value_pfx = '{}:{}'.format( + dep_attr_value.get_type(), + dep_attr_value.get_uuid()) + self.log(resource, + S_WAIT_DEP.format(dep_attr_value_pfx)) + await wait_resource(dep_attr_value) + self.log(resource, + S_WAIT_DEP_OK .format(dep_attr_value_pfx)) + + async def _resource_wait_predecessors(self, resource): + after = resource.__after__() + if after: + self.log(resource, ' - Waiting for predecessors...') + for resource_type in after: + self.log(resource, ' . AFTER TYPE={}'.format(resource_type)) + befores = resource._state.manager.by_type_str(resource_type) + for before in befores: + if not before.managed: + continue + before_pfx = '{}:{}'.format(before.get_type(), + before.get_uuid()) + self.log(resource, S_WAIT.format(before_pfx)) + await wait_resource(before) + self.log(resource, S_WAIT_OK.format(before_pfx)) + + after_init = resource.__after_init__() + if after_init: + self.log(resource, S_WAIT_PRED) + for resource_type in after_init: + self.log(resource, S_AFTER.format(resource_type)) + befores = resource._state.manager.by_type_str(resource_type) + for before in befores: + if not before.managed: + continue + before_pfx = '{}:{}'.format(before.get_type(), + before.get_uuid()) + self.log(resource, S_WAIT.format(before_pfx)) + await wait_resource_init(before) + self.log(resource, S_WAIT_OK.format(before_pfx)) + + async def _resource_wait_subresources(self, resource): + self.log(resource, S_WAIT_SRS) + + # We should accumulate subresources through the hierarchy + sr = EmptyResource() + for base in reversed(resource.__class__.mro()): + if '__subresources__' not in vars(base): + continue + sr = sr > base.__subresources__(resource) + + if sr is not None and not isinstance(sr, EmptyResource): + resource.set_subresources(sr) + pfx_sr = '{}:{}'.format(sr.get_type(), sr.get_uuid()) + self.log(resource, S_REG_SR .format(pfx_sr)) + await sr.async_commit_to_manager(self) + self.log(resource, S_WAIT_SR.format(pfx_sr)) + await wait_resource(sr) + self.log(resource, S_WAIT_SR_OK.format(pfx_sr)) + + async def _resource_wait_dependencies(self, resource): + self.log(resource, 'Waiting for dependencies...') + await self._resource_wait_attributes(resource) + await self._resource_wait_predecessors(resource) + await self._resource_wait_subresources(resource) + + def _task_resource_action(self, resource, action): + """Perform action: __get__, __create__, __delete__ on the full class + hierarchy. + """ + task = EmptyTask() + for base in reversed(resource.__class__.mro()): + # To avoid adding several times the same task + if action not in vars(base): + continue + func = getattr(base, action, None) + if func is None: + continue + t = func(resource) + + task = task > t + + return task + + #-------------------------------------------------------------------------- + # Resource model + #-------------------------------------------------------------------------- + + def _task_attribute_op(self, resource, attribute, op): + return getattr(resource, '_{}_{}'.format(op, attribute.name))() + + #-------------------------------------------------------------------------- + # Attribute FSM + #-------------------------------------------------------------------------- + + def _attribute_is_dirty(self, resource, attribute): + """ + Precondition: + Attribute has been retrieved + """ + pending_value = resource._state.dirty[attribute.name] + return pending_value.value != NEVER_SET + + async def __set_attribute_state(self, resource, attribute_name, state): + """Sets the resource state (no-lock version) + + It is important to centralize state change since some states are + associated with Events(). + """ + resource._state.attr_state[attribute_name] = state + if state in [ + AttributeState.INITIALIZED, + AttributeState.CLEAN, + AttributeState.DIRTY + ]: + resource._state.attr_init[attribute_name].set() + else: + raise RuntimeError("Inconsistent resource state {}".format(state)) + + if state in [AttributeState.CLEAN]: + resource._state.attr_clean[attribute_name].set() + elif state in [ + AttributeState.INITIALIZED, + AttributeState.DIRTY + ]: + resource._state.attr_clean[attribute_name].clear() + else: + raise RuntimeError + + async def _set_attribute_state(self, resource, attribute_name, state): + """Sets the attribute state (lock version) + """ + with await resource._state.attr_lock[attribute_name]: + await self.__set_attribute_state(resource, attribute_name, state) + + def _trigger_attr_state_change(self, resource, attribute, fut): + try: + ret = fut.result() + resource._state.attr_change_success[attribute.name] = True + resource._state.attr_change_value[attribute.name] = ret + except Exception as e: + resource._state.attr_change_success[attribute.name] = False + resource._state.attr_change_value[attribute.name] = e + resource._state.attr_change_event[attribute.name].set() + + async def attribute_process(self, resource, attribute): + """ + Temporary FSM executing in parallel for attribute management. Those FSM + are under the responsability of the main resource FSM. + + Precondition: + Attribute state is initialized + """ + self.attr_log(resource, attribute, + 'Starting attribute FSM for {}'.format(attribute.name)) + + new_state = None + while new_state != AttributeState.CLEAN: + #with await resource._state.attr_lock[attribute.name]: + state = resource._state.attr_state[attribute.name] + self.attr_log(resource, attribute, + 'Current state is {}'.format(state)) + + if resource._state.attr_change_success == False: + log.error('Attribute error') + e = resource._state.attr_change_value[attribute.name] + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + # Signal update errors to the parent resource + resource._state.attr_change_event[attribute.name].set() + + elif state == AttributeState.UNINITIALIZED: + pending_state = AttributeState.PENDING_INIT + elif state in AttributeState.INITIALIZED: + pending_state = AttributeState.PENDING_UPDATE + elif state == AttributeState.DIRTY: + pending_state = AttributeState.PENDING_UPDATE + elif state in [ + AttributeState.PENDING_INIT, + AttributeState.PENDING_UPDATE + ]: + # Nothing to do + pending_state = None + elif state == AttributeState.CLEAN: + return + else: + raise RuntimeError + + if pending_state is None: + self.attr_log(resource, attribute, + 'Nothing to do. Waiting for event...') + await resource._state.attr_change_event[attribute.name].wait() + resource._state.attr_change_event[attribute.name].clear() + self.attr_log(resource, attribute, 'Wake up from event') + continue + + if pending_state == AttributeState.PENDING_INIT: + task = self._task_attribute_op(resource, attribute, 'get') + elif pending_state == AttributeState.PENDING_UPDATE: + pending_value = resource._state.dirty[attribute.name] + + if pending_value.value == NEVER_SET: + assert len(pending_value.operations) == 0 + task = EmptyTask() + else: + try: + task = self._task_attribute_op(resource, attribute, + Operations.SET) + except Exception as e: + log.warning('No attribute setter attribute {}'.format( + attribute)) + task = EmptyTask() + else: + raise RuntimeError + + if task is not None and not isinstance(task, EmptyTask): + state_change = functools.partial( \ + self._trigger_attr_state_change, resource, attribute) + task.add_done_callback(state_change) + self.attr_log(resource, attribute, + 'Trigger {} -> {}. Waiting task completion'.format( + state, pending_state)) + self.schedule(task) + + await resource._state.attr_change_event[attribute.name].wait() + resource._state.attr_change_event[attribute.name].clear() + self.attr_log(resource, attribute, + 'Completed {} -> {}. Success = {}'.format( + state, pending_state, + resource._state.attr_change_success[attribute.name])) + else: + # If this value is not reset, attributes get updated many times + resource._state.attr_change_value[attribute.name] = NEVER_SET + + if pending_state == AttributeState.PENDING_INIT: + if resource._state.attr_change_success[attribute.name] == True: + attrs = resource._state.attr_change_value[attribute.name] + self.attr_log(resource, attribute, + 'INIT success. Value = {}'.format(attrs)) + found = self._process_attr_dict(resource, attribute, attrs) + if not found: + log.error('Attribute missing return attrs: {}'.format( + attrs)) + found = self._process_attr_dict(resource, attribute, + attrs) + new_state = AttributeState.INITIALIZED + else: + attrs = resource._state.attr_change_value[attribute.name] + self.attr_log(resource, attribute, + 'INIT gave no value. Value = {}'.format(attrs)) + new_state = AttributeState.INITIALIZED + + elif pending_state == AttributeState.PENDING_UPDATE: + if resource._state.attr_change_success[attribute.name] == True: + attrs = resource._state.attr_change_value[attribute.name] + self.attr_log(resource, attribute, + 'UPDATE success. Value = {}. Attribute is CLEAN'.format(attrs)) + if attrs != NEVER_SET: + # None could be interpreted as the return value. Also, + # we need not to overwrite the value from get + self._process_attr_dict(resource, attribute, attrs) + + # We might do this for all returned attributes + cur_value = vars(resource)[attribute.name] + if attribute.is_collection: + tmp = InstrumentedList(pending_value.value) + tmp._attribute = cur_value._attribute + tmp._instance = cur_value._instance + else: + tmp = pending_value.value + vars(resource)[attribute.name] = tmp + pending_value.clear() + + new_state = AttributeState.CLEAN + else: + log.error('Attribute error') + e = resource._state.attr_change_value[attribute.name] + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + else: + raise RuntimeError + + # Setting attribute state + await self._set_attribute_state(resource, attribute.name, + new_state) + + #-------------------------------------------------------------------------- + # Resource FSM + #-------------------------------------------------------------------------- + + def parse_query(self, line): + dic = SQLParser().parse(line) + if not dic: + raise RuntimeError("Can't parse input command: %s" % command) + + return Query.from_dict(dic) + + def _monitor_netmon(self, resource): + ip = resource.node.host_interface.ip_address + if not ip: + log.error('IP of monitored Node is None') + import os; os._exit(1) + + ws = self._router.add_interface('websocketclient', address=ip, + hook=self._on_netmon_record) + + node = resource.node + for interface in node.interfaces: + if not interface.monitored: + continue + + if interface.get_type() == 'dpdkdevice' and hasattr(node,'vpp'): + + # Check if vICN has already subscribed for one interface in + # the channel + if hasattr(interface.channel,'already_subscribed'): + continue + + channel_id = interface.channel._state.uuid._uuid + + update_vpp = functools.partial(self._on_vpp_record, + pylink_id = channel_id) + ws_vpp = self._router.add_interface('websocketclient', + address=ip, hook=update_vpp) + + aggregate_interfaces = list() + for _interface in node.interfaces: + if not _interface.get_type() == 'dpdkdevice' and \ + _interface.monitored: + aggregate_interfaces.append('"' + + _interface.device_name + '"') + + q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces)) + q = self.parse_query(q_str) + packet = Packet.from_query(q) + self._router._flow_table.add(packet, None, ws_vpp) + ws_vpp.send(packet) + + # Prevent vICN to subscribe to other interfaces of the same + # channel + interface.channel.already_subscribed = True + + else: + q_str = Q_SUB_IF.format(interface.device_name) + q = self.parse_query(q_str) + packet = Packet.from_query(q) + self._router._flow_table.add(packet, None, ws) + ws.send(packet) + + def _monitor_emulator(self, resource): + ns = resource + ip = ns.node.bridge.ip_address # host_interface.ip_address + + ws_ns = self._router.add_interface('websocketclient', address = ip, + port = ns.control_port, + hook = self._on_ns_record) + ws = self._router.add_interface('websocketclient', address = ip, + hook = self._on_netmon_channel_record) + + for station in ns.stations: + if not station.managed: + interface = [i for i in station.interfaces if i.channel == ns] + assert len(interface) == 1 + interface = interface[0] + identifier = interface.name + else: + iface = ns._sta_ifs[station] + identifier = iface._state.uuid._uuid + + # Monitor the wireless channel for position and link rate + q_str = Q_SUB_EMULATOR_IF.format(identifier) + q = self.parse_query(q_str) + packet = Packet.from_query(q) + self._router._flow_table.add(packet, None, ws_ns) + ws_ns.send(packet) + + # We also need to subscribe on the node for the tap interfaces + # for individual bandwidth monitoring + tap = ns._sta_taps[station] + q_str = Q_SUB_EMULATOR.format(tap.device_name) + q = self.parse_query(q_str) + packet = Packet.from_query(q) + self._router._flow_table.add(packet, None, ws) + ws.send(packet) + + def _monitor(self, resource): + if resource.get_type() == 'centralip': + for uuid in self._pending_monitoring: + pending = self.by_uuid(uuid) + self._monitor(pending) + self._pending_monitoring.clear() + return + + central_ip = self.by_type_str('centralip') + if not central_ip: + raise NotImplementedError('Missing CentralIP in experiment') + central_ip = central_ip[0] + + uuid = resource.get_uuid() + + if central_ip._state.state != ResourceState.CLEAN: + self._pending_monitoring.add(uuid) + return + + if uuid in self._monitored: + return + self._monitored.add(uuid) + + if resource.get_type() == 'netmon': + if resource.node.get_type() != 'lxccontainer': + return + self._monitor_netmon(resource) + + elif resource.has_type('emulatedchannel'): + self._monitor_emulator(resource) + + async def __set_resource_state(self, resource, state): + """Sets the resource state (no-lock version) + + It is important to centralize state change since some states are + associated with Events(). + """ + resource._state.state = state + if state == ResourceState.CLEAN: + # Monitoring hook + self._monitor(resource) + resource._state.clean.set() + else: + resource._state.clean.clear() + if state == ResourceState.INITIALIZED: + resource._state.init.set() + + async def _set_resource_state(self, resource, state): + """Sets the resource state (lock version) + """ + with await resource._state.lock: + await self.__set_resource_state(resource, state) + + def _trigger_state_change(self, resource, fut): + try: + ret = fut.result() + resource._state.change_success = True + resource._state.change_value = ret + except Exception as e: + resource._state.change_success = False + resource._state.change_value = e + resource._state.change_event.set() + + def _process_attr_dict(self, resource, attribute, attrs): + if not isinstance(attrs, dict): + if attribute is None: + return False + attrs = {attribute.name: attrs} + resource.set_many(attrs, current=True) + return True + + async def _task_resource_update(self, resource): + # Monitor all FSM one by one and inform about errors. + futs = list() + attrs = list() + for attr in resource.iter_attributes(): + if resource.is_local_attribute(attr.name): + continue + if attr.key: + # Those attributes are already done + continue + + attrs.append(attr) + fut = self.attribute_process(resource, attr) + futs.append(fut) + + if not futs: + self.log(resource, 'No attribute to update') + return None + + await asyncio.gather(*futs) + + # Inform the resource about the outcome of the update process + # Error if at least one attribute failed. + resource._state.change_success = all( + resource._state.attr_change_success[attr.name] + for attr in attrs) + self.log(resource, + 'All attributes FSM terminated with success={}'.format( + resource._state.change_success)) + + if resource._state.change_success: + ret = [ resource._state.attr_change_value[attr.name] + for attr in attrs] + return ret + else: + raise NotImplementedError('At least one attribute failed') + + async def _task_resource_keys(self, resource): + # Monitor all FSM one by one and inform about errors. + futs = list() + attrs = list() + for attr in resource.get_keys(): + if resource.is_local_attribute(attr.name): + continue + attrs.append(attr) + fut = self.attribute_process(resource, attr) + futs.append(fut) + + if not futs: + self.log(resource, 'No key attribute to update') + return None + + await asyncio.gather(*futs) + + # Inform the resource about the outcome of the update process + # Error if at least one attribute failed. + resource._state.change_success = all( + resource._state.attr_change_success[attr.name] + for attr in attrs) + self.log(resource, + 'KEY attributes FSM terminated with success={}'.format( + resource._state.change_success)) + + if resource._state.change_success: + ret = resource._state.attr_change_value + return ret + else: + raise NotImplementedError('At least one attribute failed') + + #-------------------------------------------------------------------------- + # Logging + #-------------------------------------------------------------------------- + + def log(self, resource, msg=None): + resource._state.log.append(msg) + + # Display on screen + #pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid()) + #print(pfx, msg) + + def attr_log(self, resource, attribute, msg): + resource._state.attr_log[attribute.name].append(msg) + + # Display on screen + #pfx = '[{}] {} / {}: '.format(resource.get_type(), resource.get_uuid(), + # attribute.name) + #print(pfx, msg) + + #-------------------------------------------------------------------------- + + async def _process_resource(self, resource): + """ + We need to schedule the first set of subresources, knowing others will + be orchestrated by the operators + - subresources need to enter the system in order + -> we just add them to the manager in time + - but they need to be managed by the system + - in particular, the owner waits for the system to complete + subresoruces: this is the implementation of __get__ __create__ + __delete__ in the base resource + """ + pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid()) + + self.log(resource, 'Starting FSM...') + + # When a resource is managed, it will get automatically monitored by + # adding the netmon resource on it. + from vicn.resource.node import Node + if resource.get_type() == 'lxccontainer': + self.log(resource, + 'Associating monitoring to lxc container resource...') + instance = self.create('netmon', node=resource) + self.commit_resource(instance) + + # FIXME + elif resource.get_type() == 'physical' and resource.managed and \ + len(self.by_type_str('emulatedchannel')) > 0: + self.log(resource, + 'Associating monitoring to physical node resource...') + instance = self.create('netmon', node=resource) + self.commit_resource(instance) + + state = None + + while True: + with await resource._state.lock: + + # FSM implementation + state = resource._state.state + self.log(resource, 'Current state is {}'.format(state)) + + if resource._state.change_success == False: + e = resource._state.change_value + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + elif state == ResourceState.UNINITIALIZED: + pending_state = ResourceState.PENDING_DEPS + elif state == ResourceState.DEPS_OK: + pending_state = ResourceState.PENDING_INIT + + elif state == ResourceState.INITIALIZED: + pending_state = ResourceState.PENDING_GET + + elif state == ResourceState.GET_DONE: + if resource.get_keys(): + pending_state = ResourceState.PENDING_KEYS + else: + pending_state = ResourceState.PENDING_CREATE + + elif state == ResourceState.KEYS_OK: + pending_state = ResourceState.PENDING_CREATE + + elif state in [ResourceState.CREATED, ResourceState.DIRTY]: + pending_state = ResourceState.PENDING_UPDATE + + elif state == ResourceState.DELETED: + raise NotImplementedError + # Nothing to do unless explicitely requested + pending_state = None + + elif state in [ + ResourceState.PENDING_DEPS, + ResourceState.PENDING_INIT, + ResourceState.PENDING_CREATE, + ResourceState.PENDING_DELETE, + ResourceState.CLEAN + ]: + # Nothing to do + pending_state = None + else: + raise RuntimeError + + # Implement state changes + # + # If a task is already pending, we simply wait for it to complete + if pending_state is None: + # Wait for an external change + self.log(resource, 'Nothing to do. Waiting for event...') + await resource._state.change_event.wait() + self.log(resource, 'Wake up from event') + resource._state.change_event.clear() + continue + + if pending_state == ResourceState.PENDING_DEPS: + # XXX Maybe for any action, we need to wait for dependencies to + # be up to date + task = async_task(functools.partial(self._resource_wait_dependencies, resource))() + + elif pending_state == ResourceState.PENDING_INIT: + task = self._task_resource_action(resource, '__initialize__') + + elif pending_state == ResourceState.PENDING_GET: + task = self._task_resource_action(resource, '__get__') + if isinstance(task, BashTask): + task.set_default_parse_for_get() + + elif pending_state == ResourceState.PENDING_KEYS: + task = async_task(functools.partial(self._task_resource_keys, resource))() + + elif pending_state == ResourceState.PENDING_CREATE: + task = self._task_resource_action(resource, '__create__') + + elif pending_state == ResourceState.PENDING_UPDATE: + # Instead of tasks, we wait for many smaller autoamtons to + # terminate + await resource._state.write_lock.acquire() + task = async_task(functools.partial(self._task_resource_update, resource))() + + elif pending_state == ResourceState.PENDING_DELETE: + task = self._task_resource_action(resource, '__delete__') + + else: + raise RuntimeError + + if task is not None and not isinstance(task, EmptyTask): + state_change = functools.partial(self._trigger_state_change, resource) + task.add_done_callback(state_change) + self.schedule(task) + + self.log(resource, 'Trigger {} -> {}. Waiting task completion'.format( + state, pending_state)) + await resource._state.change_event.wait() + resource._state.change_event.clear() + self.log(resource, 'Completed {} -> {}. Success = {}'.format( + state, pending_state, resource._state.change_success)) + + # If no task, can assume there is an instant switch to the next step... + + # Update state based on task results + if pending_state == ResourceState.PENDING_DEPS: + # XXX NO CHANGE SUCCESS TEST ?? + new_state = ResourceState.DEPS_OK + + elif pending_state == ResourceState.PENDING_INIT: + if resource._state.change_success == True: + attrs = resource._state.change_value + self.log(resource, 'INIT done.') + new_state = ResourceState.INITIALIZED + else: + e = resource._state.change_value + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + elif pending_state == ResourceState.PENDING_GET: + if resource._state.change_success == True: + attrs = resource._state.change_value + self.log(resource, S_INIT_DONE.format(attrs)) + self._process_attr_dict(resource, None, attrs) + new_state = ResourceState.CREATED + else: + e = resource._state.change_value + if ENABLE_LXD_WORKAROUND and \ + resource.get_type() != 'lxccontainer' and \ + isinstance(e, LxdNotFound): + # "not found" is the normal exception when the container + # does not exists. anyways the bug should only occur + # with container.execute(), not container.get() + log.error('LXD Fix (not found). Reset resource') + new_state = ResourceState.UNINITIALIZED + elif ENABLE_LXD_WORKAROUND and isinstance(e, LXDAPIException): + # "not found" is the normal exception when the container + # does not exists. anyways the bug should only occur + # with container.execute(), not container.get() + log.error('LXD Fix (API error). Reset resource') + new_state = ResourceState.UNINITIALIZED + elif isinstance(e, ResourceNotFound): + # The resource does not exist + self.log(resource, S_GET_DONE.format( + resource._state.change_value)) + new_state = ResourceState.GET_DONE + resource._state.change_value = None + else: + e = resource._state.change_value + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + resource._state.change_success = True + + elif pending_state == ResourceState.PENDING_KEYS: + if resource._state.change_success == True: + new_state = ResourceState.KEYS_OK + self.log(resource, S_KEYS_OK) + else: + e = resource._state.change_value + self.log(resource, 'KEYS failed: {}'.format(e)) + + if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound): + log.error('LXD Fix (not found). Reset resource') + new_state = ResourceState.CREATED + resource._state.change_success = True + else: + e = resource._state.change_value + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + elif pending_state == ResourceState.PENDING_CREATE: + if resource._state.change_success == True: + attrs = resource._state.change_value + self.log(resource, S_CREATE_OK.format(attrs)) + self._process_attr_dict(resource, None, attrs) + new_state = ResourceState.CREATED + else: + e = resource._state.change_value + + if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound): + log.error('LXD Fix (not found). Reset resource') + new_state = ResourceState.UNINITIALIZED + resource._state.change_success = True + elif ENABLE_LXD_WORKAROUND and \ + isinstance(e, LXDAPIException): + log.error('LXD Fix (API error). Reset resource') + new_state = ResourceState.UNINITIALIZED + resource._state.change_success = True + elif 'File exists' in str(e): + new_state = ResourceState.CREATED + resource._state.change_success = True + elif 'dpkg --configure -a' in str(e): + resource._dpkg_configure_a = True + new_state = ResourceState.UNINITIALIZED + resource._state.change_success = True + else: + self.log(resource, 'CREATE failed: {}'.format(e)) + e = resource._state.change_value + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + elif pending_state == ResourceState.PENDING_UPDATE: + if resource._state.change_success == True: + self.log(resource, 'Update finished, resource is CLEAN.') + new_state = ResourceState.CLEAN + resource._state.write_lock.release() + else: + e = resource._state.change_value + self.log(resource, 'UPDATE failed: {}'.format(e)) + + if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound): + log.error('LXD Fix (not found). Reset resource') + new_state = ResourceState.CREATED + resource._state.change_success = True + resource._state.write_lock.release() + else: + e = resource._state.change_value + resource._state.write_lock.release() + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + elif pending_state == ResourceState.PENDING_DELETE: + raise NotImplementedError + new_state = None + else: + raise RuntimeError + + await self._set_resource_state(resource, new_state) + diff --git a/vicn/core/sa_collections.py b/vicn/core/sa_collections.py new file mode 100644 index 00000000..e627caa5 --- /dev/null +++ b/vicn/core/sa_collections.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# This module is derived from code from SQLAlchemy +# +# orm/collections.py +# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php +# + +import logging + +from vicn.core.sa_compat import py2k +from vicn.core.exception import VICNListException + +log = logging.getLogger(__name__) + +def _list_decorators(): + """Tailored instrumentation wrappers for any list-like class.""" + + def _tidy(fn): + fn._sa_instrumented = True + fn.__doc__ = getattr(list, fn.__name__).__doc__ + + def append(fn): + def append(self, item): + try: + item = self._attribute.do_list_add(self._instance, item) + fn(self, item) + except VICNListException as e: + pass + _tidy(append) + return append + + def remove(fn): + def remove(self, value): + # testlib.pragma exempt:__eq__ + try: + self._attribute.do_list_remove(self._instance, value) + fn(self, value) + except : pass + _tidy(remove) + return remove + + def insert(fn): + def insert(self, index, value): + try: + value = self._attribute.do_list_add(self._instance, item) + fn(self, index, value) + except : pass + _tidy(insert) + return insert + + def __getitem__(fn): + def __getitem__(self, index): + item = fn(self, index) + return self._attribute.handle_getitem(self._instance, item) + _tidy(__getitem__) + return __getitem__ + + def __setitem__(fn): + def __setitem__(self, index, value): + if not isinstance(index, slice): + existing = self[index] + if existing is not None: + try: + self._attribute.do_list_remove(self._instance, existing) + except: pass + try: + value = self._attribute.do_list_add(self._instance, value) + fn(self, index, value) + except: pass + else: + # slice assignment requires __delitem__, insert, __len__ + step = index.step or 1 + start = index.start or 0 + if start < 0: + start += len(self) + if index.stop is not None: + stop = index.stop + else: + stop = len(self) + if stop < 0: + stop += len(self) + + if step == 1: + for i in range(start, stop, step): + if len(self) > start: + del self[start] + + for i, item in enumerate(value): + self.insert(i + start, item) + else: + rng = list(range(start, stop, step)) + if len(value) != len(rng): + raise ValueError( + "attempt to assign sequence of size %s to " + "extended slice of size %s" % (len(value), + len(rng))) + for i, item in zip(rng, value): + self.__setitem__(i, item) + _tidy(__setitem__) + return __setitem__ + + def __delitem__(fn): + def __delitem__(self, index): + if not isinstance(index, slice): + item = self[index] + try: + self._attribute.do_list_remove(self._instance, item) + fn(self, index) + except : pass + else: + # slice deletion requires __getslice__ and a slice-groking + # __getitem__ for stepped deletion + # note: not breaking this into atomic dels + has_except = False + for item in self[index]: + try: + self._attribute.do_list_remove(self._instance, item) + except : has_except = True + if not has_except: + fn(self, index) + _tidy(__delitem__) + return __delitem__ + + if py2k: + def __setslice__(fn): + def __setslice__(self, start, end, values): + has_except = False + for value in self[start:end]: + try: + self._attribute.do_list_remove(self._instance, value) + except : has_except = True + #values = [self._attribute.do_list_add(self._instance, value) for value in values] + _values = list() + for value in values: + try: + _values.append(self._attribute.do_list_add(self._instance, value)) + except: has_except = True + if not has_except: + fn(self, start, end, _values) + _tidy(__setslice__) + return __setslice__ + + def __delslice__(fn): + def __delslice__(self, start, end): + has_except = False + for value in self[start:end]: + try: + self._attribute.do_list_remove(self._instance, value) + except : has_except = True + if not has_except: + fn(self, start, end) + _tidy(__delslice__) + return __delslice__ + + def extend(fn): + def extend(self, iterable): + for value in iterable: + self.append(value) + _tidy(extend) + return extend + + def __iadd__(fn): + def __iadd__(self, iterable): + # list.__iadd__ takes any iterable and seems to let TypeError + # raise as-is instead of returning NotImplemented + for value in iterable: + self.append(value) + return self + _tidy(__iadd__) + return __iadd__ + + def pop(fn): + def pop(self, index=-1): + try: + self._attribute.do_list_remove(self._instance, item) + item = fn(self, index) + return item + except : return None + _tidy(pop) + return pop + + def __iter__(fn): + def __iter__(self): + for item in fn(self): + yield self._attribute.handle_getitem(self._instance, item) + _tidy(__iter__) + return __iter__ + + def __repr__(fn): + def __repr__(self): + return '<Collection {} {}>'.format(id(self), list.__repr__(self)) + _tidy(__repr__) + return __repr__ + + __str__ = __repr__ + #def __str__(fn): + # def __str__(self): + # return str(list(self)) + # _tidy(__str__) + # return __str__ + + if not py2k: + def clear(fn): + def clear(self, index=-1): + has_except = False + for item in self: + try: + self._attribute.do_list_remove(self._instance, item) + except : has_except = True + if not has_except: + fn(self) + _tidy(clear) + return clear + + # __imul__ : not wrapping this. all members of the collection are already + # present, so no need to fire appends... wrapping it with an explicit + # decorator is still possible, so events on *= can be had if they're + # desired. hard to imagine a use case for __imul__, though. + + l = locals().copy() + l.pop('_tidy') + return l + +def _instrument_list(cls): + # inspired by sqlalchemy + for method, decorator in _list_decorators().items(): + fn = getattr(cls, method, None) + if fn: + #if (fn and method not in methods and + # not hasattr(fn, '_sa_instrumented')): + setattr(cls, method, decorator(fn)) + +class InstrumentedList(list): + + def __contains__(self, key): + from vicn.core.resource import Resource + if isinstance(key, Resource): + key = key.get_uuid() + return list.__contains__(self, key) + + def __lshift__(self, item): + self.append(item) + +_instrument_list(InstrumentedList) diff --git a/vicn/core/sa_compat.py b/vicn/core/sa_compat.py new file mode 100644 index 00000000..34211455 --- /dev/null +++ b/vicn/core/sa_compat.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# This module originates from SQLAlchemy +# +# util/compat.py +# Copyright (C) 2005-2016 the SQLAlchemy authors and contributors +# +# This module is part of SQLAlchemy and is released under +# the MIT License: http://www.opensource.org/licenses/mit-license.php +# + +"""Handle Python version/platform incompatibilities.""" + +import sys + +try: + import threading +except ImportError: + import dummy_threading as threading + +py36 = sys.version_info >= (3, 6) +py33 = sys.version_info >= (3, 3) +py32 = sys.version_info >= (3, 2) +py3k = sys.version_info >= (3, 0) +py2k = sys.version_info < (3, 0) +py265 = sys.version_info >= (2, 6, 5) +jython = sys.platform.startswith('java') +pypy = hasattr(sys, 'pypy_version_info') +win32 = sys.platform.startswith('win') +cpython = not pypy and not jython # TODO: something better for this ? + +import collections +next = next + +if py3k: + import pickle +else: + try: + import cPickle as pickle + except ImportError: + import pickle + +# work around http://bugs.python.org/issue2646 +if py265: + safe_kwarg = lambda arg: arg +else: + safe_kwarg = str + +ArgSpec = collections.namedtuple("ArgSpec", + ["args", "varargs", "keywords", "defaults"]) + +if py3k: + import builtins + + from inspect import getfullargspec as inspect_getfullargspec + from urllib.parse import (quote_plus, unquote_plus, + parse_qsl, quote, unquote) + import configparser + from io import StringIO + + from io import BytesIO as byte_buffer + + def inspect_getargspec(func): + return ArgSpec( + *inspect_getfullargspec(func)[0:4] + ) + + string_types = str, + binary_types = bytes, + binary_type = bytes + text_type = str + int_types = int, + iterbytes = iter + + def u(s): + return s + + def ue(s): + return s + + def b(s): + return s.encode("latin-1") + + if py32: + callable = callable + else: + def callable(fn): + return hasattr(fn, '__call__') + + def cmp(a, b): + return (a > b) - (a < b) + + from functools import reduce + + print_ = getattr(builtins, "print") + + import_ = getattr(builtins, '__import__') + + import itertools + itertools_filterfalse = itertools.filterfalse + itertools_filter = filter + itertools_imap = map + from itertools import zip_longest + + import base64 + + def b64encode(x): + return base64.b64encode(x).decode('ascii') + + def b64decode(x): + return base64.b64decode(x.encode('ascii')) + +else: + from inspect import getargspec as inspect_getfullargspec + inspect_getargspec = inspect_getfullargspec + from urllib import quote_plus, unquote_plus, quote, unquote + from urlparse import parse_qsl + import ConfigParser as configparser + from StringIO import StringIO + from cStringIO import StringIO as byte_buffer + + string_types = basestring, + binary_types = bytes, + binary_type = str + text_type = unicode + int_types = int, long + + def iterbytes(buf): + return (ord(byte) for byte in buf) + + def u(s): + # this differs from what six does, which doesn't support non-ASCII + # strings - we only use u() with + # literal source strings, and all our source files with non-ascii + # in them (all are tests) are utf-8 encoded. + return unicode(s, "utf-8") + + def ue(s): + return unicode(s, "unicode_escape") + + def b(s): + return s + + def import_(*args): + if len(args) == 4: + args = args[0:3] + ([str(arg) for arg in args[3]],) + return __import__(*args) + + callable = callable + cmp = cmp + reduce = reduce + + import base64 + b64encode = base64.b64encode + b64decode = base64.b64decode + + def print_(*args, **kwargs): + fp = kwargs.pop("file", sys.stdout) + if fp is None: + return + for arg in enumerate(args): + if not isinstance(arg, basestring): + arg = str(arg) + fp.write(arg) + + import itertools + itertools_filterfalse = itertools.ifilterfalse + itertools_filter = itertools.ifilter + itertools_imap = itertools.imap + from itertools import izip_longest as zip_longest + + +import time +if win32 or jython: + time_func = time.clock +else: + time_func = time.time + +from collections import namedtuple +from operator import attrgetter as dottedgetter + + +if py3k: + def reraise(tp, value, tb=None, cause=None): + if cause is not None: + assert cause is not value, "Same cause emitted" + value.__cause__ = cause + if value.__traceback__ is not tb: + raise value.with_traceback(tb) + raise value + +else: + # not as nice as that of Py3K, but at least preserves + # the code line where the issue occurred + exec("def reraise(tp, value, tb=None, cause=None):\n" + " if cause is not None:\n" + " assert cause is not value, 'Same cause emitted'\n" + " raise tp, value, tb\n") + + +def raise_from_cause(exception, exc_info=None): + if exc_info is None: + exc_info = sys.exc_info() + exc_type, exc_value, exc_tb = exc_info + cause = exc_value if exc_value is not exception else None + reraise(type(exception), exception, tb=exc_tb, cause=cause) + +if py3k: + exec_ = getattr(builtins, 'exec') +else: + def exec_(func_text, globals_, lcl=None): + if lcl is None: + exec('exec func_text in globals_') + else: + exec('exec func_text in globals_, lcl') + + +def with_metaclass(meta, *bases): + """Create a base class with a metaclass. + + Drops the middle class upon creation. + + Source: http://lucumr.pocoo.org/2013/5/21/porting-to-python-3-redux/ + + """ + + class metaclass(meta): + __call__ = type.__call__ + __init__ = type.__init__ + + def __new__(cls, name, this_bases, d): + if this_bases is None: + return type.__new__(cls, name, (), d) + return meta(name, bases, d) + return metaclass('temporary_class', None, {}) + + +from contextlib import contextmanager + +try: + from contextlib import nested +except ImportError: + # removed in py3k, credit to mitsuhiko for + # workaround + + @contextmanager + def nested(*managers): + exits = [] + vars = [] + exc = (None, None, None) + try: + for mgr in managers: + exit = mgr.__exit__ + enter = mgr.__enter__ + vars.append(enter()) + exits.append(exit) + yield vars + except: + exc = sys.exc_info() + finally: + while exits: + exit = exits.pop() + try: + if exit(*exc): + exc = (None, None, None) + except: + exc = sys.exc_info() + if exc != (None, None, None): + reraise(exc[0], exc[1], exc[2]) diff --git a/vicn/core/scheduling_algebra.py b/vicn/core/scheduling_algebra.py new file mode 100644 index 00000000..207856c0 --- /dev/null +++ b/vicn/core/scheduling_algebra.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +def SchedulingAlgebra(cls, concurrent_mixin=object, composition_mixin=object, + sequential_mixin=object): # allow_none = True + + class BaseElement(cls): + def __default__(cls, *elements): + elts = [e for e in elements + if e is not None and not isinstance(e, Empty)] + if len(elts) == 0: + # The first is always Empty + assert len(elements) != 0 + return elements[0] + elif len(elts) == 1: + return elts[0] + return cls(*elts) + + def __concurrent__(*elements): + return BaseElement.__default__(Concurrent, *elements) + + def __composition__(*elements): + return BaseElement.__default__(Composition, *elements) + + def __sequential__(*elements): + return BaseElement.__default__(Sequential, *elements) + + # Operator: | + __or__ = __concurrent__ + + # Operator: > + __gt__ = __sequential__ + + # Operator: @ + __matmul__ = __composition__ + + class Element(BaseElement): + def __iter__(self): + yield self + + class Operator(BaseElement): + def __init__(self, *elements): + super().__init__() + self._elements = list(elements) + + def __iter__(self): + yield self + for element in self._elements: + for x in element: + yield x + + class Concurrent(Operator, concurrent_mixin): + # Algebraic rule : ((A // B) // C) ~ (A // B // C) + def __concurrent__(self, other): + self._elements.append(other) + return self + + def __repr__(self): + return '<Concurrent {}>'.format(self._elements) + + class Composition(Operator, composition_mixin): + def __repr__(self): + return '<Composition {}>'.format(self._elements) + + class Sequential(Operator, sequential_mixin): + def __repr__(self): + return '<Sequential {}>'.format(self._elements) + + class Empty(Element): + def __concurrent__(self, other): + return other + + def __composition__(self, other): + return other + + def __sequential__(self, other): + return other + + def __repr__(self): + return '<Empty>' + + return Element, Empty diff --git a/vicn/core/state.py b/vicn/core/state.py new file mode 100644 index 00000000..d5069b24 --- /dev/null +++ b/vicn/core/state.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import random +import string + +class NEVER_SET: + pass + +# Separator for components of the UUID +UUID_SEP = '-' + +# Length of the random component of the UUID +UUID_LEN = 5 + +class ResourceState: + UNINITIALIZED = 'UNINITIALIZED' + PENDING_DEPS = 'PENDING_DEPS' + DEPS_OK = 'DEPS_OK' + PENDING_INIT = 'PENDING_INIT' + INITIALIZED = 'INITIALIZED' + PENDING_GET = 'PENDING_GET' + GET_DONE = 'GET_DONE' + PENDING_KEYS = 'PENDING_KEYS' + KEYS_OK = 'KEYS_OK' + PENDING_CREATE = 'PENDING_CREATE' + CREATED = 'CREATED' + DIRTY = 'DIRTY' + CLEAN = 'CLEAN' + PENDING_UPDATE = 'PENDING_UPDATE' + PENDING_DELETE = 'PENDING_DELETE' + DELETED = 'DELETED' + +class AttributeState: + UNINITIALIZED = 'UNINITIALIZED' + INITIALIZED = 'INITIALIZED' + DIRTY = 'DIRTY' + PENDING_INIT = 'PENDING_INIT' + PENDING_UPDATE = 'PENDING_UPDATE' + CLEAN = 'CLEAN' + +class Operations: + SET = 'set' + LIST_ADD = 'add' + LIST_REMOVE = 'remove' + LIST_CLEAR = 'clear' + +class UUID: + def __init__(self, name, cls): + self._uuid = self._make_uuid(name, cls) + + def _make_uuid(self, name, cls): + """Generate a unique resource identifier + + The UUID consists in the type of the resource, to which is added a + random identifier of length UUID_LEN. Components of the UUID are + separated by UUID_SEP. + """ + uuid = ''.join(random.choice(string.ascii_uppercase + string.digits) + for _ in range(UUID_LEN)) + if name: + uuid = name # + UUID_SEP + uuid + return UUID_SEP.join([cls.__name__, uuid]) + + def __repr__(self): + return '<UUID {}>'.format(self._uuid) + + def __lt__(self, other): + return self._uuid < other._uuid + + __str__ = __repr__ + +class PendingValue: + def __init__(self, value = None): + self.clear(value) + + def clear(self, value=NEVER_SET): + self.value = NEVER_SET + self.operations = list() + + def trigger(self, action, value, cur_value = None): + + if self.value is NEVER_SET: + if cur_value is not None: + self.value = cur_value + + if action == Operations.SET: + self.value = value + self.operations = [(Operations.SET, value)] + elif action == Operations.LIST_CLEAR: + self.value = list() + self.operations = [(Operations.LIST_CLEAR, None)] + else: + if action == Operations.LIST_ADD: + self.value.append(value) + elif action == Operations.LIST_REMOVE: + self.value.remove(value) + else: + raise RuntimeError + self.operations.append((action, value)) + +class InstanceState: + def __init__(self, manager, instance, name = None): + + # Unique identifier for the instance. This is useful for relation + # between resources + self.uuid = UUID(name, instance.__class__) + self.instance = instance + + # Instance manager + self.manager = manager + + # Events + self.events = dict() + + # Stores the requested value : attribute_name -> requested operations = + # LIST set add remove clear + self.dirty = dict() + + + # Initialize resource state + self.lock = asyncio.Lock() + self.write_lock = asyncio.Lock() + self.state = ResourceState.UNINITIALIZED + self.clean = asyncio.Event() + self.clean.clear() + self.init = asyncio.Event() + self.init.clear() + self.change_event = asyncio.Event() + self.change_event.clear() + self.change_success = None + self.change_value = None + self.log = list() + + self.attr_lock = dict() + self.attr_init = dict() + self.attr_clean = dict() + self.attr_state = dict() + self.attr_change_event = dict() + self.attr_change_success = dict() + self.attr_change_value= dict() + self.attr_log = dict() + # Initialize attribute state + for attribute in instance.iter_attributes(): + self.attr_lock[attribute.name] = asyncio.Lock() + self.attr_init[attribute.name] = asyncio.Event() + self.attr_clean[attribute.name] = asyncio.Event() + self.attr_state[attribute.name] = AttributeState.UNINITIALIZED + self.attr_change_event[attribute.name] = asyncio.Event() + self.attr_change_event[attribute.name].clear() + self.attr_change_success[attribute.name] = None + self.attr_change_value[attribute.name] = None + self.dirty[attribute.name] = PendingValue(NEVER_SET) + self.attr_log[attribute.name] = list() + + def set_dirty(self, attr_name): + self.attr_dirty.add(attr_name) + self.manager.set_dirty(self.uuid) + + def trigger(self, attribute_name, action, value): + self.dirty[attribute_name].trigger(action, value) diff --git a/vicn/core/task.py b/vicn/core/task.py new file mode 100644 index 00000000..2e9bc275 --- /dev/null +++ b/vicn/core/task.py @@ -0,0 +1,352 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import asyncio +import concurrent.futures +import functools +import logging +import shlex +import subprocess + +from vicn.core.scheduling_algebra import SchedulingAlgebra +from vicn.core.commands import ReturnValue +from vicn.core.exception import ResourceNotFound +from vicn.core.commands import Command, SequentialCommands +from netmodel.util.process import execute_local + +log = logging.getLogger(__name__) + +EXECUTOR=concurrent.futures.ThreadPoolExecutor +#EXECUTOR=concurrent.futures.ProcessPoolExecutor + +MAX_WORKERS=50 # None + +class BaseTask: + """Base class for all tasks + """ + + def __init__(self): + self._future = asyncio.Future() + + def terminate(self): + pass + + def start(self): + pass + + def stop(self): + pass + + def get_future(self): + return self._future + + def add_done_callback(self, cb): + self._future.add_done_callback(cb) + + def __repr__(self): + return '<BaseTask>' + +class ConcurrentMixin: + async def execute(self): + try: + for t in self._elements: + await t.execute() + rets = await asyncio.gather(*[t.get_future() + for t in self._elements]) + + # The result value is the "union" of all result values + # In case of tasks setting the same attributes, they are merged + # into a list + + dic = dict() + for ret in rets: + # Ideally we should get all attribute names, and properly + # insert Nones. So far we assume all dicts are present and + # complete. + if not isinstance(ret, dict): + continue + for k, v in ret.items(): + if k in dic: + if not isinstance(dic[k], list): + dic[k] = [dic[k]] + dic[k].append(v) + else: + dic[k] = [v] + self.get_future().set_result(dic) + except Exception as e: + self.get_future().set_exception(e) + +class SequentialMixin: + async def execute(self): + try: + for t in self._elements: + await t.execute() + await t.get_future() + self.get_future().set_result(None) + except Exception as e: + self.get_future().set_exception(e) + +class CompositionMixin: + async def execute(self): + try: + ret = None + for t in self._elements: + ret = (ret,) if ret is not None else tuple() + await t.execute(*ret) + ret = await t.get_future() + self.get_future().set_result(ret) + except Exception as e: + print('we need to cancel tasks not executed...') + self.get_future().set_exception(e) + +Task, EmptyTask = SchedulingAlgebra(BaseTask, ConcurrentMixin, + CompositionMixin, SequentialMixin) + +def task(fn): + def decorator(*args, **kwargs): + return PythonTask(fn, *args, **kwargs) + return decorator + +def async_task(fn, *t_args, **t_kwargs): + def decorator(*args, **kwargs): + all_args = tuple() + t_args + args + all_kwargs = dict() + all_kwargs.update(t_kwargs) + all_kwargs.update(kwargs) + return PythonAsyncTask(fn, *args, **kwargs) + return decorator + +def inline_task(fn): + def decorator(*args, **kwargs): + return PythonInlineTask(fn, *args, **kwargs) + return decorator + +async def wait_task(task): + return await task.get_future() + +async def run_task(task, manager): + manager.schedule(task) + ret = await wait_task(task) + return ret + +async def wait_concurrent_tasks(tasks): + await wait_task(Task.__concurrent__(*tasks)) + +wait_task_task = async_task(wait_task) + +def get_attribute_task(resource, attribute_name): + @async_task + async def func(): + return await resource.async_get(attribute_name) + return func() + +def set_attributes_task(resource, attribute_dict): + # The difficulty is in setting the pending value without triggering the + # manager, and executing the task by ourselves ! + raise NotImplementedError + +def get_attributes_task(resource, attribute_names): + assert len(attribute_names) == 1 + attribute_name = attribute_names[0] + + @async_task + async def func(): + await resource._state.manager.wait_attr_init(resource, attribute_name) + ret = await resource.async_get(attribute_name) + return {attribute_name: ret} + return func() + +class PythonTask(Task): + def __init__(self, func, *args, **kwargs): + super().__init__() + self._func = func + self._args = args + self._kwargs = kwargs + + def _done_callback(self, fut): + try: + self._future.set_result(fut.result()) + except Exception as e: + self._future.set_exception(e) + + async def execute(self, *args, **kwargs): + all_args = self._args + args + all_kwargs = dict() + all_kwargs.update(self._kwargs) + all_kwargs.update(kwargs) + + partial = functools.partial(self._func, *all_args, **all_kwargs) + + loop = asyncio.get_event_loop() + fut = loop.run_in_executor(None, partial) + fut.add_done_callback(self._done_callback) + + def __repr__(self): + return '<Task[py] {} / {} {}>'.format(self._func, self._args, + self._kwargs) + +class PythonAsyncTask(PythonTask): + async def execute(self, *args, **kwargs): + all_args = self._args + args + all_kwargs = dict() + all_kwargs.update(self._kwargs) + all_kwargs.update(kwargs) + + partial = asyncio.coroutine(self._func)(*all_args, **all_kwargs) + + fut = asyncio.ensure_future(partial) + fut.add_done_callback(self._done_callback) + + def __repr__(self): + return '<Task[apy]>' + +class PythonInlineTask(PythonTask): + async def execute(self, *args, **kwargs): + all_args = self._args + args + all_kwargs = dict() + all_kwargs.update(self._kwargs) + all_kwargs.update(kwargs) + + try: + ret = self._func(*all_args, **all_kwargs) + self._future.set_result(ret) + except Exception as e: + self._future.set_exception(e) + return self._future + +class BashTask(Task): + def __init__(self, node, cmd, parameters=None, parse=None, as_root=False, + output=False, pre=None, post=None, lock=None): + super().__init__() + self._node = node + self._cmd = cmd + self._params = parameters if parameters else dict() + self._parse = parse + self._pre = pre + self._post = post + self._lock = lock + + self._output = output + self._as_root = as_root + + def _default_parse_for_get(self, rv): + if not bool(rv): + raise ResourceNotFound + + def set_default_parse_for_get(self): + if self._parse is None: + self._parse = self._default_parse_for_get + + def _done_callback(self, fut): + """ + Note: extends the functionality of the parent _done_callback + """ + try: + rv = fut.result() + if self._parse is None and rv.return_value != 0: + raise Exception('Bash command failed', self.get_full_cmd(), rv) + if self._post: + self._post() + if self._parse: + rv = self._parse(rv) + self._future.set_result(rv) + except Exception as e: + self._future.set_exception(e) + if self._lock: + self._lock.release() + + def get_full_cmd(self): + c = SequentialCommands() + desc = None + for line in self._cmd.splitlines(): + line = line.strip() + if not line: + continue + if line.startswith('#'): + desc = line[1:].strip() + continue + c << Command(line, description = desc) + desc = None + + c.parameters = self._params + return c.command.full_commandline + + async def execute(self, *args, **kwargs): + """Execute the task, enforcing any eventual locking. + + Returns: + asyncio.Future + + Upon completion (eventually error), the Task's future is set. + """ + if len(args) == 1: + dic, = args + if isinstance(dic, dict): + self._params.update(dic) + if self._pre: + self._pre() + + func = self._node.execute if self._node else execute_local + # It is important that the command is contructed only just before it is + # executed, so that any object passed as parameters is deferenced right + # on time. + cmd = self.get_full_cmd() + partial = functools.partial(func, cmd, output = bool(self._parse)) + + node_str = self._node.name if self._node else '(LOCAL)' + cmd_str = cmd[:77] + '...' if len(cmd) > 80 else cmd + log.info('Execute: {} - {}'.format(node_str, cmd_str)) + + if self._lock: + # We need to do lock/unlock around the task execution + # Locking now will early block other tasks, but will at the same + # time delay them entering the executor queue; so this is + # equivalent + await self._lock.acquire() + + loop = asyncio.get_event_loop() + fut = loop.run_in_executor(None, partial) + fut.add_done_callback(self._done_callback) + + def execute_blocking(self): + rv = self._node.execute(self.get_full_cmd(), output=True) + if self._parse: + rv = self._parse(rv) + return rv + + def __repr__(self): + return '<Task[bash] {} / {}>'.format(self._cmd, self._params) + +class TaskManager: + def __init__(self): + executor = EXECUTOR() if MAX_WORKERS is None \ + else EXECUTOR(max_workers=MAX_WORKERS) + loop = asyncio.get_event_loop() + loop.set_default_executor(executor) + + def schedule(self, task): + """All instances of BaseTask can be scheduled + + Here we might decide to do more advanced scheduling, like merging bash + tasks, etc. thanks to the task algebra. + """ + asyncio.ensure_future(task.execute()) + +@task +def ParseRegexTask(rv): + return [m.groupdict() for m in rx.finditer(rv.stdout)] |