diff options
author | Marcel Enguehard <mengueha+fdio@cisco.com> | 2017-04-11 10:24:25 +0200 |
---|---|---|
committer | Marcel Enguehard <mengueha+fdio@cisco.com> | 2017-04-11 08:30:34 +0000 |
commit | efd3df05559be6d2cdad38a02112c307c5013ccc (patch) | |
tree | ecce42d2de691d70eee44c3936e1d8fb8ad7aa81 /vicn/core | |
parent | 2021eb93cc7fd09e166fd721e8f2b814bba68bdb (diff) |
* Handler for Lxd NotFound exception for attributes
* Typo in capacity settings
* Error handling: vICN fails directly on exception
* Removed urllib3 warnings
* Trailing spaces
Change-Id: I9358d33da8607f62496a4bfadd5da5228ee484dc
Signed-off-by: Marcel Enguehard <mengueha+fdio@cisco.com>
Diffstat (limited to 'vicn/core')
-rw-r--r-- | vicn/core/resource_mgr.py | 180 | ||||
-rw-r--r-- | vicn/core/state.py | 1 |
2 files changed, 104 insertions, 77 deletions
diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py index 57dcafef..c71e0444 100644 --- a/vicn/core/resource_mgr.py +++ b/vicn/core/resource_mgr.py @@ -104,19 +104,19 @@ class ResourceManager(metaclass=Singleton): self._base = base # Resources sorted via dependency (instances) - self._resources = dict() + self._resources = dict() self._deps = None # Store resource requirements used for automatic instanciation # instance -> attribute -> requirements - self._instance_requirements = dict() + self._instance_requirements = dict() self._dirty = set() self._auto_commit = False # class -> Requirements - self._class_requirements = dict() + self._class_requirements = dict() self._map_uuid_name = dict() self._map_name_uuid = dict() @@ -140,7 +140,7 @@ class ResourceManager(metaclass=Singleton): self._router.add_interface('vicn', manager = self) ws_port = self.get('websocket_port') - self._ws = self._router.add_interface('websocketserver', + self._ws = self._router.add_interface('websocketserver', port = ws_port) # Monitoring @@ -195,7 +195,7 @@ class ResourceManager(metaclass=Singleton): def _broadcast(self, query): if not self._ws: return - self._ws.execute(query) + self._ws.execute(query) def _broadcast_packet(self, packet): self._broadcast(packet.to_query()) @@ -205,20 +205,20 @@ class ResourceManager(metaclass=Singleton): if not query.object_name == 'interface': return - q = Query(ACTION_UPDATE, 'channel', filter = query.filter, + q = Query(ACTION_UPDATE, 'channel', filter = query.filter, params = query.params) q.reply = True - self._ws.execute(q) + self._ws.execute(q) return None def _on_netmon_record(self, packet): query = packet.to_query() # Find channel related to query - # NOTE: we update the channel twice, once for each interface... + # NOTE: we update the channel twice, once for each interface... if query.object_name == 'interface': - device_names = [value for key, op, value in query.filter.to_list() + device_names = [value for key, op, value in query.filter.to_list() if key == 'device_name'] if not device_names: log.error('No device name in packet=', packet) @@ -231,9 +231,9 @@ class ResourceManager(metaclass=Singleton): for interface in node.interfaces: if interface.device_name == device_name: if interface.channel: - f = Filter.from_list([['id', '==', + f = Filter.from_list([['id', '==', interface.channel._state.uuid._uuid]]) - q = Query(ACTION_UPDATE, 'channel', filter = f, + q = Query(ACTION_UPDATE, 'channel', filter = f, params = query.params) q.reply = True self._ws.execute(q) @@ -245,7 +245,7 @@ class ResourceManager(metaclass=Singleton): def _on_netmon_channel_record(self, packet): query = packet.to_query() if query.object_name == 'interface': - device_names = [value for key, op, value in query.filter.to_list() + device_names = [value for key, op, value in query.filter.to_list() if key == 'device_name'] if not device_names: log.error('No device name in packet=', packet) @@ -254,7 +254,7 @@ class ResourceManager(metaclass=Singleton): device_name = device_names[0] f = Filter.from_list([['id', '==', device_name]]) - q = Query(ACTION_UPDATE, 'channel', filter = f, + q = Query(ACTION_UPDATE, 'channel', filter = f, params = query.params) q.reply = True self._ws.execute(q) @@ -265,7 +265,7 @@ class ResourceManager(metaclass=Singleton): def _on_vpp_record(self, packet, pylink_id): query = packet.to_query() if query.object_name == 'interface': - device_names = [value for key, op, value in query.filter.to_list() + device_names = [value for key, op, value in query.filter.to_list() if key == 'device_name'] if not device_names: log.error('No device name in packet=', packet) @@ -273,7 +273,7 @@ class ResourceManager(metaclass=Singleton): # We might want to check if the query has SUM(*) f = Filter.from_list([['id', '==', pylink_id]]) - q = Query(ACTION_UPDATE, 'channel', filter = f, + q = Query(ACTION_UPDATE, 'channel', filter = f, params = query.params) q.reply = True self._ws.execute(q) @@ -341,7 +341,7 @@ class ResourceManager(metaclass=Singleton): asyncio.ensure_future(self._set_resource_state(resource, ResourceState.CLEAN)) continue - + self.commit_resource(resource) def setup(self, commit=False): @@ -358,12 +358,12 @@ class ResourceManager(metaclass=Singleton): if '__type__' in cls.__dict__ and cls.__type__ == FactoryResource: candidates = inheritors(cls) if not candidates: - log.error('Abstract resource with no candidates: %s', + log.error('Abstract resource with no candidates: %s', cls.__name__) return None for delegate in candidates: - if capabilities and (not '__capabilities__' in vars(delegate) + if capabilities and (not '__capabilities__' in vars(delegate) or not capabilities.issubset(delegate.__capabilities__)): continue log.info("Abstract resource %s, delegated %s among %r" % \ @@ -371,9 +371,9 @@ class ResourceManager(metaclass=Singleton): return delegate return None else: - if capabilities and (not '__capabilities__' in vars(delegate) or + if capabilities and (not '__capabilities__' in vars(delegate) or not capabilities.issubset(delegate.__capabilities__)): - log.error('Capabilities conflict for resource : %s', + log.error('Capabilities conflict for resource : %s', cls.__name__) raise VICNException return cls @@ -382,7 +382,7 @@ class ResourceManager(metaclass=Singleton): cls, attr_dict = resource_tuple for instance in self.by_type(cls): cur_attr_dict = instance._get_attribute_dict() - common_keys = [k for k in cur_attr_dict.keys() + common_keys = [k for k in cur_attr_dict.keys() if k in attr_dict.keys()] if all(attr_dict[k] == cur_attr_dict[k] for k in common_keys): return instance @@ -457,7 +457,7 @@ class ResourceManager(metaclass=Singleton): if not aggregates: return None assert len(aggregates) == 1 - return next(aggregates) + return next(aggregates) def by_type(self, type): return [r for r in self if isinstance(r, type)] @@ -531,13 +531,13 @@ class ResourceManager(metaclass=Singleton): await self.wait_attr_init(resource, attribute) return resource.get(attribute) - async def attribute_set(self, resource, attribute_name, value, + async def attribute_set(self, resource, attribute_name, value, blocking=True): with await resource._state.write_lock: # Add the current operation to the pending list # NOTE: collections are unordered and can be updated concurrently #self._attribute_set_pending_value(resource, attribute_name) - resource._state.dirty[attribute_name].trigger(Operations.SET, + resource._state.dirty[attribute_name].trigger(Operations.SET, value) attr_state = resource._state.attr_state[attribute_name] @@ -630,12 +630,12 @@ class ResourceManager(metaclass=Singleton): setattr(dep, dep_attr_name, dep_attr_value) dep_attr_value_pfx = '{}:{}'.format( - dep_attr_value.get_type(), + dep_attr_value.get_type(), dep_attr_value.get_uuid()) - self.log(resource, + self.log(resource, S_WAIT_DEP.format(dep_attr_value_pfx)) await wait_resource(dep_attr_value) - self.log(resource, + self.log(resource, S_WAIT_DEP_OK .format(dep_attr_value_pfx)) async def _resource_wait_predecessors(self, resource): @@ -648,7 +648,7 @@ class ResourceManager(metaclass=Singleton): for before in befores: if not before.managed: continue - before_pfx = '{}:{}'.format(before.get_type(), + before_pfx = '{}:{}'.format(before.get_type(), before.get_uuid()) self.log(resource, S_WAIT.format(before_pfx)) await wait_resource(before) @@ -663,7 +663,7 @@ class ResourceManager(metaclass=Singleton): for before in befores: if not before.managed: continue - before_pfx = '{}:{}'.format(before.get_type(), + before_pfx = '{}:{}'.format(before.get_type(), before.get_uuid()) self.log(resource, S_WAIT.format(before_pfx)) await wait_resource_init(before) @@ -693,7 +693,7 @@ class ResourceManager(metaclass=Singleton): await self._resource_wait_attributes(resource) await self._resource_wait_predecessors(resource) await self._resource_wait_subresources(resource) - + def _task_resource_action(self, resource, action): """Perform action: __get__, __create__, __delete__ on the full class hierarchy. @@ -737,13 +737,14 @@ class ResourceManager(metaclass=Singleton): It is important to centralize state change since some states are associated with Events(). """ - resource._state.attr_state[attribute_name] = state if state in [ AttributeState.INITIALIZED, AttributeState.CLEAN, AttributeState.DIRTY ]: resource._state.attr_init[attribute_name].set() + elif state == AttributeState.RESET: + resource._state.attr_init[attribute_name].clear() else: raise RuntimeError("Inconsistent resource state {}".format(state)) @@ -751,12 +752,16 @@ class ResourceManager(metaclass=Singleton): resource._state.attr_clean[attribute_name].set() elif state in [ AttributeState.INITIALIZED, - AttributeState.DIRTY + AttributeState.DIRTY, + AttributeState.RESET ]: resource._state.attr_clean[attribute_name].clear() else: raise RuntimeError + resource._state.attr_state[attribute_name] = AttributeState.UNINITIALIZED \ + if state == AttributeState.RESET else state + async def _set_attribute_state(self, resource, attribute_name, state): """Sets the attribute state (lock version) """ @@ -788,19 +793,26 @@ class ResourceManager(metaclass=Singleton): while new_state != AttributeState.CLEAN: #with await resource._state.attr_lock[attribute.name]: state = resource._state.attr_state[attribute.name] - self.attr_log(resource, attribute, + self.attr_log(resource, attribute, 'Current state is {}'.format(state)) # AttributeState.ERROR - if resource._state.attr_change_success == False: - log.error('Attribute error {} for resource {}'.format( - resource.get_uuid(), attribute.name)) + if resource._state.attr_change_success[attribute.name] == False: e = resource._state.attr_change_value[attribute.name] - import traceback; traceback.print_tb(e.__traceback__) - raise NotImplementedError + if ENABLE_LXD_WORKAROUND and \ + (isinstance(e, LxdNotFound) or isinstance(e, LXDAPIException)): + new_state = AttributeState.RESET + log.error('LXD Fix (not found). Reset attribute') + resource._state.attr_change_success[attribute.name] = True + else: + log.error('Attribute error {} for resource {}'.format( + resource.get_uuid(), attribute.name)) + import traceback; traceback.print_tb(e.__traceback__) + log.error('Failed with exception: {}'.format(e)) + import os; os._exit(1) - # Signal update errors to the parent resource - resource._state.attr_change_event[attribute.name].set() + # Signal update errors to the parent resource + resource._state.attr_change_event[attribute.name].set() if state == AttributeState.UNINITIALIZED: pending_state = AttributeState.PENDING_INIT @@ -813,14 +825,14 @@ class ResourceManager(metaclass=Singleton): AttributeState.PENDING_UPDATE ]: # Nothing to do - pending_state = None + pending_state = None elif state == AttributeState.CLEAN: return else: raise RuntimeError if pending_state is None: - self.attr_log(resource, attribute, + self.attr_log(resource, attribute, 'Nothing to do. Waiting for event...') await resource._state.attr_change_event[attribute.name].wait() resource._state.attr_change_event[attribute.name].clear() @@ -837,7 +849,7 @@ class ResourceManager(metaclass=Singleton): task = EmptyTask() else: try: - task = self._task_attribute_op(resource, attribute, + task = self._task_attribute_op(resource, attribute, Operations.SET) except Exception as e: log.warning('No attribute setter attribute {}'.format( @@ -853,11 +865,11 @@ class ResourceManager(metaclass=Singleton): self.attr_log(resource, attribute, 'Trigger {} -> {}. Waiting task completion'.format( state, pending_state)) - self.schedule(task) + self.schedule(task) await resource._state.attr_change_event[attribute.name].wait() resource._state.attr_change_event[attribute.name].clear() - self.attr_log(resource, attribute, + self.attr_log(resource, attribute, 'Completed {} -> {}. Success = {}'.format( state, pending_state, resource._state.attr_change_success[attribute.name])) @@ -868,25 +880,31 @@ class ResourceManager(metaclass=Singleton): if pending_state == AttributeState.PENDING_INIT: if resource._state.attr_change_success[attribute.name] == True: attrs = resource._state.attr_change_value[attribute.name] - self.attr_log(resource, attribute, + self.attr_log(resource, attribute, 'INIT success. Value = {}'.format(attrs)) found = self._process_attr_dict(resource, attribute, attrs) if not found: log.error('Attribute missing return attrs: {}'.format( attrs)) - found = self._process_attr_dict(resource, attribute, + found = self._process_attr_dict(resource, attribute, attrs) new_state = AttributeState.INITIALIZED else: attrs = resource._state.attr_change_value[attribute.name] - self.attr_log(resource, attribute, - 'INIT gave no value. Value = {}'.format(attrs)) - new_state = AttributeState.INITIALIZED + if ENABLE_LXD_WORKAROUND and \ + (isinstance(attrs, LxdNotFound) or isinstance(attrs, LXDAPIException)): + new_state = AttributeState.RESET + log.error('LXD Fix (not found). Reset attribute') + resource._state.attr_change_success[attribute.name] = True + else: + self.attr_log(resource, attribute, + 'INIT gave no value. Value = {}'.format(attrs)) + new_state = AttributeState.INITIALIZED elif pending_state == AttributeState.PENDING_UPDATE: if resource._state.attr_change_success[attribute.name] == True: attrs = resource._state.attr_change_value[attribute.name] - self.attr_log(resource, attribute, + self.attr_log(resource, attribute, 'UPDATE success. Value = {}. Attribute is CLEAN'.format(attrs)) if attrs != NEVER_SET: # None could be interpreted as the return value. Also, @@ -906,16 +924,22 @@ class ResourceManager(metaclass=Singleton): new_state = AttributeState.CLEAN else: - log.error('Attribute error {} for resource {}'.format( + if ENABLE_LXD_WORKAROUND and \ + (isinstance(attrs, LxdNotFound) or isinstance(attrs, LXDAPIException)): + new_state = AttributeState.RESET + log.error('LXD Fix (not found). Reset attribute') + resource._state.attr_change_success[attribute.name] = True + else: + log.error('Attribute error {} for resource {}'.format( resource.get_uuid(), attribute.name)) - e = resource._state.attr_change_value[attribute.name] - new_state = AttributeState.ERROR + e = resource._state.attr_change_value[attribute.name] + new_state = AttributeState.ERROR else: raise RuntimeError # Setting attribute state - await self._set_attribute_state(resource, attribute.name, + await self._set_attribute_state(resource, attribute.name, new_state) #-------------------------------------------------------------------------- @@ -937,7 +961,7 @@ class ResourceManager(metaclass=Singleton): ws = self._router.add_interface('websocketclient', address=ip, hook=self._on_netmon_record) - + node = resource.node for interface in node.interfaces: if not interface.monitored: @@ -951,17 +975,17 @@ class ResourceManager(metaclass=Singleton): continue channel_id = interface.channel._state.uuid._uuid - - update_vpp = functools.partial(self._on_vpp_record, + + update_vpp = functools.partial(self._on_vpp_record, pylink_id = channel_id) - ws_vpp = self._router.add_interface('websocketclient', + ws_vpp = self._router.add_interface('websocketclient', address=ip, hook=update_vpp) aggregate_interfaces = list() for _interface in node.interfaces: if not _interface.get_type() == 'dpdkdevice' and \ _interface.monitored: - aggregate_interfaces.append('"' + + aggregate_interfaces.append('"' + _interface.device_name + '"') q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces)) @@ -975,7 +999,7 @@ class ResourceManager(metaclass=Singleton): interface.channel.already_subscribed = True else: - q_str = Q_SUB_IF.format(interface.device_name) + q_str = Q_SUB_IF.format(interface.device_name) q = self.parse_query(q_str) packet = Packet.from_query(q) self._router._flow_table.add(packet, None, ws) @@ -986,7 +1010,7 @@ class ResourceManager(metaclass=Singleton): ip = ns.node.bridge.ip_address # host_interface.ip_address ws_ns = self._router.add_interface('websocketclient', address = ip, - port = ns.control_port, + port = ns.control_port, hook = self._on_ns_record) ws = self._router.add_interface('websocketclient', address = ip, hook = self._on_netmon_channel_record) @@ -1011,7 +1035,7 @@ class ResourceManager(metaclass=Singleton): # We also need to subscribe on the node for the tap interfaces # for individual bandwidth monitoring tap = ns._sta_taps[station] - q_str = Q_SUB_EMULATOR.format(tap.device_name) + q_str = Q_SUB_EMULATOR.format(tap.device_name) q = self.parse_query(q_str) packet = Packet.from_query(q) self._router._flow_table.add(packet, None, ws) @@ -1099,7 +1123,7 @@ class ResourceManager(metaclass=Singleton): # Monitor all FSM one by one and inform about errors. futs = list() attrs = list() - for attr in resource.iter_attributes(): + for attr in resource.iter_attributes(): if resource.is_local_attribute(attr.name): continue if attr.key: @@ -1121,14 +1145,14 @@ class ResourceManager(metaclass=Singleton): resource._state.change_success = all( resource._state.attr_change_success[attr.name] for attr in attrs) - self.log(resource, + self.log(resource, 'All attributes FSM terminated with success={}'.format( resource._state.change_success)) if resource._state.change_success: - ret = [ resource._state.attr_change_value[attr.name] + ret = [ resource._state.attr_change_value[attr.name] for attr in attrs] - return ret + return ret else: raise NotImplementedError('At least one attribute failed') @@ -1145,7 +1169,7 @@ class ResourceManager(metaclass=Singleton): if not futs: self.log(resource, 'No key attribute to update') - return None + return None await asyncio.gather(*futs) @@ -1154,7 +1178,7 @@ class ResourceManager(metaclass=Singleton): resource._state.change_success = all( resource._state.attr_change_success[attr.name] for attr in attrs) - self.log(resource, + self.log(resource, 'KEY attributes FSM terminated with success={}'.format( resource._state.change_success)) @@ -1170,7 +1194,7 @@ class ResourceManager(metaclass=Singleton): def log(self, resource, msg=None): resource._state.log.append(msg) - + # Display on screen #pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid()) #print(pfx, msg) @@ -1199,20 +1223,20 @@ class ResourceManager(metaclass=Singleton): pfx = '[{}] {}: '.format(resource.get_type(), resource.get_uuid()) self.log(resource, 'Starting FSM...') - + # When a resource is managed, it will get automatically monitored by # adding the netmon resource on it. from vicn.resource.node import Node if resource.get_type() == 'lxccontainer': - self.log(resource, + self.log(resource, 'Associating monitoring to lxc container resource...') instance = self.create('netmon', node=resource) self.commit_resource(instance) - # FIXME + # FIXME elif resource.get_type() == 'physical' and resource.managed and \ len(self.by_type_str('emulatedchannel')) > 0: - self.log(resource, + self.log(resource, 'Associating monitoring to physical node resource...') instance = self.create('netmon', node=resource) self.commit_resource(instance) @@ -1249,7 +1273,7 @@ class ResourceManager(metaclass=Singleton): elif state == ResourceState.DELETED: raise NotImplementedError # Nothing to do unless explicitely requested - pending_state = None + pending_state = None elif state in [ ResourceState.PENDING_DEPS, @@ -1416,7 +1440,8 @@ class ResourceManager(metaclass=Singleton): self.log(resource, 'CREATE failed: {}'.format(e)) e = resource._state.change_value import traceback; traceback.print_tb(e.__traceback__) - raise NotImplementedError + log.error('Failed with exception {}'.format(e)) + import os; os._exit(1) elif pending_state == ResourceState.PENDING_UPDATE: if resource._state.change_success == True: @@ -1436,7 +1461,8 @@ class ResourceManager(metaclass=Singleton): e = resource._state.change_value resource._state.write_lock.release() import traceback; traceback.print_tb(e.__traceback__) - raise NotImplementedError + log.error('Failed with exception {}'.format(e)) + import os; os._exit(1) elif pending_state == ResourceState.PENDING_DELETE: raise NotImplementedError diff --git a/vicn/core/state.py b/vicn/core/state.py index c32b8237..bb108b2b 100644 --- a/vicn/core/state.py +++ b/vicn/core/state.py @@ -56,6 +56,7 @@ class AttributeState: PENDING_UPDATE = 'PENDING_UPDATE' CLEAN = 'CLEAN' ERROR = 'ERROR' + RESET = 'RESET' class Operations: SET = 'set' |