diff options
author | Jordan Augé <jordan.auge+fdio@email.com> | 2017-02-24 14:58:01 +0100 |
---|---|---|
committer | Jordan Augé <jordan.auge+fdio@cisco.com> | 2017-02-24 18:36:29 +0000 |
commit | 85a341d645b57b7cd88a26ed2ea0a314704240ea (patch) | |
tree | bdda2b35003aae20103a796f86daced160b8a730 /vicn/core/resource_mgr.py | |
parent | 9b30fc10fb1cbebe651e5a107e8ca5b24de54675 (diff) |
Initial commit: vICN
Change-Id: I7ce66c4e84a6a1921c63442f858b49e083adc7a7
Signed-off-by: Jordan Augé <jordan.auge+fdio@cisco.com>
Diffstat (limited to 'vicn/core/resource_mgr.py')
-rw-r--r-- | vicn/core/resource_mgr.py | 1436 |
1 files changed, 1436 insertions, 0 deletions
diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py new file mode 100644 index 00000000..f6082488 --- /dev/null +++ b/vicn/core/resource_mgr.py @@ -0,0 +1,1436 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Copyright (c) 2017 Cisco and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys, logging, asyncio, socket, functools +import time + +# LXD workaround +from pylxd.exceptions import NotFound as LxdNotFound, LXDAPIException + +from netmodel.model.filter import Filter +from netmodel.model.query import Query, ACTION_SELECT, ACTION_INSERT +from netmodel.model.query import ACTION_UPDATE, ACTION_SUBSCRIBE +from netmodel.model.sql_parser import SQLParser +from netmodel.network.packet import Packet +from netmodel.network.router import Router +from netmodel.util.toposort import toposort, toposort_flatten +from netmodel.util.meta import inheritors +from netmodel.util.singleton import Singleton +from netmodel.util.misc import is_iterable +from vicn.core.attribute import NEVER_SET +from vicn.core.exception import VICNException, ResourceNotFound +from vicn.core.resource_factory import ResourceFactory +from vicn.core.resource import Resource, FactoryResource, EmptyResource +from vicn.core.sa_collections import InstrumentedList +from vicn.core.state import InstanceState, ResourceState +from vicn.core.state import AttributeState, Operations, PendingValue +from vicn.core.task import TaskManager, wait_task, task, async_task 
from vicn.core.task import EmptyTask, BashTask

log = logging.getLogger(__name__)

# Global toggle for the pylxd workaround imported above (NotFound /
# LXDAPIException handling elsewhere in this module/file).
ENABLE_LXD_WORKAROUND = True

# Monitoring queries
# SQL-like subscription strings (parsed by netmodel's SQLParser); the {}
# placeholders are filled with device names / channel ids at subscribe time.

Q_SUB_VPP = 'SUBSCRIBE SUM(*) FROM interface WHERE device_name INCLUDED [{}]'
Q_SUB_IF = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"'
Q_SUB_STATS = 'SUBSCRIBE * FROM stats'
Q_SUB_EMULATOR_IF = 'SUBSCRIBE * FROM interface WHERE id == "{}"'
Q_SUB_EMULATOR = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"'

# Log messages
# Shared format strings for the resource FSM log output below.

S_WAIT_DEP = ' .. Waiting for dependency {}'
S_WAIT_DEP_OK = ' .. Done waiting for dependency {}'
S_WAIT = ' .. Waiting for {}'
S_WAIT_OK = ' .. Done waiting for {}'
S_WAIT_PRED = ' - Waiting for initialization of predecessors...'
S_WAIT_SRS = ' - Waiting for subresources...'
S_REG_SR = ' . Registering subresource to manager {}...'
S_WAIT_SR = ' . Waiting for subresource: {}'
S_WAIT_SR_OK = ' . Subresource is ready: {}'
S_AFTER = ' . AFTER TYPE={}'

S_INIT_DONE = 'INIT done. Resource exists. Process attribute dict {}'
S_GET_DONE = 'GET done. Resource does not exist (exception was: {})'
S_KEYS_OK = 'Keys initialized, resource can now be created.'
S_CREATE_OK = 'CREATE success. Process attribute dict {}'

#------------------------------------------------------------------------------
# Helpers
#------------------------------------------------------------------------------

async def wait_resource(resource):
    # Block until the resource's FSM reaches a "clean" state; _state.clean
    # looks like an asyncio.Event set by the manager's state machine —
    # TODO confirm against vicn.core.state.InstanceState.
    await resource._state.clean.wait()

async def wait_resource_init(resource):
    # Block until the resource's FSM has at least completed initialization.
    await resource._state.init.wait()

async def wait_resources(resources):
    # Wait for ALL given resources concurrently.
    await asyncio.gather(*[wait_resource(r) for r in resources])

# Task-wrapped variants, usable wherever the scheduler expects a Task.
wait_resource_task = async_task(wait_resource)
wait_resources_task = async_task(wait_resources)

#------------------------------------------------------------------------------

class ResourceManager(metaclass=Singleton):
    """
    A ResourceManager is in charge of managing resources, their lifecycle, and
    interfaces to them.
+ """ + + def __init__(self, base, settings): + + # Base directory for scenario + self._base = base + + # Resources sorted via dependency (instances) + self._resources = dict() + + self._deps = None + + # Store resource requirements used for automatic instanciation + # instance -> attribute -> requirements + self._instance_requirements = dict() + + self._dirty = set() + self._auto_commit = False + + # class -> Requirements + self._class_requirements = dict() + + self._map_uuid_name = dict() + self._map_name_uuid = dict() + self._map_str_uuid = dict() + + # The task manager is used to schedule tasks used for resource + # synchronization + self._task_mgr = TaskManager() + + # Store experiment settings + self._settings = settings + + # Cache available resource types + _available = ResourceFactory().get_available_resources() + self._available = { k.lower(): v for k, v in _available.items() } + + # API / user interface + self._router = Router(vicn_callback = self._on_vicn_command) + self._router.add_interface('unixserver') + self._router.add_interface('local', router = self._router) + self._router.add_interface('vicn', manager = self) + + ws_port = self.get('websocket_port') + self._ws = self._router.add_interface('websocketserver', + port = ws_port) + + # Monitoring + self._monitored = set() + self._pending_monitoring = set() + + # For debug + self._committed = set() + + def terminate(self): + self._router.terminate() + + #-------------------------------------------------------------------------- + # Settings + #-------------------------------------------------------------------------- + + def set_settings(self, settings): + if settings is None: + return + self._settings.update(settings) + + def get(self, setting): + return self._settings[setting] + + def set(self, setting, value): + self._settings[setting] = value + + #-------------------------------------------------------------------------- + # Monitoring + # + # XXX This code should be deprecated / moved into a 
separate module. + # Planned for a future release. + #-------------------------------------------------------------------------- + + def _on_vicn_command(self, command): + if command == 'setup': + self.setup() + elif command == 'teardown': + self.teardown() + elif command == 'monitor': + self.monitor() + elif command == 'terminate': + loop = asyncio.get_event_loop() + loop.stop() + else: + # open_terminal, ... + raise NotImplementedError + + def _broadcast(self, query): + if not self._ws: + return + self._ws.execute(query) + + def _broadcast_packet(self, packet): + self._broadcast(packet.to_query()) + + def _on_ns_record(self, packet): + query = packet.to_query() + + if not query.object_name == 'interface': + return + q = Query(ACTION_UPDATE, 'channel', filter = query.filter, + params = query.params) + q.reply = True + + self._ws.execute(q) + return None + + def _on_netmon_record(self, packet): + query = packet.to_query() + + # Find channel related to query + # NOTE: we update the channel twice, once for each interface... 
+ if query.object_name == 'interface': + device_names = [value for key, op, value in query.filter.to_list() + if key == 'device_name'] + if not device_names: + log.error('No device name in packet=', packet) + return + device_name = device_names[0] + node_name = query.params['node'] + node = ResourceManager().by_name(node_name) + if node is None: + return None + for interface in node.interfaces: + if interface.device_name == device_name: + if interface.channel: + f = Filter.from_list([['id', '==', + interface.channel._state.uuid._uuid]]) + q = Query(ACTION_UPDATE, 'channel', filter = f, + params = query.params) + q.reply = True + self._ws.execute(q) + return None + return None + return None + return None + + def _on_netmon_channel_record(self, packet): + query = packet.to_query() + if query.object_name == 'interface': + device_names = [value for key, op, value in query.filter.to_list() + if key == 'device_name'] + if not device_names: + log.error('No device name in packet=', packet) + return + + device_name = device_names[0] + + f = Filter.from_list([['id', '==', device_name]]) + q = Query(ACTION_UPDATE, 'channel', filter = f, + params = query.params) + q.reply = True + self._ws.execute(q) + return None + + return None + + def _on_vpp_record(self, packet, pylink_id): + query = packet.to_query() + if query.object_name == 'interface': + device_names = [value for key, op, value in query.filter.to_list() + if key == 'device_name'] + if not device_names: + log.error('No device name in packet=', packet) + return + + # We might want to check if the query has SUM(*) + f = Filter.from_list([['id', '==', pylink_id]]) + q = Query(ACTION_UPDATE, 'channel', filter = f, + params = query.params) + q.reply = True + self._ws.execute(q) + return None + + print('discard packet in on_netmon_channel_record', query) + return None + + #-------------------------------------------------------------------------- + # Resource management + 
#-------------------------------------------------------------------------- + + def create_from_dict(self, **resource): + resource_type = resource.pop('type', None) + assert resource_type + + return self.create(resource_type.lower(), **resource) + + def create(self, resource_type, **attributes): + cls = self._available.get(resource_type) + if not cls: + raise Exception("Ignored resource with unknown type %s: %r" % + (resource_type, attributes)) + + resource = cls(**attributes) + + name = attributes.get('name', None) + if name: + self._map_uuid_name[resource._state.uuid] = name + self._map_name_uuid[name] = resource._state.uuid + + return resource + + def get_resource_type_names(self): + return ResourceFactory().get_available_resources().keys() + + def add_resource(self, instance, name = None): + instance._state = InstanceState(self, instance, name = name) + + self._resources[instance._state.uuid] = instance + self._map_str_uuid[instance._state.uuid._uuid] = instance._state.uuid + self._deps = None + + def commit_resource(self, resource): + """ + Committing a resource creates an asyncio function implementing a state + management automaton. + """ + asyncio.ensure_future(self._process_resource(resource)) + + def commit(self): + """ + Commit all resource whose owner is not set, and mark unmanaged + resources as clean. + + This function is used at initialization. + """ + + # Start FSM for all managed resources + for resource in self.get_resources(): + if resource.owner is not None: + continue + if resource.managed == False: + asyncio.ensure_future(self._set_resource_state(resource, + ResourceState.CLEAN)) + continue + + self.commit_resource(resource) + + def setup(self, commit=False): + """ + This function is in charge of setting up all resources needed by the + experiment. Since it might be a long process, it should be asynchronous + at some point. So far, we let resources take care of this by themselves. 
+ """ + self._auto_commit = commit + if commit: + self.commit() + + def get_resource_with_capabilities(self, cls, capabilities): + if '__type__' in cls.__dict__ and cls.__type__ == FactoryResource: + candidates = inheritors(cls) + if not candidates: + log.error('Abstract resource with no candidates: %s', + cls.__name__) + return None + + for delegate in candidates: + if capabilities and (not '__capabilities__' in vars(delegate) + or not capabilities.issubset(delegate.__capabilities__)): + continue + log.info("Abstract resource %s, delegated %s among %r" % \ + (cls.__name__, delegate.__name__, candidates)) + return delegate + return None + else: + if capabilities and (not '__capabilities__' in vars(delegate) or + not capabilities.issubset(delegate.__capabilities__)): + log.error('Capabilities conflict for resource : %s', + cls.__name__) + raise VICNException + return cls + + def find(self, resource_tuple): + cls, attr_dict = resource_tuple + for instance in self.by_type(cls): + cur_attr_dict = instance._get_attribute_dict() + common_keys = [k for k in cur_attr_dict.keys() + if k in attr_dict.keys()] + if all(attr_dict[k] == cur_attr_dict[k] for k in common_keys): + return instance + return None + + def __iter__(self): + for resource in self._resources.values(): + yield resource + + def resources(self): + return list(self.__iter__()) + + def _sort_resources(self): + deps = {} + for instance in self.resources(): + deps[instance._state.uuid] = \ + instance.get_dependencies(allow_unresolved = True) + + self._deps = toposort_flatten(deps) + + def _sorted_resources(self): + """ + Iterates on resources based on their dependencies + """ + if not self._deps: + self._sort_resources() + for dep in self._deps: + try: + yield self._resources[dep] + except KeyError: + log.error('Dependency not found : {}'.format(dep)) + raise InvalidResource + + def sorted_resources(self): + return list(self._sorted_resources()) + + 
#-------------------------------------------------------------------------- + # Queries + #-------------------------------------------------------------------------- + + def by_uuid(self, uuid): + return self._resources.get(uuid) + + def by_uuid_str(self, uuid_str): + uuid = self._map_str_uuid.get(uuid_str) + return self._resources.get(uuid) + + def by_name(self, name): + uuid = self._map_name_uuid.get(name) + return self.by_uuid(uuid) + + def get_resources(self): + return self._resources.values() + + def get_aggregates(self, resource_name, resource_cls): + """ + Get aggregated object. + """ + if not resource_name in self._aggregates: + return None + all_aggregates = self._aggregates[resource_name] + + if not resource_cls in all_aggregates: + return None + aggregates = all_aggregates[resource_cls] + + assert all(isinstance(x, resource_cls) for x in aggregates) + return aggregates + + def get_aggregate(self, resource_name, resource_cls): + aggregates = self.get_aggregates(resource_name, resource_cls) + if not aggregates: + return None + assert len(aggregates) == 1 + return next(aggregates) + + def by_type(self, type): + return [r for r in self if isinstance(r, type)] + + def by_type_str(self, typestr): + cls = self._available.get(typestr.lower()) + if not cls: + return list() + return self.by_type(cls) + + #-------------------------------------------------------------------------- + # Requirements + #-------------------------------------------------------------------------- + + def add_instance_requirement(self, instance, requirement): + uuid = instance._state.uuid + if not uuid in self._instance_requirements: + self._instance_requirements[uuid] = list() + self._instance_requirements[uuid].append(requirement) + + def get_instance_requirements(self, instance): + uuid = instance._state.uuid + return self._instance_requirements.get(uuid, dict()) + + def add_class_requirement(self, cls, requirement): + if not cls in self._class_requirements: + 
self._class_requirements[cls] = list() + self._class_requirements[cls].append(requirement) + + #-------------------------------------------------------------------------- + # Events + #-------------------------------------------------------------------------- + + def on(self, resource_name, event, action): + resource = self._resources.get(resource_name, None) + if not resource: + return + + resource.on(event, action) + + #-------------------------------------------------------------------------- + # Task management + #-------------------------------------------------------------------------- + + def schedule(self, task): + if task is None or isinstance(task, EmptyTask): + return + self._task_mgr.schedule(task) + + #-------------------------------------------------------------------------- + # Asynchronous resource API + # + # The manager is the only one to submit tasks to the scheduler since it can + # store and share the results, manage concurrent access, etc. + # As many functions are not thread safe, we make sure that they are all + # executed in the manager's thread (=main thread). 
+ #-------------------------------------------------------------------------- + + async def resource_exits(self, resource): + await self._resource_get() + await self.wait_resource_exists(resource) + return resource._state.exists + + async def wait_attr_init(self, resource, attribute_name): + await resource._state.attr_init[attribute_name].wait() + + async def wait_attr_clean(self, resource, attribute_name): + await resource._state.attr_clean[attribute_name].wait() + + async def attribute_get(self, resource, attribute, value): + await self.wait_attr_init(resource, attribute) + return resource.get(attribute) + + async def attribute_set(self, resource, attribute_name, value, + blocking=True): + with await resource._state.write_lock: + # Add the current operation to the pending list + # NOTE: collections are unordered and can be updated concurrently + #self._attribute_set_pending_value(resource, attribute_name) + resource._state.dirty[attribute_name].trigger(Operations.SET, + value) + + attr_state = resource._state.attr_state[attribute_name] + if attr_state == AttributeState.CLEAN: + resource._state.attr_state[attribute_name] = \ + AttributeState.DIRTY + elif attr_state in [ + # Nothing to do since we know the attribute value will be + # processed later. + # If the attribute was not processed by default, we would have + # to change the state of the attribute so that it gets + # processed. + AttributeState.UNINITIALIZED, + AttributeState.INITIALIZED, + AttributeState.PENDING_INIT, + AttributeState.DIRTY]: + pass + else: + # We cannot have the lock for instance if the attribute is + # being updated. 
+ raise RuntimeError + + resource_state = resource._state.state + if resource_state == ResourceState.CLEAN: + resource._state.state = ResourceState.DIRTY + resource._state.change_event.set() + elif resource_state in [ + ResourceState.UNINITIALIZED, + ResourceState.INITIALIZED, + ResourceState.PENDING_KEYS, + ResourceState.KEYS_OK, + ResourceState.PENDING_DEPS, + ResourceState.DEPS_OK, + ResourceState.PENDING_CREATE, + ResourceState.CREATED, + ResourceState.DIRTY, + ]: + pass # Nothing to do, the attribute will get processed + else: + # ResourceState.PENDING_UPDATE + # other + raise RuntimeError("Resource cannot be in state".format( + resource_state)) + + if blocking: + await self.wait_attr_clean(resource, attribute_name) + + #--------------------------------------------------------------------------- + # Resource dependency management + #--------------------------------------------------------------------------- + + async def _resource_wait_attributes(self, resource): + """Check dependencies and requirements + + Inspect all attributes for referenced resources, and their eventual + requirements. + """ + self.log(resource, ' - Waiting for attribute dependencies...') + for attr in resource.iter_attributes(): + if issubclass(attr.type, Resource): + deps = resource.get(attr.name) + if deps is None: + # Not really a dependency, we expect mandatory to prevent + # us to continue if we should not + continue + + if not attr.is_collection: + deps = [deps] + + for dep in deps: + # XXX This could be done in parallel + if not dep.managed: + continue + dep_pfx = '{}:{}'.format(dep.get_type(), dep.get_uuid()) + self.log(resource, S_WAIT_DEP. format(dep_pfx)) + await wait_resource(dep) + self.log(resource, S_WAIT_DEP_OK. 
format(dep_pfx)) + + if not attr.requirements: + continue + + for req in attr.requirements: + dep_attr_name = req.requirement_type + dep_attr = dep.get_attribute(dep_attr_name) + assert issubclass(dep_attr.type, Resource) + dep_attr_value = dep.get(dep_attr_name) + + if not dep_attr_value: + dep_attr_value = dep.auto_instanciate(dep_attr) + setattr(dep, dep_attr_name, dep_attr_value) + + dep_attr_value_pfx = '{}:{}'.format( + dep_attr_value.get_type(), + dep_attr_value.get_uuid()) + self.log(resource, + S_WAIT_DEP.format(dep_attr_value_pfx)) + await wait_resource(dep_attr_value) + self.log(resource, + S_WAIT_DEP_OK .format(dep_attr_value_pfx)) + + async def _resource_wait_predecessors(self, resource): + after = resource.__after__() + if after: + self.log(resource, ' - Waiting for predecessors...') + for resource_type in after: + self.log(resource, ' . AFTER TYPE={}'.format(resource_type)) + befores = resource._state.manager.by_type_str(resource_type) + for before in befores: + if not before.managed: + continue + before_pfx = '{}:{}'.format(before.get_type(), + before.get_uuid()) + self.log(resource, S_WAIT.format(before_pfx)) + await wait_resource(before) + self.log(resource, S_WAIT_OK.format(before_pfx)) + + after_init = resource.__after_init__() + if after_init: + self.log(resource, S_WAIT_PRED) + for resource_type in after_init: + self.log(resource, S_AFTER.format(resource_type)) + befores = resource._state.manager.by_type_str(resource_type) + for before in befores: + if not before.managed: + continue + before_pfx = '{}:{}'.format(before.get_type(), + before.get_uuid()) + self.log(resource, S_WAIT.format(before_pfx)) + await wait_resource_init(before) + self.log(resource, S_WAIT_OK.format(before_pfx)) + + async def _resource_wait_subresources(self, resource): + self.log(resource, S_WAIT_SRS) + + # We should accumulate subresources through the hierarchy + sr = EmptyResource() + for base in reversed(resource.__class__.mro()): + if '__subresources__' not in 
vars(base): + continue + sr = sr > base.__subresources__(resource) + + if sr is not None and not isinstance(sr, EmptyResource): + resource.set_subresources(sr) + pfx_sr = '{}:{}'.format(sr.get_type(), sr.get_uuid()) + self.log(resource, S_REG_SR .format(pfx_sr)) + await sr.async_commit_to_manager(self) + self.log(resource, S_WAIT_SR.format(pfx_sr)) + await wait_resource(sr) + self.log(resource, S_WAIT_SR_OK.format(pfx_sr)) + + async def _resource_wait_dependencies(self, resource): + self.log(resource, 'Waiting for dependencies...') + await self._resource_wait_attributes(resource) + await self._resource_wait_predecessors(resource) + await self._resource_wait_subresources(resource) + + def _task_resource_action(self, resource, action): + """Perform action: __get__, __create__, __delete__ on the full class + hierarchy. + """ + task = EmptyTask() + for base in reversed(resource.__class__.mro()): + # To avoid adding several times the same task + if action not in vars(base): + continue + func = getattr(base, action, None) + if func is None: + continue + t = func(resource) + + task = task > t + + return task + + #-------------------------------------------------------------------------- + # Resource model + #-------------------------------------------------------------------------- + + def _task_attribute_op(self, resource, attribute, op): + return getattr(resource, '_{}_{}'.format(op, attribute.name))() + + #-------------------------------------------------------------------------- + # Attribute FSM + #-------------------------------------------------------------------------- + + def _attribute_is_dirty(self, resource, attribute): + """ + Precondition: + Attribute has been retrieved + """ + pending_value = resource._state.dirty[attribute.name] + return pending_value.value != NEVER_SET + + async def __set_attribute_state(self, resource, attribute_name, state): + """Sets the resource state (no-lock version) + + It is important to centralize state change since some states 
are + associated with Events(). + """ + resource._state.attr_state[attribute_name] = state + if state in [ + AttributeState.INITIALIZED, + AttributeState.CLEAN, + AttributeState.DIRTY + ]: + resource._state.attr_init[attribute_name].set() + else: + raise RuntimeError("Inconsistent resource state {}".format(state)) + + if state in [AttributeState.CLEAN]: + resource._state.attr_clean[attribute_name].set() + elif state in [ + AttributeState.INITIALIZED, + AttributeState.DIRTY + ]: + resource._state.attr_clean[attribute_name].clear() + else: + raise RuntimeError + + async def _set_attribute_state(self, resource, attribute_name, state): + """Sets the attribute state (lock version) + """ + with await resource._state.attr_lock[attribute_name]: + await self.__set_attribute_state(resource, attribute_name, state) + + def _trigger_attr_state_change(self, resource, attribute, fut): + try: + ret = fut.result() + resource._state.attr_change_success[attribute.name] = True + resource._state.attr_change_value[attribute.name] = ret + except Exception as e: + resource._state.attr_change_success[attribute.name] = False + resource._state.attr_change_value[attribute.name] = e + resource._state.attr_change_event[attribute.name].set() + + async def attribute_process(self, resource, attribute): + """ + Temporary FSM executing in parallel for attribute management. Those FSM + are under the responsability of the main resource FSM. 
+ + Precondition: + Attribute state is initialized + """ + self.attr_log(resource, attribute, + 'Starting attribute FSM for {}'.format(attribute.name)) + + new_state = None + while new_state != AttributeState.CLEAN: + #with await resource._state.attr_lock[attribute.name]: + state = resource._state.attr_state[attribute.name] + self.attr_log(resource, attribute, + 'Current state is {}'.format(state)) + + if resource._state.attr_change_success == False: + log.error('Attribute error') + e = resource._state.attr_change_value[attribute.name] + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + # Signal update errors to the parent resource + resource._state.attr_change_event[attribute.name].set() + + elif state == AttributeState.UNINITIALIZED: + pending_state = AttributeState.PENDING_INIT + elif state in AttributeState.INITIALIZED: + pending_state = AttributeState.PENDING_UPDATE + elif state == AttributeState.DIRTY: + pending_state = AttributeState.PENDING_UPDATE + elif state in [ + AttributeState.PENDING_INIT, + AttributeState.PENDING_UPDATE + ]: + # Nothing to do + pending_state = None + elif state == AttributeState.CLEAN: + return + else: + raise RuntimeError + + if pending_state is None: + self.attr_log(resource, attribute, + 'Nothing to do. 
Waiting for event...') + await resource._state.attr_change_event[attribute.name].wait() + resource._state.attr_change_event[attribute.name].clear() + self.attr_log(resource, attribute, 'Wake up from event') + continue + + if pending_state == AttributeState.PENDING_INIT: + task = self._task_attribute_op(resource, attribute, 'get') + elif pending_state == AttributeState.PENDING_UPDATE: + pending_value = resource._state.dirty[attribute.name] + + if pending_value.value == NEVER_SET: + assert len(pending_value.operations) == 0 + task = EmptyTask() + else: + try: + task = self._task_attribute_op(resource, attribute, + Operations.SET) + except Exception as e: + log.warning('No attribute setter attribute {}'.format( + attribute)) + task = EmptyTask() + else: + raise RuntimeError + + if task is not None and not isinstance(task, EmptyTask): + state_change = functools.partial( \ + self._trigger_attr_state_change, resource, attribute) + task.add_done_callback(state_change) + self.attr_log(resource, attribute, + 'Trigger {} -> {}. Waiting task completion'.format( + state, pending_state)) + self.schedule(task) + + await resource._state.attr_change_event[attribute.name].wait() + resource._state.attr_change_event[attribute.name].clear() + self.attr_log(resource, attribute, + 'Completed {} -> {}. Success = {}'.format( + state, pending_state, + resource._state.attr_change_success[attribute.name])) + else: + # If this value is not reset, attributes get updated many times + resource._state.attr_change_value[attribute.name] = NEVER_SET + + if pending_state == AttributeState.PENDING_INIT: + if resource._state.attr_change_success[attribute.name] == True: + attrs = resource._state.attr_change_value[attribute.name] + self.attr_log(resource, attribute, + 'INIT success. 
Value = {}'.format(attrs)) + found = self._process_attr_dict(resource, attribute, attrs) + if not found: + log.error('Attribute missing return attrs: {}'.format( + attrs)) + found = self._process_attr_dict(resource, attribute, + attrs) + new_state = AttributeState.INITIALIZED + else: + attrs = resource._state.attr_change_value[attribute.name] + self.attr_log(resource, attribute, + 'INIT gave no value. Value = {}'.format(attrs)) + new_state = AttributeState.INITIALIZED + + elif pending_state == AttributeState.PENDING_UPDATE: + if resource._state.attr_change_success[attribute.name] == True: + attrs = resource._state.attr_change_value[attribute.name] + self.attr_log(resource, attribute, + 'UPDATE success. Value = {}. Attribute is CLEAN'.format(attrs)) + if attrs != NEVER_SET: + # None could be interpreted as the return value. Also, + # we need not to overwrite the value from get + self._process_attr_dict(resource, attribute, attrs) + + # We might do this for all returned attributes + cur_value = vars(resource)[attribute.name] + if attribute.is_collection: + tmp = InstrumentedList(pending_value.value) + tmp._attribute = cur_value._attribute + tmp._instance = cur_value._instance + else: + tmp = pending_value.value + vars(resource)[attribute.name] = tmp + pending_value.clear() + + new_state = AttributeState.CLEAN + else: + log.error('Attribute error') + e = resource._state.attr_change_value[attribute.name] + import traceback; traceback.print_tb(e.__traceback__) + raise NotImplementedError + + else: + raise RuntimeError + + # Setting attribute state + await self._set_attribute_state(resource, attribute.name, + new_state) + + #-------------------------------------------------------------------------- + # Resource FSM + #-------------------------------------------------------------------------- + + def parse_query(self, line): + dic = SQLParser().parse(line) + if not dic: + raise RuntimeError("Can't parse input command: %s" % command) + + return Query.from_dict(dic) + + def 
_monitor_netmon(self, resource):
        """Subscribe to interface statistics published by a node's netmon daemon.

        Opens websocket client interfaces towards the monitored node and sends
        SUBSCRIBE queries (``Q_SUB_IF`` / ``Q_SUB_VPP``) for every monitored
        interface. DPDK devices on a VPP-enabled node are covered by a single
        aggregate subscription per channel; every other interface gets its own
        subscription.
        """
        ip = resource.node.host_interface.ip_address
        if not ip:
            # Without a reachable node there is nothing we can recover:
            # hard-abort the whole process.
            log.error('IP of monitored Node is None')
            import os; os._exit(1)

        ws = self._router.add_interface('websocketclient', address=ip,
                hook=self._on_netmon_record)

        node = resource.node
        for interface in node.interfaces:
            if not interface.monitored:
                continue

            if interface.get_type() == 'dpdkdevice' and hasattr(node, 'vpp'):

                # Check if vICN has already subscribed for one interface in
                # the channel
                if hasattr(interface.channel, 'already_subscribed'):
                    continue

                channel_id = interface.channel._state.uuid._uuid

                # VPP records are dispatched through a dedicated websocket
                # hook bound to this channel's identifier.
                update_vpp = functools.partial(self._on_vpp_record,
                        pylink_id=channel_id)
                ws_vpp = self._router.add_interface('websocketclient',
                        address=ip, hook=update_vpp)

                # Aggregate all monitored non-DPDK interface names into a
                # single subscription query.
                aggregate_interfaces = list()
                for _interface in node.interfaces:
                    if not _interface.get_type() == 'dpdkdevice' and \
                            _interface.monitored:
                        aggregate_interfaces.append('"' +
                                _interface.device_name + '"')

                q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces))
                q = self.parse_query(q_str)
                packet = Packet.from_query(q)
                self._router._flow_table.add(packet, None, ws_vpp)
                ws_vpp.send(packet)

                # Prevent vICN from subscribing to other interfaces of the
                # same channel
                interface.channel.already_subscribed = True

            else:
                q_str = Q_SUB_IF.format(interface.device_name)
                q = self.parse_query(q_str)
                packet = Packet.from_query(q)
                self._router._flow_table.add(packet, None, ws)
                ws.send(packet)

    def _monitor_emulator(self, resource):
        """Subscribe to monitoring records of an emulated (wireless) channel.

        For every station attached to the emulated channel, subscribes both to
        the emulator itself (position / link rate, ``Q_SUB_EMULATOR_IF``) and
        to the station's tap interface on the hosting node for individual
        bandwidth monitoring (``Q_SUB_EMULATOR``).
        """
        ns = resource
        ip = ns.node.bridge.ip_address  # host_interface.ip_address

        ws_ns = self._router.add_interface('websocketclient', address=ip,
                port=ns.control_port,
                hook=self._on_ns_record)
        ws = self._router.add_interface('websocketclient', address=ip,
                hook=self._on_netmon_channel_record)

        for station in ns.stations:
            if not station.managed:
                # Unmanaged station: locate its (single) interface attached
                # to this channel and address it by interface name.
                interface = [i for i in station.interfaces if i.channel == ns]
                assert len(interface) == 1
                interface = interface[0]
                identifier = interface.name
            else:
                # Managed station: address it by the UUID of the interface
                # vICN created for it.
                iface = ns._sta_ifs[station]
                identifier = iface._state.uuid._uuid

            # Monitor the wireless channel for position and link rate
            q_str = Q_SUB_EMULATOR_IF.format(identifier)
            q = self.parse_query(q_str)
            packet = Packet.from_query(q)
            self._router._flow_table.add(packet, None, ws_ns)
            ws_ns.send(packet)

            # We also need to subscribe on the node for the tap interfaces
            # for individual bandwidth monitoring
            tap = ns._sta_taps[station]
            q_str = Q_SUB_EMULATOR.format(tap.device_name)
            q = self.parse_query(q_str)
            packet = Packet.from_query(q)
            self._router._flow_table.add(packet, None, ws)
            ws.send(packet)

    def _monitor(self, resource):
        """Attach monitoring to a resource that reached the CLEAN state.

        Monitoring requires the CentralIP resource to be CLEAN first:
        resources that become CLEAN earlier are parked in
        ``self._pending_monitoring`` and flushed when CentralIP itself turns
        CLEAN. Each resource is monitored at most once (``self._monitored``).
        """
        if resource.get_type() == 'centralip':
            # CentralIP just became CLEAN: flush every resource whose
            # monitoring was deferred until now.
            for uuid in self._pending_monitoring:
                pending = self.by_uuid(uuid)
                self._monitor(pending)
            self._pending_monitoring.clear()
            return

        central_ip = self.by_type_str('centralip')
        if not central_ip:
            raise NotImplementedError('Missing CentralIP in experiment')
        central_ip = central_ip[0]

        uuid = resource.get_uuid()

        if central_ip._state.state != ResourceState.CLEAN:
            # Defer: will be picked up when CentralIP reaches CLEAN.
            self._pending_monitoring.add(uuid)
            return

        if uuid in self._monitored:
            return
        self._monitored.add(uuid)

        if resource.get_type() == 'netmon':
            # Only netmon instances hosted on LXC containers are monitored.
            if resource.node.get_type() != 'lxccontainer':
                return
            self._monitor_netmon(resource)

        elif resource.has_type('emulatedchannel'):
            self._monitor_emulator(resource)

    async def __set_resource_state(self, resource, state):
        """Sets the resource state (no-lock version).

        It is important to centralize state change since some states are
        associated with Events(): CLEAN sets the ``clean`` event (and fires
        the monitoring hook), INITIALIZED sets the ``init`` event, any other
        state clears ``clean``.
        """
        resource._state.state = state
        if state == ResourceState.CLEAN:
            # Monitoring hook
            self._monitor(resource)
            resource._state.clean.set()
        else:
            resource._state.clean.clear()
        if state == ResourceState.INITIALIZED:
            resource._state.init.set()

    async def _set_resource_state(self, resource, state):
        """Sets the resource state (lock version)."""
        with await resource._state.lock:
            await self.__set_resource_state(resource, state)

    def _trigger_state_change(self, resource, fut):
        """Task done-callback: record the task's outcome and wake up the FSM.

        Stores either the task result (success) or the raised exception
        (failure) into the resource state, then signals ``change_event`` so
        the resource FSM coroutine resumes.
        """
        try:
            ret = fut.result()
            resource._state.change_success = True
            resource._state.change_value = ret
        except Exception as e:
            resource._state.change_success = False
            resource._state.change_value = e
        # NOTE(review): placement reconstructed from a source with lost
        # indentation — the event must be set on both success and failure so
        # the FSM always wakes up; confirm against the original file.
        resource._state.change_event.set()

    def _process_attr_dict(self, resource, attribute, attrs):
        """Apply task-returned attribute values onto the resource.

        ``attrs`` is either a dict of attribute name -> value, or a bare
        value belonging to ``attribute``. Returns False when a bare value is
        given without an attribute to attach it to, True otherwise.
        """
        if not isinstance(attrs, dict):
            if attribute is None:
                return False
            attrs = {attribute.name: attrs}
        resource.set_many(attrs, current=True)
        return True

    async def _task_resource_update(self, resource):
        """Run the attribute FSM for every remote, non-key attribute.

        Gathers one ``attribute_process`` coroutine per attribute, then
        records the overall outcome in ``resource._state.change_success``
        (failure if at least one attribute failed). Returns the list of
        attribute change values on success, or None when there was nothing
        to update.
        """
        # Monitor all FSM one by one and inform about errors.
        futs = list()
        attrs = list()
        for attr in resource.iter_attributes():
            if resource.is_local_attribute(attr.name):
                continue
            if attr.key:
                # Those attributes are already done
                continue

            attrs.append(attr)
            fut = self.attribute_process(resource, attr)
            futs.append(fut)

        if not futs:
            self.log(resource, 'No attribute to update')
            return None

        await asyncio.gather(*futs)

        # Inform the resource about the outcome of the update process.
        # Error if at least one attribute failed.
        resource._state.change_success = all(
                resource._state.attr_change_success[attr.name]
                for attr in attrs)
        self.log(resource,
                'All attributes FSM terminated with success={}'.format(
                    resource._state.change_success))

        if resource._state.change_success:
            ret = [resource._state.attr_change_value[attr.name]
                    for attr in attrs]
            return ret
        else:
            raise NotImplementedError('At least one attribute failed')

    async def _task_resource_keys(self, resource):
        """Run the attribute FSM for the key attributes only.

        Same pattern as ``_task_resource_update`` but restricted to
        ``resource.get_keys()``. Returns the raw ``attr_change_value``
        mapping on success, or None when there was nothing to do.
        """
        # Monitor all FSM one by one and inform about errors.
        futs = list()
        attrs = list()
        for attr in resource.get_keys():
            if resource.is_local_attribute(attr.name):
                continue
            attrs.append(attr)
            fut = self.attribute_process(resource, attr)
            futs.append(fut)

        if not futs:
            self.log(resource, 'No key attribute to update')
            return None

        await asyncio.gather(*futs)

        # Inform the resource about the outcome of the update process.
        # Error if at least one attribute failed.
        resource._state.change_success = all(
                resource._state.attr_change_success[attr.name]
                for attr in attrs)
        self.log(resource,
                'KEY attributes FSM terminated with success={}'.format(
                    resource._state.change_success))

        if resource._state.change_success:
            ret = resource._state.attr_change_value
            return ret
        else:
            raise NotImplementedError('At least one attribute failed')

    #--------------------------------------------------------------------------
    # Logging
    #--------------------------------------------------------------------------

    def log(self, resource, msg=None):
        """Append a message to the resource's in-memory log."""
        resource._state.log.append(msg)

        # Display on screen
        #pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid())
        #print(pfx, msg)

    def attr_log(self, resource, attribute, msg):
        """Append a message to the per-attribute in-memory log."""
        resource._state.attr_log[attribute.name].append(msg)

        # Display on screen
        #pfx = '[{}] {} / {}: '.format(resource.get_type(), resource.get_uuid(),
        #        attribute.name)
        #print(pfx, msg)

    #--------------------------------------------------------------------------
    # Resource processing
    #--------------------------------------------------------------------------

    async def _process_resource(self, resource):
        """Main per-resource FSM coroutine.

        We need to schedule the first set of subresources, knowing others will
        be orchestrated by the operators
         - subresources need to enter the system in order
            -> we just add them to the manager in time
         - but they need to be managed by the system
         - in particular, the owner waits for the system to complete
           subresources: this is the implementation of __get__ __create__
           __delete__ in the base resource

        The loop: under the resource lock, read the current state and decide
        the pending transition; schedule the corresponding task (its done
        callback ``_trigger_state_change`` records the outcome and sets
        ``change_event``); wait for ``change_event`` outside the lock; then
        map (pending state, outcome) to the new state and commit it via
        ``_set_resource_state``.
        """
        # NOTE(review): pfx is currently unused (the prints using it are
        # commented out elsewhere).
        pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid())

        self.log(resource, 'Starting FSM...')

        # When a resource is managed, it will get automatically monitored by
        # adding the netmon resource on it.
        # NOTE(review): Node is imported but unused below.
        from vicn.resource.node import Node
        if resource.get_type() == 'lxccontainer':
            self.log(resource,
                    'Associating monitoring to lxc container resource...')
            instance = self.create('netmon', node=resource)
            self.commit_resource(instance)

        # FIXME
        elif resource.get_type() == 'physical' and resource.managed and \
                len(self.by_type_str('emulatedchannel')) > 0:
            self.log(resource,
                    'Associating monitoring to physical node resource...')
            instance = self.create('netmon', node=resource)
            self.commit_resource(instance)

        state = None

        while True:
            with await resource._state.lock:

                # FSM implementation
                state = resource._state.state
                self.log(resource, 'Current state is {}'.format(state))

                if resource._state.change_success == False:
                    # A previous transition failed: surface the exception.
                    e = resource._state.change_value
                    import traceback; traceback.print_tb(e.__traceback__)
                    raise NotImplementedError

                elif state == ResourceState.UNINITIALIZED:
                    pending_state = ResourceState.PENDING_DEPS
                elif state == ResourceState.DEPS_OK:
                    pending_state = ResourceState.PENDING_INIT

                elif state == ResourceState.INITIALIZED:
                    pending_state = ResourceState.PENDING_GET

                elif state == ResourceState.GET_DONE:
                    # Resources with keys must settle their key attributes
                    # before creation.
                    if resource.get_keys():
                        pending_state = ResourceState.PENDING_KEYS
                    else:
                        pending_state = ResourceState.PENDING_CREATE

                elif state == ResourceState.KEYS_OK:
                    pending_state = ResourceState.PENDING_CREATE

                elif state in [ResourceState.CREATED, ResourceState.DIRTY]:
                    pending_state = ResourceState.PENDING_UPDATE

                elif state == ResourceState.DELETED:
                    raise NotImplementedError
                    # Unreachable: nothing to do unless explicitly requested
                    pending_state = None

                elif state in [
                    ResourceState.PENDING_DEPS,
                    ResourceState.PENDING_INIT,
                    ResourceState.PENDING_CREATE,
                    ResourceState.PENDING_DELETE,
                    ResourceState.CLEAN
                ]:
                    # Nothing to do
                    pending_state = None
                else:
                    raise RuntimeError

                # Implement state changes
                #
                # If a task is already pending, we simply wait for it to
                # complete
                if pending_state is None:
                    # Wait for an external change
                    self.log(resource, 'Nothing to do. Waiting for event...')
                    await resource._state.change_event.wait()
                    self.log(resource, 'Wake up from event')
                    resource._state.change_event.clear()
                    continue

                if pending_state == ResourceState.PENDING_DEPS:
                    # XXX Maybe for any action, we need to wait for
                    # dependencies to be up to date
                    task = async_task(functools.partial(
                        self._resource_wait_dependencies, resource))()

                elif pending_state == ResourceState.PENDING_INIT:
                    task = self._task_resource_action(resource,
                            '__initialize__')

                elif pending_state == ResourceState.PENDING_GET:
                    task = self._task_resource_action(resource, '__get__')
                    if isinstance(task, BashTask):
                        task.set_default_parse_for_get()

                elif pending_state == ResourceState.PENDING_KEYS:
                    task = async_task(functools.partial(
                        self._task_resource_keys, resource))()

                elif pending_state == ResourceState.PENDING_CREATE:
                    task = self._task_resource_action(resource, '__create__')

                elif pending_state == ResourceState.PENDING_UPDATE:
                    # Instead of tasks, we wait for many smaller automatons to
                    # terminate. The write lock is released when the update
                    # outcome is processed below.
                    await resource._state.write_lock.acquire()
                    task = async_task(functools.partial(
                        self._task_resource_update, resource))()

                elif pending_state == ResourceState.PENDING_DELETE:
                    task = self._task_resource_action(resource, '__delete__')

                else:
                    raise RuntimeError

                if task is not None and not isinstance(task, EmptyTask):
                    state_change = functools.partial(
                            self._trigger_state_change, resource)
                    task.add_done_callback(state_change)
                    self.schedule(task)

            # NOTE(review): lock scope reconstructed from a source with lost
            # indentation — we wait for change_event outside the resource
            # lock so the task's done-callback and _set_resource_state (which
            # re-acquires the lock) cannot deadlock; confirm against the
            # original file.
            self.log(resource,
                    'Trigger {} -> {}. Waiting task completion'.format(
                        state, pending_state))
            await resource._state.change_event.wait()
            resource._state.change_event.clear()
            self.log(resource, 'Completed {} -> {}. Success = {}'.format(
                state, pending_state, resource._state.change_success))

            # If no task, can assume there is an instant switch to the next
            # step...

            # Update state based on task results
            if pending_state == ResourceState.PENDING_DEPS:
                # XXX NO CHANGE SUCCESS TEST ??
                new_state = ResourceState.DEPS_OK

            elif pending_state == ResourceState.PENDING_INIT:
                if resource._state.change_success == True:
                    attrs = resource._state.change_value
                    self.log(resource, 'INIT done.')
                    new_state = ResourceState.INITIALIZED
                else:
                    e = resource._state.change_value
                    import traceback; traceback.print_tb(e.__traceback__)
                    raise NotImplementedError

            elif pending_state == ResourceState.PENDING_GET:
                if resource._state.change_success == True:
                    # NOTE(review): logs S_INIT_DONE in the GET branch —
                    # presumably a copy/paste leftover; verify intent.
                    attrs = resource._state.change_value
                    self.log(resource, S_INIT_DONE.format(attrs))
                    self._process_attr_dict(resource, None, attrs)
                    new_state = ResourceState.CREATED
                else:
                    e = resource._state.change_value
                    if ENABLE_LXD_WORKAROUND and \
                            resource.get_type() != 'lxccontainer' and \
                            isinstance(e, LxdNotFound):
                        # "not found" is the normal exception when the
                        # container does not exist. anyways the bug should
                        # only occur with container.execute(), not
                        # container.get()
                        log.error('LXD Fix (not found). Reset resource')
                        new_state = ResourceState.UNINITIALIZED
                    elif ENABLE_LXD_WORKAROUND and \
                            isinstance(e, LXDAPIException):
                        # "not found" is the normal exception when the
                        # container does not exist. anyways the bug should
                        # only occur with container.execute(), not
                        # container.get()
                        log.error('LXD Fix (API error). Reset resource')
                        new_state = ResourceState.UNINITIALIZED
                    elif isinstance(e, ResourceNotFound):
                        # The resource does not exist
                        self.log(resource, S_GET_DONE.format(
                            resource._state.change_value))
                        new_state = ResourceState.GET_DONE
                        resource._state.change_value = None
                    else:
                        e = resource._state.change_value
                        import traceback; traceback.print_tb(e.__traceback__)
                        raise NotImplementedError
                    # GET failures handled above are not fatal: mark the
                    # transition successful so the FSM continues.
                    resource._state.change_success = True

            elif pending_state == ResourceState.PENDING_KEYS:
                if resource._state.change_success == True:
                    new_state = ResourceState.KEYS_OK
                    self.log(resource, S_KEYS_OK)
                else:
                    e = resource._state.change_value
                    self.log(resource, 'KEYS failed: {}'.format(e))

                    if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound):
                        log.error('LXD Fix (not found). Reset resource')
                        new_state = ResourceState.CREATED
                        resource._state.change_success = True
                    else:
                        e = resource._state.change_value
                        import traceback; traceback.print_tb(e.__traceback__)
                        raise NotImplementedError

            elif pending_state == ResourceState.PENDING_CREATE:
                if resource._state.change_success == True:
                    attrs = resource._state.change_value
                    self.log(resource, S_CREATE_OK.format(attrs))
                    self._process_attr_dict(resource, None, attrs)
                    new_state = ResourceState.CREATED
                else:
                    e = resource._state.change_value

                    if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound):
                        log.error('LXD Fix (not found). Reset resource')
                        new_state = ResourceState.UNINITIALIZED
                        resource._state.change_success = True
                    elif ENABLE_LXD_WORKAROUND and \
                            isinstance(e, LXDAPIException):
                        log.error('LXD Fix (API error). Reset resource')
                        new_state = ResourceState.UNINITIALIZED
                        resource._state.change_success = True
                    elif 'File exists' in str(e):
                        # Already created out-of-band: treat as success.
                        new_state = ResourceState.CREATED
                        resource._state.change_success = True
                    elif 'dpkg --configure -a' in str(e):
                        # Broken dpkg state on the node: flag it and retry
                        # from scratch.
                        resource._dpkg_configure_a = True
                        new_state = ResourceState.UNINITIALIZED
                        resource._state.change_success = True
                    else:
                        self.log(resource, 'CREATE failed: {}'.format(e))
                        e = resource._state.change_value
                        import traceback; traceback.print_tb(e.__traceback__)
                        raise NotImplementedError

            elif pending_state == ResourceState.PENDING_UPDATE:
                if resource._state.change_success == True:
                    self.log(resource, 'Update finished, resource is CLEAN.')
                    new_state = ResourceState.CLEAN
                    resource._state.write_lock.release()
                else:
                    e = resource._state.change_value
                    self.log(resource, 'UPDATE failed: {}'.format(e))

                    if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound):
                        log.error('LXD Fix (not found). Reset resource')
                        new_state = ResourceState.CREATED
                        resource._state.change_success = True
                        resource._state.write_lock.release()
                    else:
                        e = resource._state.change_value
                        resource._state.write_lock.release()
                        import traceback; traceback.print_tb(e.__traceback__)
                        raise NotImplementedError

            elif pending_state == ResourceState.PENDING_DELETE:
                raise NotImplementedError
                new_state = None
            else:
                raise RuntimeError

            await self._set_resource_state(resource, new_state)