aboutsummaryrefslogtreecommitdiffstats
path: root/vicn/core/resource_mgr.py
diff options
context:
space:
mode:
Diffstat (limited to 'vicn/core/resource_mgr.py')
-rw-r--r--vicn/core/resource_mgr.py180
1 files changed, 103 insertions, 77 deletions
diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py
index 57dcafef..c71e0444 100644
--- a/vicn/core/resource_mgr.py
+++ b/vicn/core/resource_mgr.py
@@ -104,19 +104,19 @@ class ResourceManager(metaclass=Singleton):
self._base = base
# Resources sorted via dependency (instances)
- self._resources = dict()
+ self._resources = dict()
self._deps = None
# Store resource requirements used for automatic instanciation
# instance -> attribute -> requirements
- self._instance_requirements = dict()
+ self._instance_requirements = dict()
self._dirty = set()
self._auto_commit = False
# class -> Requirements
- self._class_requirements = dict()
+ self._class_requirements = dict()
self._map_uuid_name = dict()
self._map_name_uuid = dict()
@@ -140,7 +140,7 @@ class ResourceManager(metaclass=Singleton):
self._router.add_interface('vicn', manager = self)
ws_port = self.get('websocket_port')
- self._ws = self._router.add_interface('websocketserver',
+ self._ws = self._router.add_interface('websocketserver',
port = ws_port)
# Monitoring
@@ -195,7 +195,7 @@ class ResourceManager(metaclass=Singleton):
def _broadcast(self, query):
if not self._ws:
return
- self._ws.execute(query)
+ self._ws.execute(query)
def _broadcast_packet(self, packet):
self._broadcast(packet.to_query())
@@ -205,20 +205,20 @@ class ResourceManager(metaclass=Singleton):
if not query.object_name == 'interface':
return
- q = Query(ACTION_UPDATE, 'channel', filter = query.filter,
+ q = Query(ACTION_UPDATE, 'channel', filter = query.filter,
params = query.params)
q.reply = True
- self._ws.execute(q)
+ self._ws.execute(q)
return None
def _on_netmon_record(self, packet):
query = packet.to_query()
# Find channel related to query
- # NOTE: we update the channel twice, once for each interface...
+ # NOTE: we update the channel twice, once for each interface...
if query.object_name == 'interface':
- device_names = [value for key, op, value in query.filter.to_list()
+ device_names = [value for key, op, value in query.filter.to_list()
if key == 'device_name']
if not device_names:
log.error('No device name in packet=', packet)
@@ -231,9 +231,9 @@ class ResourceManager(metaclass=Singleton):
for interface in node.interfaces:
if interface.device_name == device_name:
if interface.channel:
- f = Filter.from_list([['id', '==',
+ f = Filter.from_list([['id', '==',
interface.channel._state.uuid._uuid]])
- q = Query(ACTION_UPDATE, 'channel', filter = f,
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
params = query.params)
q.reply = True
self._ws.execute(q)
@@ -245,7 +245,7 @@ class ResourceManager(metaclass=Singleton):
def _on_netmon_channel_record(self, packet):
query = packet.to_query()
if query.object_name == 'interface':
- device_names = [value for key, op, value in query.filter.to_list()
+ device_names = [value for key, op, value in query.filter.to_list()
if key == 'device_name']
if not device_names:
log.error('No device name in packet=', packet)
@@ -254,7 +254,7 @@ class ResourceManager(metaclass=Singleton):
device_name = device_names[0]
f = Filter.from_list([['id', '==', device_name]])
- q = Query(ACTION_UPDATE, 'channel', filter = f,
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
params = query.params)
q.reply = True
self._ws.execute(q)
@@ -265,7 +265,7 @@ class ResourceManager(metaclass=Singleton):
def _on_vpp_record(self, packet, pylink_id):
query = packet.to_query()
if query.object_name == 'interface':
- device_names = [value for key, op, value in query.filter.to_list()
+ device_names = [value for key, op, value in query.filter.to_list()
if key == 'device_name']
if not device_names:
log.error('No device name in packet=', packet)
@@ -273,7 +273,7 @@ class ResourceManager(metaclass=Singleton):
# We might want to check if the query has SUM(*)
f = Filter.from_list([['id', '==', pylink_id]])
- q = Query(ACTION_UPDATE, 'channel', filter = f,
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
params = query.params)
q.reply = True
self._ws.execute(q)
@@ -341,7 +341,7 @@ class ResourceManager(metaclass=Singleton):
asyncio.ensure_future(self._set_resource_state(resource,
ResourceState.CLEAN))
continue
-
+
self.commit_resource(resource)
def setup(self, commit=False):
@@ -358,12 +358,12 @@ class ResourceManager(metaclass=Singleton):
if '__type__' in cls.__dict__ and cls.__type__ == FactoryResource:
candidates = inheritors(cls)
if not candidates:
- log.error('Abstract resource with no candidates: %s',
+ log.error('Abstract resource with no candidates: %s',
cls.__name__)
return None
for delegate in candidates:
- if capabilities and (not '__capabilities__' in vars(delegate)
+ if capabilities and (not '__capabilities__' in vars(delegate)
or not capabilities.issubset(delegate.__capabilities__)):
continue
log.info("Abstract resource %s, delegated %s among %r" % \
@@ -371,9 +371,9 @@ class ResourceManager(metaclass=Singleton):
return delegate
return None
else:
- if capabilities and (not '__capabilities__' in vars(delegate) or
+ if capabilities and (not '__capabilities__' in vars(delegate) or
not capabilities.issubset(delegate.__capabilities__)):
- log.error('Capabilities conflict for resource : %s',
+ log.error('Capabilities conflict for resource : %s',
cls.__name__)
raise VICNException
return cls
@@ -382,7 +382,7 @@ class ResourceManager(metaclass=Singleton):
cls, attr_dict = resource_tuple
for instance in self.by_type(cls):
cur_attr_dict = instance._get_attribute_dict()
- common_keys = [k for k in cur_attr_dict.keys()
+ common_keys = [k for k in cur_attr_dict.keys()
if k in attr_dict.keys()]
if all(attr_dict[k] == cur_attr_dict[k] for k in common_keys):
return instance
@@ -457,7 +457,7 @@ class ResourceManager(metaclass=Singleton):
if not aggregates:
return None
assert len(aggregates) == 1
- return next(aggregates)
+ return next(aggregates)
def by_type(self, type):
return [r for r in self if isinstance(r, type)]
@@ -531,13 +531,13 @@ class ResourceManager(metaclass=Singleton):
await self.wait_attr_init(resource, attribute)
return resource.get(attribute)
- async def attribute_set(self, resource, attribute_name, value,
+ async def attribute_set(self, resource, attribute_name, value,
blocking=True):
with await resource._state.write_lock:
# Add the current operation to the pending list
# NOTE: collections are unordered and can be updated concurrently
#self._attribute_set_pending_value(resource, attribute_name)
- resource._state.dirty[attribute_name].trigger(Operations.SET,
+ resource._state.dirty[attribute_name].trigger(Operations.SET,
value)
attr_state = resource._state.attr_state[attribute_name]
@@ -630,12 +630,12 @@ class ResourceManager(metaclass=Singleton):
setattr(dep, dep_attr_name, dep_attr_value)
dep_attr_value_pfx = '{}:{}'.format(
- dep_attr_value.get_type(),
+ dep_attr_value.get_type(),
dep_attr_value.get_uuid())
- self.log(resource,
+ self.log(resource,
S_WAIT_DEP.format(dep_attr_value_pfx))
await wait_resource(dep_attr_value)
- self.log(resource,
+ self.log(resource,
S_WAIT_DEP_OK .format(dep_attr_value_pfx))
async def _resource_wait_predecessors(self, resource):
@@ -648,7 +648,7 @@ class ResourceManager(metaclass=Singleton):
for before in befores:
if not before.managed:
continue
- before_pfx = '{}:{}'.format(before.get_type(),
+ before_pfx = '{}:{}'.format(before.get_type(),
before.get_uuid())
self.log(resource, S_WAIT.format(before_pfx))
await wait_resource(before)
@@ -663,7 +663,7 @@ class ResourceManager(metaclass=Singleton):
for before in befores:
if not before.managed:
continue
- before_pfx = '{}:{}'.format(before.get_type(),
+ before_pfx = '{}:{}'.format(before.get_type(),
before.get_uuid())
self.log(resource, S_WAIT.format(before_pfx))
await wait_resource_init(before)
@@ -693,7 +693,7 @@ class ResourceManager(metaclass=Singleton):
await self._resource_wait_attributes(resource)
await self._resource_wait_predecessors(resource)
await self._resource_wait_subresources(resource)
-
+
def _task_resource_action(self, resource, action):
"""Perform action: __get__, __create__, __delete__ on the full class
hierarchy.
@@ -737,13 +737,14 @@ class ResourceManager(metaclass=Singleton):
It is important to centralize state change since some states are
associated with Events().
"""
- resource._state.attr_state[attribute_name] = state
if state in [
AttributeState.INITIALIZED,
AttributeState.CLEAN,
AttributeState.DIRTY
]:
resource._state.attr_init[attribute_name].set()
+ elif state == AttributeState.RESET:
+ resource._state.attr_init[attribute_name].clear()
else:
raise RuntimeError("Inconsistent resource state {}".format(state))
@@ -751,12 +752,16 @@ class ResourceManager(metaclass=Singleton):
resource._state.attr_clean[attribute_name].set()
elif state in [
AttributeState.INITIALIZED,
- AttributeState.DIRTY
+ AttributeState.DIRTY,
+ AttributeState.RESET
]:
resource._state.attr_clean[attribute_name].clear()
else:
raise RuntimeError
+ resource._state.attr_state[attribute_name] = AttributeState.UNINITIALIZED \
+ if state == AttributeState.RESET else state
+
async def _set_attribute_state(self, resource, attribute_name, state):
"""Sets the attribute state (lock version)
"""
@@ -788,19 +793,26 @@ class ResourceManager(metaclass=Singleton):
while new_state != AttributeState.CLEAN:
#with await resource._state.attr_lock[attribute.name]:
state = resource._state.attr_state[attribute.name]
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'Current state is {}'.format(state))
# AttributeState.ERROR
- if resource._state.attr_change_success == False:
- log.error('Attribute error {} for resource {}'.format(
- resource.get_uuid(), attribute.name))
+ if resource._state.attr_change_success[attribute.name] == False:
e = resource._state.attr_change_value[attribute.name]
- import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ if ENABLE_LXD_WORKAROUND and \
+ (isinstance(e, LxdNotFound) or isinstance(e, LXDAPIException)):
+ new_state = AttributeState.RESET
+ log.error('LXD Fix (not found). Reset attribute')
+ resource._state.attr_change_success[attribute.name] = True
+ else:
+ log.error('Attribute error {} for resource {}'.format(
+ resource.get_uuid(), attribute.name))
+ import traceback; traceback.print_tb(e.__traceback__)
+ log.error('Failed with exception: {}'.format(e))
+ import os; os._exit(1)
- # Signal update errors to the parent resource
- resource._state.attr_change_event[attribute.name].set()
+ # Signal update errors to the parent resource
+ resource._state.attr_change_event[attribute.name].set()
if state == AttributeState.UNINITIALIZED:
pending_state = AttributeState.PENDING_INIT
@@ -813,14 +825,14 @@ class ResourceManager(metaclass=Singleton):
AttributeState.PENDING_UPDATE
]:
# Nothing to do
- pending_state = None
+ pending_state = None
elif state == AttributeState.CLEAN:
return
else:
raise RuntimeError
if pending_state is None:
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'Nothing to do. Waiting for event...')
await resource._state.attr_change_event[attribute.name].wait()
resource._state.attr_change_event[attribute.name].clear()
@@ -837,7 +849,7 @@ class ResourceManager(metaclass=Singleton):
task = EmptyTask()
else:
try:
- task = self._task_attribute_op(resource, attribute,
+ task = self._task_attribute_op(resource, attribute,
Operations.SET)
except Exception as e:
log.warning('No attribute setter attribute {}'.format(
@@ -853,11 +865,11 @@ class ResourceManager(metaclass=Singleton):
self.attr_log(resource, attribute,
'Trigger {} -> {}. Waiting task completion'.format(
state, pending_state))
- self.schedule(task)
+ self.schedule(task)
await resource._state.attr_change_event[attribute.name].wait()
resource._state.attr_change_event[attribute.name].clear()
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'Completed {} -> {}. Success = {}'.format(
state, pending_state,
resource._state.attr_change_success[attribute.name]))
@@ -868,25 +880,31 @@ class ResourceManager(metaclass=Singleton):
if pending_state == AttributeState.PENDING_INIT:
if resource._state.attr_change_success[attribute.name] == True:
attrs = resource._state.attr_change_value[attribute.name]
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'INIT success. Value = {}'.format(attrs))
found = self._process_attr_dict(resource, attribute, attrs)
if not found:
log.error('Attribute missing return attrs: {}'.format(
attrs))
- found = self._process_attr_dict(resource, attribute,
+ found = self._process_attr_dict(resource, attribute,
attrs)
new_state = AttributeState.INITIALIZED
else:
attrs = resource._state.attr_change_value[attribute.name]
- self.attr_log(resource, attribute,
- 'INIT gave no value. Value = {}'.format(attrs))
- new_state = AttributeState.INITIALIZED
+ if ENABLE_LXD_WORKAROUND and \
+ (isinstance(attrs, LxdNotFound) or isinstance(attrs, LXDAPIException)):
+ new_state = AttributeState.RESET
+ log.error('LXD Fix (not found). Reset attribute')
+ resource._state.attr_change_success[attribute.name] = True
+ else:
+ self.attr_log(resource, attribute,
+ 'INIT gave no value. Value = {}'.format(attrs))
+ new_state = AttributeState.INITIALIZED
elif pending_state == AttributeState.PENDING_UPDATE:
if resource._state.attr_change_success[attribute.name] == True:
attrs = resource._state.attr_change_value[attribute.name]
- self.attr_log(resource, attribute,
+ self.attr_log(resource, attribute,
'UPDATE success. Value = {}. Attribute is CLEAN'.format(attrs))
if attrs != NEVER_SET:
# None could be interpreted as the return value. Also,
@@ -906,16 +924,22 @@ class ResourceManager(metaclass=Singleton):
new_state = AttributeState.CLEAN
else:
- log.error('Attribute error {} for resource {}'.format(
+ if ENABLE_LXD_WORKAROUND and \
+ (isinstance(attrs, LxdNotFound) or isinstance(attrs, LXDAPIException)):
+ new_state = AttributeState.RESET
+ log.error('LXD Fix (not found). Reset attribute')
+ resource._state.attr_change_success[attribute.name] = True
+ else:
+ log.error('Attribute error {} for resource {}'.format(
resource.get_uuid(), attribute.name))
- e = resource._state.attr_change_value[attribute.name]
- new_state = AttributeState.ERROR
+ e = resource._state.attr_change_value[attribute.name]
+ new_state = AttributeState.ERROR
else:
raise RuntimeError
# Setting attribute state
- await self._set_attribute_state(resource, attribute.name,
+ await self._set_attribute_state(resource, attribute.name,
new_state)
#--------------------------------------------------------------------------
@@ -937,7 +961,7 @@ class ResourceManager(metaclass=Singleton):
ws = self._router.add_interface('websocketclient', address=ip,
hook=self._on_netmon_record)
-
+
node = resource.node
for interface in node.interfaces:
if not interface.monitored:
@@ -951,17 +975,17 @@ class ResourceManager(metaclass=Singleton):
continue
channel_id = interface.channel._state.uuid._uuid
-
- update_vpp = functools.partial(self._on_vpp_record,
+
+ update_vpp = functools.partial(self._on_vpp_record,
pylink_id = channel_id)
- ws_vpp = self._router.add_interface('websocketclient',
+ ws_vpp = self._router.add_interface('websocketclient',
address=ip, hook=update_vpp)
aggregate_interfaces = list()
for _interface in node.interfaces:
if not _interface.get_type() == 'dpdkdevice' and \
_interface.monitored:
- aggregate_interfaces.append('"' +
+ aggregate_interfaces.append('"' +
_interface.device_name + '"')
q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces))
@@ -975,7 +999,7 @@ class ResourceManager(metaclass=Singleton):
interface.channel.already_subscribed = True
else:
- q_str = Q_SUB_IF.format(interface.device_name)
+ q_str = Q_SUB_IF.format(interface.device_name)
q = self.parse_query(q_str)
packet = Packet.from_query(q)
self._router._flow_table.add(packet, None, ws)
@@ -986,7 +1010,7 @@ class ResourceManager(metaclass=Singleton):
ip = ns.node.bridge.ip_address # host_interface.ip_address
ws_ns = self._router.add_interface('websocketclient', address = ip,
- port = ns.control_port,
+ port = ns.control_port,
hook = self._on_ns_record)
ws = self._router.add_interface('websocketclient', address = ip,
hook = self._on_netmon_channel_record)
@@ -1011,7 +1035,7 @@ class ResourceManager(metaclass=Singleton):
# We also need to subscribe on the node for the tap interfaces
# for individual bandwidth monitoring
tap = ns._sta_taps[station]
- q_str = Q_SUB_EMULATOR.format(tap.device_name)
+ q_str = Q_SUB_EMULATOR.format(tap.device_name)
q = self.parse_query(q_str)
packet = Packet.from_query(q)
self._router._flow_table.add(packet, None, ws)
@@ -1099,7 +1123,7 @@ class ResourceManager(metaclass=Singleton):
# Monitor all FSM one by one and inform about errors.
futs = list()
attrs = list()
- for attr in resource.iter_attributes():
+ for attr in resource.iter_attributes():
if resource.is_local_attribute(attr.name):
continue
if attr.key:
@@ -1121,14 +1145,14 @@ class ResourceManager(metaclass=Singleton):
resource._state.change_success = all(
resource._state.attr_change_success[attr.name]
for attr in attrs)
- self.log(resource,
+ self.log(resource,
'All attributes FSM terminated with success={}'.format(
resource._state.change_success))
if resource._state.change_success:
- ret = [ resource._state.attr_change_value[attr.name]
+ ret = [ resource._state.attr_change_value[attr.name]
for attr in attrs]
- return ret
+ return ret
else:
raise NotImplementedError('At least one attribute failed')
@@ -1145,7 +1169,7 @@ class ResourceManager(metaclass=Singleton):
if not futs:
self.log(resource, 'No key attribute to update')
- return None
+ return None
await asyncio.gather(*futs)
@@ -1154,7 +1178,7 @@ class ResourceManager(metaclass=Singleton):
resource._state.change_success = all(
resource._state.attr_change_success[attr.name]
for attr in attrs)
- self.log(resource,
+ self.log(resource,
'KEY attributes FSM terminated with success={}'.format(
resource._state.change_success))
@@ -1170,7 +1194,7 @@ class ResourceManager(metaclass=Singleton):
def log(self, resource, msg=None):
resource._state.log.append(msg)
-
+
# Display on screen
#pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid())
#print(pfx, msg)
@@ -1199,20 +1223,20 @@ class ResourceManager(metaclass=Singleton):
pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid())
self.log(resource, 'Starting FSM...')
-
+
# When a resource is managed, it will get automatically monitored by
# adding the netmon resource on it.
from vicn.resource.node import Node
if resource.get_type() == 'lxccontainer':
- self.log(resource,
+ self.log(resource,
'Associating monitoring to lxc container resource...')
instance = self.create('netmon', node=resource)
self.commit_resource(instance)
- # FIXME
+ # FIXME
elif resource.get_type() == 'physical' and resource.managed and \
len(self.by_type_str('emulatedchannel')) > 0:
- self.log(resource,
+ self.log(resource,
'Associating monitoring to physical node resource...')
instance = self.create('netmon', node=resource)
self.commit_resource(instance)
@@ -1249,7 +1273,7 @@ class ResourceManager(metaclass=Singleton):
elif state == ResourceState.DELETED:
raise NotImplementedError
# Nothing to do unless explicitely requested
- pending_state = None
+ pending_state = None
elif state in [
ResourceState.PENDING_DEPS,
@@ -1416,7 +1440,8 @@ class ResourceManager(metaclass=Singleton):
self.log(resource, 'CREATE failed: {}'.format(e))
e = resource._state.change_value
import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ log.error('Failed with exception {}'.format(e))
+ import os; os._exit(1)
elif pending_state == ResourceState.PENDING_UPDATE:
if resource._state.change_success == True:
@@ -1436,7 +1461,8 @@ class ResourceManager(metaclass=Singleton):
e = resource._state.change_value
resource._state.write_lock.release()
import traceback; traceback.print_tb(e.__traceback__)
- raise NotImplementedError
+ log.error('Failed with exception {}'.format(e))
+ import os; os._exit(1)
elif pending_state == ResourceState.PENDING_DELETE:
raise NotImplementedError