1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2017 Cisco and/or its affiliates.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import asyncio
import concurrent.futures
import functools
import logging
import shlex
import subprocess
import os
from netmodel.util.process import execute_local
from vicn.core.commands import ReturnValue
from vicn.core.commands import Command, SequentialCommands
from vicn.core.exception import ResourceNotFound
from vicn.core.scheduling_algebra import SchedulingAlgebra
log = logging.getLogger(__name__)

# Executor class used to build the event loop's default executor (see
# TaskManager); tasks offload blocking work to it via loop.run_in_executor().
EXECUTOR=concurrent.futures.ThreadPoolExecutor
#EXECUTOR=concurrent.futures.ProcessPoolExecutor

# Sets the number of task workers to the number of CPU threads+1.
# os.cpu_count() returns None when the count cannot be determined; count a
# single CPU in that case instead of failing at import time with a TypeError.
MAX_WORKERS = (os.cpu_count() or 1) + 1
class BaseTask:
    """Common ancestor of every task.

    Each instance owns one asyncio future, created at construction time and
    exposed through get_future() / add_done_callback().  The lifecycle hooks
    (terminate, start, stop) do nothing at this level.
    """

    def __init__(self):
        self._future = asyncio.Future()

    def terminate(self):
        """Lifecycle hook; a no-op in the base class."""

    def start(self):
        """Lifecycle hook; a no-op in the base class."""

    def stop(self):
        """Lifecycle hook; a no-op in the base class."""

    def get_future(self):
        """Return the future created for this task."""
        return self._future

    def add_done_callback(self, cb):
        """Register ``cb`` as a done-callback on the task's future."""
        self._future.add_done_callback(cb)

    def __repr__(self):
        return '<BaseTask>'
class ConcurrentMixin:
    """Mixin providing execute() for a group of sub-tasks in ``self._elements``."""

    async def execute(self):
        """Start every element, wait for all of them and merge their results.

        Each element's execute() is awaited in turn, then all element futures
        are gathered.  The result set on this task's future is a dict mapping
        every key found in the elements' dict results to the list of values
        reported for it (in element order).  Results that are not dicts are
        ignored.  Any exception is stored on this task's future instead.
        """
        try:
            for element in self._elements:
                await element.execute()
            outcomes = await asyncio.gather(
                *(element.get_future() for element in self._elements))

            # The result value is the "union" of all result values; values
            # reported under the same key are accumulated in a list.
            # Ideally we should get all attribute names, and properly insert
            # Nones. So far we assume all dicts are present and complete.
            merged = {}
            for outcome in outcomes:
                if not isinstance(outcome, dict):
                    continue
                for key, value in outcome.items():
                    merged.setdefault(key, []).append(value)
            self.get_future().set_result(merged)
        except Exception as exc:
            self.get_future().set_exception(exc)
class SequentialMixin:
    """Mixin providing execute() that runs ``self._elements`` one after another."""

    async def execute(self):
        """Run each element to completion before starting the next one.

        For every element, execute() is awaited and then its future.  On
        success this task's future is resolved with None; any exception is
        stored on this task's future instead.
        """
        own_future = self.get_future
        try:
            for element in self._elements:
                await element.execute()
                await element.get_future()
        except Exception as exc:
            own_future().set_exception(exc)
            return
        try:
            own_future().set_result(None)
        except Exception as exc:
            own_future().set_exception(exc)
class CompositionMixin:
    """Mixin providing execute() that pipes each element's result into the next."""

    async def execute(self):
        """Run ``self._elements`` in order, feeding results forward.

        The first element is executed without arguments.  Each following
        element receives the previous element's result as its single
        positional argument, unless that result is None, in which case it is
        executed without arguments.  The last result is set on this task's
        future; any exception is stored on this task's future instead.
        """
        try:
            previous = None
            for element in self._elements:
                if previous is None:
                    await element.execute()
                else:
                    await element.execute(previous)
                previous = await element.get_future()
            self.get_future().set_result(previous)
        except Exception as exc:
            self.get_future().set_exception(exc)
# Build the concrete Task and EmptyTask classes from BaseTask and the three
# execution mixins. Task is the base class of the task types defined below;
# EmptyTask() is the neutral starting point used when combining tasks.
# NOTE(review): presumably SchedulingAlgebra also supplies the `|` / `>`
# operators used on tasks in this module -- confirm in scheduling_algebra.
Task, EmptyTask = SchedulingAlgebra(BaseTask, ConcurrentMixin,
        CompositionMixin, SequentialMixin)
def task(fn):
    """Decorator turning ``fn`` into a factory of PythonTask objects.

    Calling the decorated name does not run ``fn``; it returns a PythonTask
    holding ``fn`` together with the arguments given at call time.
    """
    def make_task(*args, **kwargs):
        return PythonTask(fn, *args, **kwargs)

    return make_task
def async_task(fn, *t_args, **t_kwargs):
    """Turn ``fn`` into a factory of PythonAsyncTask objects.

    Can be used as a bare decorator (``@async_task``) or called directly with
    extra arguments to pre-bind.

    Args:
        fn: the callable the resulting tasks will run.
        *t_args: positional arguments placed before the ones given when the
            factory is called.
        **t_kwargs: keyword arguments pre-bound for the task; keywords given
            when the factory is called take precedence.

    Returns:
        A function which, when called, returns a PythonAsyncTask for ``fn``
        with the combined arguments.
    """
    def decorator(*args, **kwargs):
        # The pre-bound arguments used to be computed and then dropped; they
        # are now actually forwarded to the task.
        all_args = t_args + args
        all_kwargs = {**t_kwargs, **kwargs}
        return PythonAsyncTask(fn, *all_args, **all_kwargs)
    return decorator
def inline_task(fn):
    """Decorator turning ``fn`` into a factory of PythonInlineTask objects.

    Calling the decorated name does not run ``fn``; it returns a
    PythonInlineTask holding ``fn`` and the arguments given at call time.
    """
    def make_task(*args, **kwargs):
        return PythonInlineTask(fn, *args, **kwargs)

    return make_task
# Wait until `task` completes and return the result of its future (or raise
# the exception stored on it).
# Documented with comments on purpose: this function is wrapped into tasks
# whose repr embeds the wrapped function's docstring.
async def wait_task(task):
    completion = task.get_future()
    return await completion
async def run_task(task, manager):
    """Schedule ``task`` on ``manager`` and wait for its result.

    Returns whatever the task's future resolves to.
    """
    manager.schedule(task)
    return await wait_task(task)
async def wait_concurrent_tasks(tasks):
    """Wait on the future of the concurrent composition of ``tasks``.

    The composition is built with ``Task.__concurrent__``; this function only
    waits on its future (it does not call execute() on it) and returns None.
    """
    group = Task.__concurrent__(*tasks)
    await wait_task(group)
# Task-producing counterpart of wait_task: wait_task_task(...) returns a
# PythonAsyncTask wrapping wait_task instead of waiting right away.
wait_task_task = async_task(wait_task)
#------------------------------------------------------------------------------
# Task inheritance
#------------------------------------------------------------------------------
# NOTES:
# - delete is a special case where we have to reverse operations
# - subresources is also a special case, since it deals with resources, but it
# works the same since the algebra is similar
def inherit_parent(fn):
    """Decorator combining a task-returning method with its parents' version.

    The wrapped method ``fn`` must return a task.  When the wrapper is called
    on an instance whose class itself defines a method of that name, the
    tasks collected from the parent classes are combined with ``|`` and then
    ordered against the task returned by ``fn`` with ``>``: parents first in
    the general case, ``fn`` first when the method is named ``delete`` (where
    operations have to be reversed).

    Args:
        fn: the method being decorated.

    Returns:
        The wrapping function, to be used as the method.
    """
    def f(self, *args, **kwargs):
        # Break loops: when the instance's class does not itself define the
        # method, just run fn without collecting anything from the parents.
        if fn.__name__ not in vars(self.__class__):
            return fn(self, *args, **kwargs)

        parent_tasks = EmptyTask()
        for parent in self.__class__.__bases__:
            if fn.__name__ not in vars(parent):
                continue
            # Walk the MRO looking for the class whose namespace holds fn, so
            # that super() resolves from there; without a match, cls is left
            # on the last class of the MRO.
            # NOTE(review): the class namespace normally holds the wrapper f
            # rather than fn, so this identity test may never succeed --
            # confirm the intended lookup.
            for cls in self.__class__.__mro__:
                if cls.__dict__.get(fn.__name__) is fn:
                    break
            parent_task = getattr(super(cls, self), fn.__name__, None)
            if not parent_task:
                continue
            parent_tasks = parent_tasks | parent_task(self)

        if fn.__name__ == 'delete':
            # fn is a plain function taking the instance explicitly; the
            # instance was previously omitted here, so every decorated
            # delete() failed with a TypeError.
            return fn(self, *args, **kwargs) > parent_tasks
        else:
            return parent_tasks > fn(self, *args, **kwargs)
    return f
def override_parent(fn):
    """Decorator combining a task-returning method with its grandparents' version.

    Unlike inherit_parent, the direct parents are skipped: tasks are collected
    from the bases of the instance's class' bases that define a method named
    like ``fn``.  They are combined with ``|`` and ordered against the task
    returned by ``fn`` with ``>``: ancestors first in the general case, ``fn``
    first when the method is named ``delete``.

    Args:
        fn: the method being decorated.

    Returns:
        The wrapping function, to be used as the method.
    """
    def f(self, *args, **kwargs):
        # Break loops: when the instance's class does not itself define the
        # method, just run fn without collecting anything from ancestors.
        if fn.__name__ not in vars(self.__class__):
            return fn(self, *args, **kwargs)

        # Grandparents: bases of the direct bases, without duplicates.
        bases = {ancestor for parent in self.__class__.__bases__
                for ancestor in parent.__bases__}
        ancestor_tasks = EmptyTask()
        for base in bases:
            if fn.__name__ not in vars(base):
                continue
            ancestor_task = getattr(base, fn.__name__, None)
            if not ancestor_task:
                continue
            ancestor_tasks = ancestor_tasks | ancestor_task(self)

        if fn.__name__ == 'delete':
            # fn is a plain function taking the instance explicitly; the
            # instance was previously omitted here, so every decorated
            # delete() failed with a TypeError.
            return fn(self, *args, **kwargs) > ancestor_tasks
        else:
            return ancestor_tasks > fn(self, *args, **kwargs)
    return f
def inherit(*classes):
    """Decorator factory combining a task-returning method with the version
    defined by an explicit list of classes.

    For each class in ``classes`` that defines a method named like the
    decorated one, that method is called on the instance and the resulting
    tasks are combined with ``|``.  The combination is ordered against the
    task returned by the decorated method with ``>``: the listed classes
    first in the general case, the decorated method first when it is named
    ``delete``.

    Args:
        *classes: classes whose same-named method must be chained.

    Returns:
        The decorator to apply to the method.
    """
    def decorator(fn):
        def f(self, *args, **kwargs):
            # Break loops: when the instance's class does not itself define
            # the method, just run fn without chaining anything.
            if fn.__name__ not in vars(self.__class__):
                return fn(self, *args, **kwargs)

            parent_tasks = EmptyTask()
            for parent in classes:
                if fn.__name__ not in vars(parent):
                    continue
                parent_task = getattr(parent, fn.__name__, None)
                if not parent_task:
                    continue
                parent_tasks = parent_tasks | parent_task(self)

            if fn.__name__ == 'delete':
                # fn is a plain function taking the instance explicitly; the
                # instance was previously omitted here, so every decorated
                # delete() failed with a TypeError.
                return fn(self, *args, **kwargs) > parent_tasks
            else:
                return parent_tasks > fn(self, *args, **kwargs)
        return f
    return decorator
#------------------------------------------------------------------------------
def get_attribute_task(resource, attribute_name):
    """Return an async task fetching ``attribute_name`` from ``resource``.

    When run, the task awaits ``resource.async_get(attribute_name)`` and
    resolves to the value obtained.
    """
    # The inner function keeps the name `func`: it shows up in the task repr.
    async def func():
        value = await resource.async_get(attribute_name)
        return value

    return async_task(func)()
def set_attributes_task(resource, attribute_dict):
    """Not implemented: always raises NotImplementedError.

    The difficulty is in setting the pending value without triggering the
    manager, and executing the task by ourselves.
    """
    raise NotImplementedError
def get_attributes_task(resource, attribute_names):
    """Return an async task fetching a single attribute as a one-entry dict.

    Only one attribute name is supported (enforced by an assertion).  When
    run, the task first waits for the resource's manager to report the
    attribute as initialised, then resolves to ``{name: value}``.
    """
    assert len(attribute_names) == 1
    attribute_name = attribute_names[0]

    # The inner function keeps the name `func`: it shows up in the task repr.
    async def func():
        manager = resource._state.manager
        await manager.wait_attr_init(resource, attribute_name)
        value = await resource.async_get(attribute_name)
        return {attribute_name: value}

    return async_task(func)()
def _get_func_desc(f):
"""
Returns a string representation of a function for logging purposes.
Todo: args and keywords (including from partial)
"""
partial = isinstance(f, functools.partial)
if partial:
f = f.func
s = ''
if hasattr(f, '__name__'):
s += f.__name__
if hasattr(f, '__doc__') and f.__doc__:
if s:
s += ' : '
s += f.__doc__
return 'partial<{}>'.format(s) if partial else s
class PythonTask(Task):
    """Task wrapping a Python callable, run in the loop's default executor.

    Arguments given at construction time come first; those given to
    execute() / execute_blocking() are appended (positional) or override
    (keyword) them.
    """

    def __init__(self, func, *args, **kwargs):
        super().__init__()
        self._func = func
        self._args = args
        self._kwargs = kwargs

    def _done_callback(self, fut):
        """Copy the outcome of ``fut`` (result or exception) to the task's future."""
        try:
            outcome = fut.result()
            self._future.set_result(outcome)
        except Exception as exc:
            self._future.set_exception(exc)

    def _bind_call(self, args, kwargs):
        """Return the wrapped callable bound to stored plus call-time arguments."""
        merged_kwargs = {**self._kwargs, **kwargs}
        return functools.partial(self._func, *(self._args + args),
                **merged_kwargs)

    async def execute(self, *args, **kwargs):
        """Submit the call to the default executor.

        Does not wait for completion: the outcome is reported on the task's
        future through _done_callback.
        """
        call = self._bind_call(args, kwargs)
        pending = asyncio.get_event_loop().run_in_executor(None, call)
        pending.add_done_callback(self._done_callback)

    def execute_blocking(self, *args, **kwargs):
        """Run the callable synchronously in the caller and return its result."""
        return self._bind_call(args, kwargs)()

    def __repr__(self):
        desc = _get_func_desc(self._func)
        if not desc:
            return '<Task[py]>'
        return '<Task[py] {}>'.format(desc)
class PythonAsyncTask(PythonTask):
    """PythonTask variant whose callable runs on the event loop itself.

    The wrapped callable may be a coroutine function, or a plain function
    (whose return value is awaited when it is awaitable).
    """

    async def execute(self, *args, **kwargs):
        """Schedule the callable on the event loop.

        Arguments given here are appended to (positional) or override
        (keyword) the ones stored at construction time.  Does not wait for
        completion: the outcome is reported on the task's future through
        _done_callback.
        """
        import inspect

        all_args = self._args + args
        all_kwargs = dict(self._kwargs)
        all_kwargs.update(kwargs)

        async def _call():
            # Replaces asyncio.coroutine(), removed in Python 3.11: call the
            # function, and await what it returns only when it is awaitable.
            # Errors raised by the call itself end up on the task's future.
            outcome = self._func(*all_args, **all_kwargs)
            if inspect.isawaitable(outcome):
                outcome = await outcome
            return outcome

        fut = asyncio.ensure_future(_call())
        fut.add_done_callback(self._done_callback)

    def __repr__(self):
        s = _get_func_desc(self._func)
        return '<Task[apy] {}>'.format(s) if s else '<Task[apy]>'
class PythonInlineTask(PythonTask):
    """PythonTask variant calling its function directly inside execute()."""

    async def execute(self, *args, **kwargs):
        """Call the function right away and record the outcome.

        Arguments given here are appended to (positional) or override
        (keyword) the ones stored at construction time.  The return value, or
        the exception raised, is stored on the task's future, which is
        returned.
        """
        merged_kwargs = {**self._kwargs, **kwargs}
        try:
            outcome = self._func(*(self._args + args), **merged_kwargs)
            self._future.set_result(outcome)
        except Exception as exc:
            self._future.set_exception(exc)
        return self._future

    def __repr__(self):
        desc = _get_func_desc(self._func)
        if not desc:
            return '<Task[ipy]>'
        return '<Task[ipy] {}>'.format(desc)
class BashTask(Task):
    """Task running a shell command on a node, or locally without a node.

    The command line is assembled by get_full_cmd() right before execution
    and run in the event loop's default executor; the outcome is reported on
    the task's future.

    Args:
        node: object exposing ``execute(cmd, output=...)`` and ``name``; a
            falsy value selects ``execute_local`` instead.
        cmd: command text, or a callable returning it.  One command per
            line; blank lines are skipped and a line starting with ``#``
            becomes the description of the command following it.
        parameters: dict assigned as the parameters of the command.  It is
            updated in place when execute() receives a dict argument.
        parse: optional callable applied to the execution result; its return
            value becomes the task's result.  When unset, a non-zero
            ``return_value`` is reported as a failure.
        as_root: stored on the task; not used by the code of this class.
        output: passed as the ``output`` flag of the execution function
            (forced on when ``parse`` is set).
        pre: optional callable invoked without arguments before the command
            is built.
        post: optional callable invoked without arguments once the command
            has run, before parsing.
        lock: optional lock with an awaitable ``acquire()`` and a
            ``release()``, held while the command runs.
    """

    def __init__(self, node, cmd, parameters=None, parse=None, as_root=False,
            output=False, pre=None, post=None, lock=None):
        super().__init__()
        self._node = node
        self._cmd = cmd
        self._params = parameters if parameters else dict()
        self._parse = parse
        self._pre = pre
        self._post = post
        self._lock = lock
        self._output = output
        self._as_root = as_root

    def _default_parse_for_get(self, rv):
        """Raise ResourceNotFound when the execution result is falsy."""
        if not bool(rv):
            raise ResourceNotFound

    def set_default_parse_for_get(self):
        """Install _default_parse_for_get unless a parse function is set."""
        if self._parse is None:
            self._parse = self._default_parse_for_get

    def _node_name(self):
        """Return the node's name, or '(LOCAL)' when running without a node."""
        return self._node.name if self._node else '(LOCAL)'

    def _done_callback(self, fut):
        """
        Report the outcome of the command on the task's future, then release
        the lock, if any.

        Note: extends the functionality of the parent _done_callback
        """
        try:
            rv = fut.result()
            if self._parse is None and rv.return_value != 0:
                # _node_name() rather than self._node.name: for a local
                # command (no node) the latter raised AttributeError, which
                # replaced the actual failure report.
                raise Exception('Bash command failed on node {}'.format(
                        self._node_name()), self.get_full_cmd(), rv)
            if self._post:
                self._post()
            if self._parse:
                rv = self._parse(rv)
            self._future.set_result(rv)
        except Exception as e:
            self._future.set_exception(e)
        if self._lock:
            self._lock.release()

    def get_full_cmd(self):
        """Assemble and return the full command line to execute.

        Each non-empty line of the command text becomes a Command, described
        by the ``#`` comment line preceding it (if any); the task's
        parameters are attached to the resulting sequence.
        """
        c = SequentialCommands()
        desc = None
        cmd = self._cmd
        if callable(cmd):
            cmd = cmd()
        for line in cmd.splitlines():
            line = line.strip()
            if not line:
                continue
            if line.startswith('#'):
                desc = line[1:].strip()
                continue
            c << Command(line, description = desc)
            # A description only applies to the command right after it
            desc = None
        c.parameters = self._params
        return c.command.full_commandline

    async def execute(self, *args, **kwargs):
        """Execute the task, enforcing any eventual locking.

        A single dict passed as positional argument is merged into the
        command parameters.

        Returns:
            None. Upon completion (eventually error), the Task's future is
            set by _done_callback.
        """
        if len(args) == 1:
            dic, = args
            if isinstance(dic, dict):
                self._params.update(dic)
        if self._pre:
            self._pre()

        func = self._node.execute if self._node else execute_local
        # It is important that the command is constructed only just before it
        # is executed, so that any object passed as parameters is
        # dereferenced right on time.
        cmd = self.get_full_cmd()
        partial = functools.partial(func, cmd,
                output = self._output or bool(self._parse))

        log.info('Execute: {} - {}'.format(self._node_name(), cmd))

        if self._lock:
            # We need to do lock/unlock around the task execution
            # Locking now will early block other tasks, but will at the same
            # time delay them entering the executor queue; so this is
            # equivalent
            await self._lock.acquire()

        loop = asyncio.get_event_loop()
        fut = loop.run_in_executor(None, partial)
        fut.add_done_callback(self._done_callback)

    def execute_blocking(self):
        """Run the command synchronously and return its (parsed) result.

        Neither the pre/post hooks nor the lock are involved here.
        """
        # Same fallback as execute(): run locally when there is no node
        func = self._node.execute if self._node else execute_local
        rv = func(self.get_full_cmd(), output=True)
        if self._parse:
            rv = self._parse(rv)
        return rv

    def __repr__(self):
        return '<Task[bash] {} / {}>'.format(self._cmd, self._params)
class TaskManager:
    """Entry point used to schedule tasks on the asyncio event loop.

    Creating a manager installs an EXECUTOR instance (sized by MAX_WORKERS
    when it is set) as the event loop's default executor.
    """

    def __init__(self):
        executor = EXECUTOR() if MAX_WORKERS is None \
                else EXECUTOR(max_workers=MAX_WORKERS)
        loop = asyncio.get_event_loop()
        loop.set_default_executor(executor)
        # Strong references to the asyncio tasks still running: the event
        # loop only keeps weak references to them, so an otherwise
        # unreferenced task could be garbage-collected before it completes.
        self._running = set()

    def schedule(self, task, resource = None):
        """All instances of BaseTask can be scheduled

        Here we might decide to do more advanced scheduling, like merging bash
        tasks, etc. thanks to the task algebra.

        Args:
            task: the task to run; its execute() coroutine is handed to the
                event loop.
            resource: optional resource the task is run for; only used to
                log its uuid.
        """
        uuid = resource.get_uuid() if resource else '(unknown)'
        log.info('Scheduling task {} for resource {}'.format(task, uuid))
        running = asyncio.ensure_future(task.execute())
        self._running.add(running)
        running.add_done_callback(self._running.discard)
# Task factory: the task returns one groupdict() per match of `rx` found in
# rv.stdout.
# NOTE(review): `rx` is neither a parameter nor assigned anywhere in the
# visible part of this module; unless it is defined elsewhere, running this
# task raises NameError -- confirm where the compiled regex should come from.
# (Documented with comments only: a docstring would show up in the task repr.)
@task
def ParseRegexTask(rv):
    return [m.groupdict() for m in rx.finditer(rv.stdout)]
|