diff options
Diffstat (limited to 'vicn/core/resource_mgr.py')
-rw-r--r-- | vicn/core/resource_mgr.py | 357 |
1 files changed, 265 insertions, 92 deletions
diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py index 84deb7b4..8db7f04d 100644 --- a/vicn/core/resource_mgr.py +++ b/vicn/core/resource_mgr.py @@ -22,6 +22,7 @@ import time # LXD workaround from pylxd.exceptions import NotFound as LxdNotFound, LXDAPIException +from netmodel.model.collection import Collection from netmodel.model.filter import Filter from netmodel.model.query import Query, ACTION_SELECT, ACTION_INSERT from netmodel.model.query import ACTION_UPDATE, ACTION_SUBSCRIBE @@ -33,10 +34,10 @@ from netmodel.util.meta import inheritors from netmodel.util.singleton import Singleton from netmodel.util.misc import is_iterable from vicn.core.attribute import NEVER_SET +from vicn.core.commands import ReturnValue from vicn.core.exception import VICNException, ResourceNotFound from vicn.core.resource_factory import ResourceFactory from vicn.core.resource import Resource, FactoryResource, EmptyResource -from vicn.core.sa_collections import InstrumentedList from vicn.core.state import InstanceState, ResourceState from vicn.core.state import AttributeState, Operations, PendingValue from vicn.core.task import TaskManager, wait_task, task, async_task @@ -46,12 +47,14 @@ log = logging.getLogger(__name__) # NOTE: Do not fully reinitialize a resource after a step fails since it will # call initialize several times, and might created spurious resources. -ENABLE_LXD_WORKAROUND = True +ENABLE_LXD_WORKAROUND = False +DEFAULT_QTPLAYER_PORT = 8999 # Monitoring queries Q_SUB_VPP = 'SUBSCRIBE SUM(*) FROM interface WHERE device_name INCLUDED [{}]' Q_SUB_IF = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"' +Q_SUB_VPP_IF = 'SUBSCRIBE * FROM vpp_interface WHERE device_name == "{}"' Q_SUB_STATS = 'SUBSCRIBE * FROM stats' Q_SUB_EMULATOR_IF = 'SUBSCRIBE * FROM interface WHERE id == "{}"' Q_SUB_EMULATOR = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"' @@ -146,6 +149,8 @@ class ResourceManager(metaclass=Singleton): # Monitoring self._monitored = set() self._pending_monitoring = set() + self._map_ip_interface = dict() + self._monitored_channels = set() # For debug self._committed = set() @@ -200,6 +205,13 @@ class ResourceManager(metaclass=Singleton): def _broadcast_packet(self, packet): self._broadcast(packet.to_query()) + def _on_qtplayer_packet(self, name, packet): + query = packet.to_query() + query.params['name'] = name + query.reply = True + self._ws.execute(query) + return None + def _on_ns_record(self, packet): query = packet.to_query() @@ -240,6 +252,35 @@ class ResourceManager(metaclass=Singleton): return None return None return None + elif query.object_name == 'vpp_interface': + device_names = [value for key, op, value in query.filter.to_list() + if key == 'device_name'] + if not device_names: + log.error('No device name in packet=', packet) + return + device_name = device_names[0] + node_name = query.params['node'] + node = ResourceManager().by_name(node_name) + if node is None: + print("no node") + return None + for interface in node.interfaces: + if not hasattr(interface, 'vppinterface') or not interface.vppinterface: + continue + if interface.vppinterface.device_name == device_name: + if interface.channel: + f = Filter.from_list([['id', '==', + interface.channel._state.uuid._uuid]]) + q = Query(ACTION_UPDATE, 'channel', filter = f, + params = query.params) + q.reply = True + self._ws.execute(q) + return None + print("no channel") + return None + print("no vpp interface found") + return None + return None def _on_netmon_channel_record(self, packet): @@ -546,14 +587,8 @@ class ResourceManager(metaclass=Singleton): await self.wait_attr_init(resource, attribute) return resource.get(attribute) - async def attribute_set(self, resource, attribute_name, value, - blocking=True): + async def _attribute_set(self, resource, attribute_name, value): with await resource._state.write_lock: - # Add the current operation to the pending list - # NOTE: collections are unordered and can be updated concurrently - #self._attribute_set_pending_value(resource, attribute_name) - resource._state.dirty[attribute_name].trigger(Operations.SET, - value) attr_state = resource._state.attr_state[attribute_name] if attr_state == AttributeState.CLEAN: @@ -597,8 +632,26 @@ class ResourceManager(metaclass=Singleton): raise RuntimeError("Resource cannot be in state".format( resource_state)) - if blocking: - await self.wait_attr_clean(resource, attribute_name) +# if blocking: +# await self.wait_attr_clean(resource, attribute_name) + + def attribute_set(self, resource, attribute_name, value): + # Add the current operation to the pending list + # NOTE: collections are unordered and can be updated concurrently + #self._attribute_set_pending_value(resource, attribute_name) + resource._state.dirty[attribute_name].trigger(Operations.SET, + value) + asyncio.ensure_future(self._attribute_set(resource, attribute_name, value)) + + async def attribute_set_async(self, resource, attribute_name, value): + # Add the current operation to the pending list + # NOTE: collections are unordered and can be updated concurrently + #self._attribute_set_pending_value(resource, attribute_name) + resource._state.dirty[attribute_name].trigger(Operations.SET, + value) + await self._attribute_set(resource, attribute_name, value) + + #--------------------------------------------------------------------------- # Resource dependency management @@ -623,7 +676,7 @@ class ResourceManager(metaclass=Singleton): deps = [deps] for dep in deps: - if attr.key: + if resource.has_key_attribute(attr): if not dep.managed: continue dep_pfx = '{}:{}'.format(dep.get_type(), dep.get_uuid()) @@ -687,13 +740,7 @@ class ResourceManager(metaclass=Singleton): async def _resource_wait_subresources(self, resource): self.log(resource, S_WAIT_SRS) - # We should accumulate subresources through the hierarchy - sr = EmptyResource() - for base in reversed(resource.__class__.mro()): - if '__subresources__' not in vars(base): - continue - sr = sr > base.__subresources__(resource) - + sr = resource.__subresources__() if sr is not None and not isinstance(sr, EmptyResource): resource.set_subresources(sr) pfx_sr = '{}:{}'.format(sr.get_type(), sr.get_uuid()) @@ -713,19 +760,8 @@ class ResourceManager(metaclass=Singleton): """Perform action: __get__, __create__, __delete__ on the full class hierarchy. """ - task = EmptyTask() - for base in reversed(resource.__class__.mro()): - # To avoid adding several times the same task - if action not in vars(base): - continue - func = getattr(base, action, None) - if func is None: - continue - t = func(resource) - - task = task > t - - return task + method = getattr(resource, action, None) + return method() if method else EmptyTask() #-------------------------------------------------------------------------- # Resource model @@ -788,7 +824,11 @@ class ResourceManager(metaclass=Singleton): ret = fut.result() resource._state.attr_change_success[attribute.name] = True resource._state.attr_change_value[attribute.name] = ret + except ResourceNotFound as e: + resource._state.attr_change_success[attribute.name] = False + resource._state.attr_change_value[attribute.name] = e except Exception as e: + import traceback; traceback.print_exc() resource._state.attr_change_success[attribute.name] = False resource._state.attr_change_value[attribute.name] = e resource._state.attr_change_event[attribute.name].set() @@ -899,7 +939,10 @@ class ResourceManager(metaclass=Singleton): attrs = resource._state.attr_change_value[attribute.name] self.attr_log(resource, attribute, 'INIT success. Value = {}'.format(attrs)) - found = self._process_attr_dict(resource, attribute, attrs) + if not isinstance(attrs, ReturnValue): + found = self._process_attr_dict(resource, attribute, attrs) + else: + found = self._process_attr_dict(resource, attribute, attrs.stdout) if not found: log.error('Attribute missing return attrs: {}'.format( attrs)) @@ -923,7 +966,7 @@ class ResourceManager(metaclass=Singleton): if resource._state.attr_change_success[attribute.name] == True: self.attr_log(resource, attribute, 'UPDATE success. Value = {}. Attribute is CLEAN'.format(attrs)) - if attrs != NEVER_SET: + if not isinstance(attrs, ReturnValue) and attrs != NEVER_SET: # None could be interpreted as the return value. Also, # we need not to overwrite the value from get self._process_attr_dict(resource, attribute, attrs) @@ -931,7 +974,7 @@ class ResourceManager(metaclass=Singleton): # We might do this for all returned attributes cur_value = vars(resource)[attribute.name] if attribute.is_collection: - tmp = InstrumentedList(pending_value.value) + tmp = Collection(pending_value.value) tmp._attribute = cur_value._attribute tmp._instance = cur_value._instance else: @@ -974,8 +1017,25 @@ class ResourceManager(metaclass=Singleton): return Query.from_dict(dic) + def _monitor_qtplayer(self, resource): + try: + ip = resource.node.hostname + except: + ip = str(resource.node.management_interface.ip4_address) + + hook = functools.partial(self._on_qtplayer_packet, resource.node.name) + ws = self._router.add_interface('websocketclient', address=ip, + port = DEFAULT_QTPLAYER_PORT, + hook = hook) + q_str = 'SUBSCRIBE * FROM stats' + q = self.parse_query(q_str) + packet = Packet.from_query(q) + self._router._flow_table.add(packet, None, set([ws])) + ws.send(packet) + def _monitor_netmon(self, resource): - ip = resource.node.management_interface.ip4_address + print("MONITOR NODE", resource.node) + ip = str(resource.node.management_interface.ip4_address) if not ip: log.error('IP of monitored Node {} is None'.format(resource.node)) import os; os._exit(1) @@ -986,49 +1046,146 @@ class ResourceManager(metaclass=Singleton): node = resource.node for interface in node.interfaces: if not interface.monitored: + print("non monitored interface", interface) continue + print("NETMON MONITOR INTERFACE", interface) + +# if interface.get_type() == 'dpdkdevice' and hasattr(node,'vpp'): +# +# # Check if vICN has already subscribed for one interface in +# # the channel +# if hasattr(interface.channel,'already_subscribed'): +# continue +# +# channel_id = interface.channel._state.uuid._uuid +# +# update_vpp = functools.partial(self._on_vpp_record, +# pylink_id = channel_id) +# ws_vpp = self._router.add_interface('websocketclient', +# address=ip, hook=update_vpp) +# +# aggregate_interfaces = list() +# for _interface in node.interfaces: +# if not _interface.get_type() == 'dpdkdevice' and \ +# _interface.monitored: +# aggregate_interfaces.append('"' + +# _interface.device_name + '"') +# +# q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces)) +# q = self.parse_query(q_str) +# packet = Packet.from_query(q) +# self._router._flow_table.add(packet, None, ws_vpp) +# ws_vpp.send(packet) +# +# # Prevent vICN to subscribe to other interfaces of the same +# # channel +# interface.channel.already_subscribed = True +# +# else: + if hasattr(node, 'vpp') and node.vpp is not None: + q_str = Q_SUB_VPP_IF.format(interface.vppinterface.device_name) + else: + q_str = Q_SUB_IF.format(interface.device_name) + log.warning(" -- MONITOR {}".format(q_str)) + q = self.parse_query(q_str) + packet = Packet.from_query(q) + self._router._flow_table.add(packet, None, set([ws])) + ws.send(packet) - if interface.get_type() == 'dpdkdevice' and hasattr(node,'vpp'): + def _monitor_vpp_interface(self, vpp_interface): + print("MONITOR interface", vpp_interface) + interface = vpp_interface.parent + node = interface.node + # XXX only monitor in the topology group + if node.get_type() != 'lxccontainer': + print("MONITOR -> Ignored: not in container") + return - # Check if vICN has already subscribed for one interface in - # the channel - if hasattr(interface.channel,'already_subscribed'): - continue + # We only monitor interfaces to provide data for wired channels + channel = interface.channel + if channel is None: + print("MONITOR -> Ignored: no channel") + return + if channel.has_type('emulatedchannel'): + print("MONITOR -> Ignored: belong to wireless channel") + return - channel_id = interface.channel._state.uuid._uuid + # Don't monitor multiple interfaces per channel + if channel in self._monitored_channels: + print("MONITOR -> Ignored: channel already monitored") + return + self._monitored_channels.add(channel) - update_vpp = functools.partial(self._on_vpp_record, - pylink_id = channel_id) - ws_vpp = self._router.add_interface('websocketclient', - address=ip, hook=update_vpp) + ip = str(node.management_interface.ip4_address) + if not ip: + log.error('IP of monitored Node {} is None'.format(resource.node)) + import os; os._exit(1) - aggregate_interfaces = list() - for _interface in node.interfaces: - if not _interface.get_type() == 'dpdkdevice' and \ - _interface.monitored: - aggregate_interfaces.append('"' + - _interface.device_name + '"') + # Reuse existing websockets + ws = self._map_ip_interface.get(ip) + if not ws: + ws = self._router.add_interface('websocketclient', address=ip, + hook=self._on_netmon_record) + self._map_ip_interface[ip] = ws + + q_str = Q_SUB_VPP_IF.format(vpp_interface.device_name) + print("MONITOR -> query= {}".format(q_str)) + q = self.parse_query(q_str) + packet = Packet.from_query(q) + self._router._flow_table.add(packet, None, set([ws])) + ws.send(packet) + + def _monitor_interface(self, interface): + print("MONITOR interface", interface) + node = interface.node + # XXX only monitor in the topology group + if node.get_type() != 'lxccontainer': + print("MONITOR -> Ignored: not in container") + return - q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces)) - q = self.parse_query(q_str) - packet = Packet.from_query(q) - self._router._flow_table.add(packet, None, ws_vpp) - ws_vpp.send(packet) + # Only monitor vpp interfaces on vpp node + if hasattr(node, 'vpp') and node.vpp is not None: + print("MONITOR -> Ignored: non-vpp interface on vpp node") + return - # Prevent vICN to subscribe to other interfaces of the same - # channel - interface.channel.already_subscribed = True + # We only monitor interfaces to provide data for wired channels + channel = interface.channel + if channel is None: + print("MONITOR -> Ignored: no channel") + return + if channel.has_type('emulatedchannel'): + print("MONITOR -> Ignored: belong to wireless channel") + return - else: - q_str = Q_SUB_IF.format(interface.device_name) - q = self.parse_query(q_str) - packet = Packet.from_query(q) - self._router._flow_table.add(packet, None, ws) - ws.send(packet) + # Don't monitor multiple interfaces per channel + if channel in self._monitored_channels: + print("MONITOR -> Ignored: channel already monitored") + return + self._monitored_channels.add(channel) + + ip = str(node.management_interface.ip4_address) + if not ip: + log.error('IP of monitored Node {} is None'.format(resource.node)) + import os; os._exit(1) + + # Reuse existing websockets + ws = self._map_ip_interface.get(ip) + if not ws: + ws = self._router.add_interface('websocketclient', address=ip, + hook=self._on_netmon_record) + self._map_ip_interface[ip] = ws + + q_str = Q_SUB_IF.format(interface.device_name) + print("MONITOR -> query= {}".format(q_str)) + q = self.parse_query(q_str) + packet = Packet.from_query(q) + self._router._flow_table.add(packet, None, set([ws])) + ws.send(packet) def _monitor_emulator(self, resource): ns = resource - ip = ns.node.bridge.ip4_address # management_interface.ip_address + # XXX UGLY, we have no management interface + ip = ns.node.hostname # str(ns.node.interfaces[0].ip4_address) ws_ns = self._router.add_interface('websocketclient', address = ip, port = ns.control_port, @@ -1050,7 +1207,7 @@ class ResourceManager(metaclass=Singleton): q_str = Q_SUB_EMULATOR_IF.format(identifier) q = self.parse_query(q_str) packet = Packet.from_query(q) - self._router._flow_table.add(packet, None, ws_ns) + self._router._flow_table.add(packet, None, set([ws_ns])) ws_ns.send(packet) # We also need to subscribe on the node for the tap interfaces @@ -1059,7 +1216,7 @@ class ResourceManager(metaclass=Singleton): q_str = Q_SUB_EMULATOR.format(tap.device_name) q = self.parse_query(q_str) packet = Packet.from_query(q) - self._router._flow_table.add(packet, None, ws) + self._router._flow_table.add(packet, None, set([ws])) ws.send(packet) def _monitor(self, resource): @@ -1070,29 +1227,37 @@ class ResourceManager(metaclass=Singleton): self._pending_monitoring.clear() return - central_ip = self.by_type_str('centralip') - if not central_ip: - raise NotImplementedError('Missing CentralIP in experiment') - central_ip = central_ip[0] - uuid = resource.get_uuid() - if central_ip._state.state != ResourceState.CLEAN: - self._pending_monitoring.add(uuid) - return + central_ip = self.by_type_str('centralip') + if central_ip: + central_ip = central_ip[0] + + if central_ip._state.state != ResourceState.CLEAN: + self._pending_monitoring.add(uuid) + return if uuid in self._monitored: return self._monitored.add(uuid) - if resource.get_type() == 'netmon': - if resource.node.get_type() != 'lxccontainer': - return - self._monitor_netmon(resource) +# if resource.get_type() == 'netmon': +# if resource.node.get_type() != 'lxccontainer': +# return +# self._monitor_netmon(resource) + + if resource.get_type() == 'qtplayer': + self._monitor_qtplayer(resource) elif resource.has_type('emulatedchannel'): self._monitor_emulator(resource) + elif resource.has_type('interface'): + self._monitor_interface(resource) + + elif resource.has_type('vppinterface'): + self._monitor_vpp_interface(resource) + async def __set_resource_state(self, resource, state): """Sets the resource state (no-lock version) @@ -1128,6 +1293,9 @@ class ResourceManager(metaclass=Singleton): ret = fut.result() resource._state.change_success = True resource._state.change_value = ret + except ResourceNotFound as e: + resource._state.change_success = False + resource._state.change_value = e except Exception as e: resource._state.change_success = False resource._state.change_value = e @@ -1148,7 +1316,7 @@ class ResourceManager(metaclass=Singleton): for attr in resource.iter_attributes(): if resource.is_local_attribute(attr.name): continue - if attr.key: + if resource.has_key_attribute(attr): # Those attributes are already done continue @@ -1182,12 +1350,14 @@ class ResourceManager(metaclass=Singleton): # Monitor all FSM one by one and inform about errors. futs = list() attrs = list() - for attr in resource.get_keys(): - if resource.is_local_attribute(attr.name): - continue - attrs.append(attr) - fut = self.attribute_process(resource, attr) - futs.append(fut) + + for key in resource.get_keys(): + for attr in key: + if resource.is_local_attribute(attr.name): + continue + attrs.append(attr) + fut = self.attribute_process(resource, attr) + futs.append(fut) if not futs: self.log(resource, 'No key attribute to update') @@ -1277,6 +1447,7 @@ class ResourceManager(metaclass=Singleton): print("------") import traceback; traceback.print_tb(e.__traceback__) log.error('Resource: {} - Exception: {}'.format(pfx, e)) + return import os; os._exit(1) elif state == ResourceState.UNINITIALIZED: pending_state = ResourceState.PENDING_DEPS @@ -1398,8 +1569,9 @@ class ResourceManager(metaclass=Singleton): elif pending_state == ResourceState.PENDING_GET: if resource._state.change_success == True: attrs = resource._state.change_value - self.log(resource, S_INIT_DONE.format(attrs)) - self._process_attr_dict(resource, None, attrs) + if not isinstance(attrs, ReturnValue): + self.log(resource, S_INIT_DONE.format(attrs)) + self._process_attr_dict(resource, None, attrs) new_state = ResourceState.CREATED else: e = resource._state.change_value @@ -1452,8 +1624,9 @@ class ResourceManager(metaclass=Singleton): elif pending_state == ResourceState.PENDING_CREATE: if resource._state.change_success == True: attrs = resource._state.change_value - self.log(resource, S_CREATE_OK.format(attrs)) - self._process_attr_dict(resource, None, attrs) + if not isinstance(attrs, ReturnValue): + self.log(resource, S_CREATE_OK.format(attrs)) + self._process_attr_dict(resource, None, attrs) new_state = ResourceState.CREATED else: e = resource._state.change_value |