about summary refs log tree commit diff stats
path: root/vicn/core
diff options
context:
space:
mode:
Diffstat (limited to 'vicn/core')
-rw-r--r-- vicn/core/resource_mgr.py 62
-rw-r--r-- vicn/core/state.py 2
-rw-r--r-- vicn/core/task.py 38
3 files changed, 72 insertions, 30 deletions
diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py
index f6082488..57dcafef 100644
--- a/vicn/core/resource_mgr.py
+++ b/vicn/core/resource_mgr.py
@@ -44,6 +44,8 @@ from vicn.core.task import EmptyTask, BashTask
log = logging.getLogger(__name__)
+# NOTE: Do not fully reinitialize a resource after a step fails since it will
+# call initialize several times, and might create spurious resources.
ENABLE_LXD_WORKAROUND = True
# Monitoring queries
@@ -148,6 +150,9 @@ class ResourceManager(metaclass=Singleton):
# For debug
self._committed = set()
+ self._num = 0
+ self._num_clean = 0
+
def terminate(self):
self._router.terminate()
@@ -317,6 +322,7 @@ class ResourceManager(metaclass=Singleton):
Committing a resource creates an asyncio function implementing a state
management automaton.
"""
+ self._num += 1
asyncio.ensure_future(self._process_resource(resource))
def commit(self):
@@ -496,10 +502,10 @@ class ResourceManager(metaclass=Singleton):
# Task management
#--------------------------------------------------------------------------
- def schedule(self, task):
+ def schedule(self, task, resource = None):
if task is None or isinstance(task, EmptyTask):
return
- self._task_mgr.schedule(task)
+ self._task_mgr.schedule(task, resource)
#--------------------------------------------------------------------------
# Asynchronous resource API
@@ -785,8 +791,10 @@ class ResourceManager(metaclass=Singleton):
self.attr_log(resource, attribute,
'Current state is {}'.format(state))
+ # AttributeState.ERROR
if resource._state.attr_change_success == False:
- log.error('Attribute error')
+ log.error('Attribute error {} for resource {}'.format(
+ resource.get_uuid(), attribute.name))
e = resource._state.attr_change_value[attribute.name]
import traceback; traceback.print_tb(e.__traceback__)
raise NotImplementedError
@@ -794,7 +802,7 @@ class ResourceManager(metaclass=Singleton):
# Signal update errors to the parent resource
resource._state.attr_change_event[attribute.name].set()
- elif state == AttributeState.UNINITIALIZED:
+ if state == AttributeState.UNINITIALIZED:
pending_state = AttributeState.PENDING_INIT
elif state in AttributeState.INITIALIZED:
pending_state = AttributeState.PENDING_UPDATE
@@ -898,10 +906,10 @@ class ResourceManager(metaclass=Singleton):
new_state = AttributeState.CLEAN
else:
- log.error('Attribute error')
+ log.error('Attribute error {} for resource {}'.format(
+ resource.get_uuid(), attribute.name))
e = resource._state.attr_change_value[attribute.name]
- import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ new_state = AttributeState.ERROR
else:
raise RuntimeError
@@ -1046,13 +1054,20 @@ class ResourceManager(metaclass=Singleton):
It is important to centralize state change since some states are
associated with Events().
"""
+ prev_state = resource._state.state
resource._state.state = state
if state == ResourceState.CLEAN:
# Monitoring hook
self._monitor(resource)
resource._state.clean.set()
+ if prev_state != ResourceState.CLEAN:
+ self._num_clean += 1
+ log.info("Resource {} is marked as CLEAN ({}/{})".format(
+ resource.get_uuid(), self._num_clean, self._num))
else:
resource._state.clean.clear()
+ if prev_state == ResourceState.CLEAN:
+ self._num_clean -= 1
if state == ResourceState.INITIALIZED:
resource._state.init.set()
@@ -1211,12 +1226,7 @@ class ResourceManager(metaclass=Singleton):
state = resource._state.state
self.log(resource, 'Current state is {}'.format(state))
- if resource._state.change_success == False:
- e = resource._state.change_value
- import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
-
- elif state == ResourceState.UNINITIALIZED:
+ if state == ResourceState.UNINITIALIZED:
pending_state = ResourceState.PENDING_DEPS
elif state == ResourceState.DEPS_OK:
pending_state = ResourceState.PENDING_INIT
@@ -1296,9 +1306,10 @@ class ResourceManager(metaclass=Singleton):
raise RuntimeError
if task is not None and not isinstance(task, EmptyTask):
+ resource._state.change_success = None # undetermined state
state_change = functools.partial(self._trigger_state_change, resource)
task.add_done_callback(state_change)
- self.schedule(task)
+ self.schedule(task, resource)
self.log(resource, 'Trigger {} -> {}. Waiting task completion'.format(
state, pending_state))
@@ -1321,8 +1332,8 @@ class ResourceManager(metaclass=Singleton):
new_state = ResourceState.INITIALIZED
else:
e = resource._state.change_value
- import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ log.error('Cannot setup resource {} : {}'.format(
+ resource.get_uuid(), e))
elif pending_state == ResourceState.PENDING_GET:
if resource._state.change_success == True:
@@ -1339,13 +1350,13 @@ class ResourceManager(metaclass=Singleton):
# does not exists. anyways the bug should only occur
# with container.execute(), not container.get()
log.error('LXD Fix (not found). Reset resource')
- new_state = ResourceState.UNINITIALIZED
+ new_state = ResourceState.INITIALIZED
elif ENABLE_LXD_WORKAROUND and isinstance(e, LXDAPIException):
# "not found" is the normal exception when the container
# does not exists. anyways the bug should only occur
# with container.execute(), not container.get()
log.error('LXD Fix (API error). Reset resource')
- new_state = ResourceState.UNINITIALIZED
+ new_state = ResourceState.INITIALIZED
elif isinstance(e, ResourceNotFound):
# The resource does not exist
self.log(resource, S_GET_DONE.format(
@@ -1354,8 +1365,9 @@ class ResourceManager(metaclass=Singleton):
resource._state.change_value = None
else:
e = resource._state.change_value
- import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ log.error('Cannot get resource state {} : {}'.format(
+ resource.get_uuid(), e))
+ new_state = ResourceState.ERROR
resource._state.change_success = True
elif pending_state == ResourceState.PENDING_KEYS:
@@ -1372,8 +1384,8 @@ class ResourceManager(metaclass=Singleton):
resource._state.change_success = True
else:
e = resource._state.change_value
- import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ log.error('Cannot create resource {} : {}'.format(
+ resource.get_uuid(), e))
elif pending_state == ResourceState.PENDING_CREATE:
if resource._state.change_success == True:
@@ -1386,19 +1398,19 @@ class ResourceManager(metaclass=Singleton):
if ENABLE_LXD_WORKAROUND and isinstance(e, LxdNotFound):
log.error('LXD Fix (not found). Reset resource')
- new_state = ResourceState.UNINITIALIZED
+ new_state = ResourceState.INITIALIZED
resource._state.change_success = True
elif ENABLE_LXD_WORKAROUND and \
isinstance(e, LXDAPIException):
log.error('LXD Fix (API error). Reset resource')
- new_state = ResourceState.UNINITIALIZED
+ new_state = ResourceState.INITIALIZED
resource._state.change_success = True
elif 'File exists' in str(e):
new_state = ResourceState.CREATED
resource._state.change_success = True
elif 'dpkg --configure -a' in str(e):
resource._dpkg_configure_a = True
- new_state = ResourceState.UNINITIALIZED
+ new_state = ResourceState.INITIALIZED
resource._state.change_success = True
else:
self.log(resource, 'CREATE failed: {}'.format(e))
diff --git a/vicn/core/state.py b/vicn/core/state.py
index d5069b24..c32b8237 100644
--- a/vicn/core/state.py
+++ b/vicn/core/state.py
@@ -46,6 +46,7 @@ class ResourceState:
PENDING_UPDATE = 'PENDING_UPDATE'
PENDING_DELETE = 'PENDING_DELETE'
DELETED = 'DELETED'
+ ERROR = 'ERROR'
class AttributeState:
UNINITIALIZED = 'UNINITIALIZED'
@@ -54,6 +55,7 @@ class AttributeState:
PENDING_INIT = 'PENDING_INIT'
PENDING_UPDATE = 'PENDING_UPDATE'
CLEAN = 'CLEAN'
+ ERROR = 'ERROR'
class Operations:
SET = 'set'
diff --git a/vicn/core/task.py b/vicn/core/task.py
index 2e9bc275..5aecb40b 100644
--- a/vicn/core/task.py
+++ b/vicn/core/task.py
@@ -34,7 +34,7 @@ log = logging.getLogger(__name__)
EXECUTOR=concurrent.futures.ThreadPoolExecutor
#EXECUTOR=concurrent.futures.ProcessPoolExecutor
-MAX_WORKERS=50 # None
+MAX_WORKERS = 50 # None
class BaseTask:
"""Base class for all tasks
@@ -171,6 +171,27 @@ def get_attributes_task(resource, attribute_names):
return {attribute_name: ret}
return func()
+def _get_func_desc(f):
+ """
+ Returns a string representation of a function for logging purposes.
+
+ Todo: args and keywords (including from partial)
+ """
+ partial = isinstance(f, functools.partial)
+ if partial:
+ f = f.func
+
+ s = ''
+ if hasattr(f, '__name__'):
+ s += f.__name__
+ if hasattr(f, '__doc__') and f.__doc__:
+ if s:
+ s += ' : '
+ s += f.__doc__
+
+ return 'partial<{}>'.format(s) if partial else s
+
+
class PythonTask(Task):
def __init__(self, func, *args, **kwargs):
super().__init__()
@@ -197,8 +218,8 @@ class PythonTask(Task):
fut.add_done_callback(self._done_callback)
def __repr__(self):
- return '<Task[py] {} / {} {}>'.format(self._func, self._args,
- self._kwargs)
+ s = _get_func_desc(self._func)
+ return '<Task[py] {}>'.format(s) if s else '<Task[py]>'
class PythonAsyncTask(PythonTask):
async def execute(self, *args, **kwargs):
@@ -213,7 +234,8 @@ class PythonAsyncTask(PythonTask):
fut.add_done_callback(self._done_callback)
def __repr__(self):
- return '<Task[apy]>'
+ s = _get_func_desc(self._func)
+ return '<Task[apy] {}>'.format(s) if s else '<Task[apy]>'
class PythonInlineTask(PythonTask):
async def execute(self, *args, **kwargs):
@@ -229,6 +251,10 @@ class PythonInlineTask(PythonTask):
self._future.set_exception(e)
return self._future
+ def __repr__(self):
+ s = _get_func_desc(self._func)
+ return '<Task[ipy] {}>'.format(s) if s else '<Task[ipy]>'
+
class BashTask(Task):
def __init__(self, node, cmd, parameters=None, parse=None, as_root=False,
output=False, pre=None, post=None, lock=None):
@@ -339,12 +365,14 @@ class TaskManager:
loop = asyncio.get_event_loop()
loop.set_default_executor(executor)
- def schedule(self, task):
+ def schedule(self, task, resource = None):
"""All instances of BaseTask can be scheduled
Here we might decide to do more advanced scheduling, like merging bash
tasks, etc. thanks to the task algebra.
"""
+ uuid = resource.get_uuid() if resource else '(unknown)'
+ log.info('Scheduling task {} for resource {}'.format(task, uuid))
asyncio.ensure_future(task.execute())
@task