diff options
Diffstat (limited to 'vicn/core')
-rw-r--r-- | vicn/core/resource_mgr.py | 62 | ||||
-rw-r--r-- | vicn/core/state.py | 2 | ||||
-rw-r--r-- | vicn/core/task.py | 38 |
3 files changed, 72 insertions, 30 deletions
diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py index f6082488..57dcafef 100644 --- a/vicn/core/resource_mgr.py +++ b/vicn/core/resource_mgr.py @@ -44,6 +44,8 @@ from vicn.core.task import EmptyTask, BashTask log = logging.getLogger(__name__) +# NOTE: Do not fully reinitialize a resource after a step fails since it will +# call initialize several times, and might created spurious resources. ENABLE_LXD_WORKAROUND = True # Monitoring queries @@ -148,6 +150,9 @@ class ResourceManager(metaclass=Singleton): # For debug self._committed = set() + self._num = 0 + self._num_clean = 0 + def terminate(self): self._router.terminate() @@ -317,6 +322,7 @@ class ResourceManager(metaclass=Singleton): Committing a resource creates an asyncio function implementing a state management automaton. """ + self._num += 1 asyncio.ensure_future(self._process_resource(resource)) def commit(self): @@ -496,10 +502,10 @@ class ResourceManager(metaclass=Singleton): # Task management #-------------------------------------------------------------------------- - def schedule(self, task): + def schedule(self, task, resource = None): if task is None or isinstance(task, EmptyTask): return - self._task_mgr.schedule(task) + self._task_mgr.schedule(task, resource) #-------------------------------------------------------------------------- # Asynchronous resource API @@ -785,8 +791,10 @@ class ResourceManager(metaclass=Singleton): self.attr_log(resource, attribute, 'Current state is {}'.format(state)) + # AttributeState.ERROR if resource._state.attr_change_success == False: - log.error('Attribute error') + log.error('Attribute error {} for resource {}'.format( + resource.get_uuid(), attribute.name)) e = resource._state.attr_change_value[attribute.name] import traceback; traceback.print_tb(e.__traceback__) raise NotImplementedError @@ -794,7 +802,7 @@ class ResourceManager(metaclass=Singleton): # Signal update errors to the parent resource resource._state.attr_change_event[attribute.name].set() - elif state == AttributeState.UNINITIALIZED: + if state == AttributeState.UNINITIALIZED: pending_state = AttributeState.PENDING_INIT elif state in AttributeState.INITIALIZED: pending_state = AttributeState.PENDING_UPDATE @@ -898,10 +906,10 @@ class ResourceManager(metaclass=Singleton): new_state = AttributeState.CLEAN else: - log.error('Attribute error') + log.error('Attribute error {} for resource {}'.format( + resource.get_uuid(), attribute.name)) e = resource._state.attr_change_value[attribute.name] - import traceback; traceback.print_tb(e.__traceback__) - raise NotImplementedError + new_state = AttributeState.ERROR else: raise RuntimeError @@ -1046,13 +1054,20 @@ class ResourceManager(metaclass=Singleton): It is important to centralize state change since some states are associated with Events(). """ + prev_state = resource._state.state resource._state.state = state if state == ResourceState.CLEAN: # Monitoring hook self._monitor(resource) resource._state.clean.set() + if prev_state != ResourceState.CLEAN: + self._num_clean += 1 + log.info("Resource {} is marked as CLEAN ({}/{})".format( + resource.get_uuid(), self._num_clean, self._num)) else: resource._state.clean.clear() + if prev_state == ResourceState.CLEAN: + self._num_clean -= 1 if state == ResourceState.INITIALIZED: resource._state.init.set() @@ -1211,12 +1226,7 @@ class ResourceManager(metaclass=Singleton): state = resource._state.state self.log(resource, 'Current state is {}'.format(state)) - if resource._state.change_success == False: - e = resource._state.change_value - import traceback; traceback.print_tb(e.__traceback__) - raise NotImplementedError - - elif state == ResourceState.UNINITIALIZED: + if state == ResourceState.UNINITIALIZED: pending_state = ResourceState.PENDING_DEPS elif state == ResourceState.DEPS_OK: pending_state = ResourceState.PENDING_INIT @@ -1296,9 +1306,10 @@ class ResourceManager(metaclass=Singleton): raise RuntimeError if task is not None and not isinstance(task, EmptyTask): + resource._state.change_success = None # undetermined state state_change = functools.partial(self._trigger_state_change, resource) task.add_done_callback(state_change) - self.schedule(task) + self.schedule(task, resource) self.log(resource, 'Trigger {} -> {}. Waiting task completion'.format( state, pending_state)) @@ -1321,8 +1332,8 @@ class ResourceManager(metaclass=Singleton): new_state = ResourceState.INITIALIZED else: e = resource._state.change_value - import traceback; traceback.print_tb(e.__traceback__) - raise NotImplementedError + log.error('Cannot setup resource {} : {}'.format( + resource.get_uuid(), e)) elif pending_state == ResourceState.PENDING_GET: if resource._state.change_success == True: @@ -1339,13 +1350,13 @@ class ResourceManager(metaclass=Singleton): # does not exists. anyways the bug should only occur # with container.execute(), not container.get() log.error('LXD Fix (not found). Reset resource') - new_state = ResourceState.UNINITIALIZED + new_state = ResourceState.INITIALIZED elif ENABLE_LXD_WORKAROUND and isinstance(e, LXDAPIException): # "not found" is the normal exception when the container # does not exists. anyways the bug should only occur # with container.execute(), not container.get() log.error('LXD Fix (API error). Reset resource') - new_state = ResourceState.UNINITIALIZED + new_state = ResourceState.INITIALIZED elif isinstance(e, ResourceNotFound): # The resource does not exist self.log(resource, S_GET_DONE.format( @@ -1354,8 +1365,9 @@ class ResourceManager(metaclass=Singleton): resource._state.change_value = None else: e = resource._state.change_value - import traceback; traceback.print_tb(e.__traceback__) - raise NotImplementedError + log.error('Cannot get resource state {} : {}'.format( + resource.get_uuid(), e)) + new_state = ResourceState.ERROR resource._state.change_success = True elif pending_state == ResourceState.PENDING_KEYS: @@ -1372,8 +1384,8 @@ class ResourceManager(metaclass=Singleton): resource._state.change_success = True else: e = resource._state.change_value - import traceback; traceback.print_tb(e.__traceback__) - raise NotImplementedError + log.error('Cannot create resource {} : {}'.format( + resource.get_uuid(), e)) elif pending_state == ResourceState.PENDING_CREATE: if resource._state.change_success == True: @@ -1386,19 +1398,19 @@ class ResourceManager(metaclass=Singleton): if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound): log.error('LXD Fix (not found). Reset resource') - new_state = ResourceState.UNINITIALIZED + new_state = ResourceState.INITIALIZED resource._state.change_success = True elif ENABLE_LXD_WORKAROUND and \ isinstance(e, LXDAPIException): log.error('LXD Fix (API error). Reset resource') - new_state = ResourceState.UNINITIALIZED + new_state = ResourceState.INITIALIZED resource._state.change_success = True elif 'File exists' in str(e): new_state = ResourceState.CREATED resource._state.change_success = True elif 'dpkg --configure -a' in str(e): resource._dpkg_configure_a = True - new_state = ResourceState.UNINITIALIZED + new_state = ResourceState.INITIALIZED resource._state.change_success = True else: self.log(resource, 'CREATE failed: {}'.format(e)) diff --git a/vicn/core/state.py b/vicn/core/state.py index d5069b24..c32b8237 100644 --- a/vicn/core/state.py +++ b/vicn/core/state.py @@ -46,6 +46,7 @@ class ResourceState: PENDING_UPDATE = 'PENDING_UPDATE' PENDING_DELETE = 'PENDING_DELETE' DELETED = 'DELETED' + ERROR = 'ERROR' class AttributeState: UNINITIALIZED = 'UNINITIALIZED' @@ -54,6 +55,7 @@ class AttributeState: PENDING_INIT = 'PENDING_INIT' PENDING_UPDATE = 'PENDING_UPDATE' CLEAN = 'CLEAN' + ERROR = 'ERROR' class Operations: SET = 'set' diff --git a/vicn/core/task.py b/vicn/core/task.py index 2e9bc275..5aecb40b 100644 --- a/vicn/core/task.py +++ b/vicn/core/task.py @@ -34,7 +34,7 @@ log = logging.getLogger(__name__) EXECUTOR=concurrent.futures.ThreadPoolExecutor #EXECUTOR=concurrent.futures.ProcessPoolExecutor -MAX_WORKERS=50 # None +MAX_WORKERS = 50 # None class BaseTask: """Base class for all tasks @@ -171,6 +171,27 @@ def get_attributes_task(resource, attribute_names): return {attribute_name: ret} return func() +def _get_func_desc(f): + """ + Returns a string representation of a function for logging purposes. + + Todo: args and keywords (including from partial) + """ + partial = isinstance(f, functools.partial) + if partial: + f = f.func + + s = '' + if hasattr(f, '__name__'): + s += f.__name__ + if hasattr(f, '__doc__') and f.__doc__: + if s: + s += ' : ' + s += f.__doc__ + + return 'partial<{}>'.format(s) if partial else s + + class PythonTask(Task): def __init__(self, func, *args, **kwargs): super().__init__() @@ -197,8 +218,8 @@ class PythonTask(Task): fut.add_done_callback(self._done_callback) def __repr__(self): - return '<Task[py] {} / {} {}>'.format(self._func, self._args, - self._kwargs) + s = _get_func_desc(self._func) + return '<Task[py] {}>'.format(s) if s else '<Task[py]>' class PythonAsyncTask(PythonTask): async def execute(self, *args, **kwargs): @@ -213,7 +234,8 @@ class PythonAsyncTask(PythonTask): fut.add_done_callback(self._done_callback) def __repr__(self): - return '<Task[apy]>' + s = _get_func_desc(self._func) + return '<Task[apy] {}>'.format(s) if s else '<Task[apy]>' class PythonInlineTask(PythonTask): async def execute(self, *args, **kwargs): @@ -229,6 +251,10 @@ class PythonInlineTask(PythonTask): self._future.set_exception(e) return self._future + def __repr__(self): + s = _get_func_desc(self._func) + return '<Task[ipy] {}>'.format(s) if s else '<Task[ipy]>' + class BashTask(Task): def __init__(self, node, cmd, parameters=None, parse=None, as_root=False, output=False, pre=None, post=None, lock=None): @@ -339,12 +365,14 @@ class TaskManager: loop = asyncio.get_event_loop() loop.set_default_executor(executor) - def schedule(self, task): + def schedule(self, task, resource = None): """All instances of BaseTask can be scheduled Here we might decide to do more advanced scheduling, like merging bash tasks, etc. thanks to the task algebra. """ + uuid = resource.get_uuid() if resource else '(unknown)' + log.info('Scheduling task {} for resource {}'.format(task, uuid)) asyncio.ensure_future(task.execute()) @task |