aboutsummaryrefslogtreecommitdiffstats
path: root/vicn/core/task.py
diff options
context:
space:
mode:
Diffstat (limited to 'vicn/core/task.py')
-rw-r--r--vicn/core/task.py38
1 files changed, 33 insertions, 5 deletions
diff --git a/vicn/core/task.py b/vicn/core/task.py
index 2e9bc275..5aecb40b 100644
--- a/vicn/core/task.py
+++ b/vicn/core/task.py
@@ -34,7 +34,7 @@ log = logging.getLogger(__name__)
EXECUTOR=concurrent.futures.ThreadPoolExecutor
#EXECUTOR=concurrent.futures.ProcessPoolExecutor
-MAX_WORKERS=50 # None
+MAX_WORKERS = 50 # None
class BaseTask:
"""Base class for all tasks
@@ -171,6 +171,27 @@ def get_attributes_task(resource, attribute_names):
return {attribute_name: ret}
return func()
+def _get_func_desc(f):
+ """
+ Returns a string representation of a function for logging purposes.
+
+ Todo: args and keywords (including from partial)
+ """
+ partial = isinstance(f, functools.partial)
+ if partial:
+ f = f.func
+
+ s = ''
+ if hasattr(f, '__name__'):
+ s += f.__name__
+ if hasattr(f, '__doc__') and f.__doc__:
+ if s:
+ s += ' : '
+ s += f.__doc__
+
+ return 'partial<{}>'.format(s) if partial else s
+
+
class PythonTask(Task):
def __init__(self, func, *args, **kwargs):
super().__init__()
@@ -197,8 +218,8 @@ class PythonTask(Task):
fut.add_done_callback(self._done_callback)
def __repr__(self):
- return '<Task[py] {} / {} {}>'.format(self._func, self._args,
- self._kwargs)
+ s = _get_func_desc(self._func)
+ return '<Task[py] {}>'.format(s) if s else '<Task[py]>'
class PythonAsyncTask(PythonTask):
async def execute(self, *args, **kwargs):
@@ -213,7 +234,8 @@ class PythonAsyncTask(PythonTask):
fut.add_done_callback(self._done_callback)
def __repr__(self):
- return '<Task[apy]>'
+ s = _get_func_desc(self._func)
+ return '<Task[apy] {}>'.format(s) if s else '<Task[apy]>'
class PythonInlineTask(PythonTask):
async def execute(self, *args, **kwargs):
@@ -229,6 +251,10 @@ class PythonInlineTask(PythonTask):
self._future.set_exception(e)
return self._future
+ def __repr__(self):
+ s = _get_func_desc(self._func)
+ return '<Task[ipy] {}>'.format(s) if s else '<Task[ipy]>'
+
class BashTask(Task):
def __init__(self, node, cmd, parameters=None, parse=None, as_root=False,
output=False, pre=None, post=None, lock=None):
@@ -339,12 +365,14 @@ class TaskManager:
loop = asyncio.get_event_loop()
loop.set_default_executor(executor)
- def schedule(self, task):
+ def schedule(self, task, resource = None):
"""All instances of BaseTask can be scheduled
Here we might decide to do more advanced scheduling, like merging bash
tasks, etc. thanks to the task algebra.
"""
+ uuid = resource.get_uuid() if resource else '(unknown)'
+ log.info('Scheduling task {} for resource {}'.format(task, uuid))
asyncio.ensure_future(task.execute())
@task