diff options
Diffstat (limited to 'vicn/core/task.py')
-rw-r--r-- | vicn/core/task.py | 38 |
1 files changed, 33 insertions, 5 deletions
diff --git a/vicn/core/task.py b/vicn/core/task.py index 2e9bc275..5aecb40b 100644 --- a/vicn/core/task.py +++ b/vicn/core/task.py @@ -34,7 +34,7 @@ log = logging.getLogger(__name__) EXECUTOR=concurrent.futures.ThreadPoolExecutor #EXECUTOR=concurrent.futures.ProcessPoolExecutor -MAX_WORKERS=50 # None +MAX_WORKERS = 50 # None class BaseTask: """Base class for all tasks @@ -171,6 +171,27 @@ def get_attributes_task(resource, attribute_names): return {attribute_name: ret} return func() +def _get_func_desc(f): + """ + Returns a string representation of a function for logging purposes. + + Todo: args and keywords (including from partial) + """ + partial = isinstance(f, functools.partial) + if partial: + f = f.func + + s = '' + if hasattr(f, '__name__'): + s += f.__name__ + if hasattr(f, '__doc__') and f.__doc__: + if s: + s += ' : ' + s += f.__doc__ + + return 'partial<{}>'.format(s) if partial else s + + class PythonTask(Task): def __init__(self, func, *args, **kwargs): super().__init__() @@ -197,8 +218,8 @@ class PythonTask(Task): fut.add_done_callback(self._done_callback) def __repr__(self): - return '<Task[py] {} / {} {}>'.format(self._func, self._args, - self._kwargs) + s = _get_func_desc(self._func) + return '<Task[py] {}>'.format(s) if s else '<Task[py]>' class PythonAsyncTask(PythonTask): async def execute(self, *args, **kwargs): @@ -213,7 +234,8 @@ class PythonAsyncTask(PythonTask): fut.add_done_callback(self._done_callback) def __repr__(self): - return '<Task[apy]>' + s = _get_func_desc(self._func) + return '<Task[apy] {}>'.format(s) if s else '<Task[apy]>' class PythonInlineTask(PythonTask): async def execute(self, *args, **kwargs): @@ -229,6 +251,10 @@ class PythonInlineTask(PythonTask): self._future.set_exception(e) return self._future + def __repr__(self): + s = _get_func_desc(self._func) + return '<Task[ipy] {}>'.format(s) if s else '<Task[ipy]>' + class BashTask(Task): def __init__(self, node, cmd, parameters=None, parse=None, as_root=False, output=False, pre=None, post=None, lock=None): @@ -339,12 +365,14 @@ class TaskManager: loop = asyncio.get_event_loop() loop.set_default_executor(executor) - def schedule(self, task): + def schedule(self, task, resource = None): """All instances of BaseTask can be scheduled Here we might decide to do more advanced scheduling, like merging bash tasks, etc. thanks to the task algebra. """ + uuid = resource.get_uuid() if resource else '(unknown)' + log.info('Scheduling task {} for resource {}'.format(task, uuid)) asyncio.ensure_future(task.execute()) @task |