aboutsummaryrefslogtreecommitdiffstats
path: root/vicn/core/resource_mgr.py
diff options
context:
space:
mode:
Diffstat (limited to 'vicn/core/resource_mgr.py')
-rw-r--r--vicn/core/resource_mgr.py357
1 files changed, 265 insertions, 92 deletions
diff --git a/vicn/core/resource_mgr.py b/vicn/core/resource_mgr.py
index 84deb7b4..8db7f04d 100644
--- a/vicn/core/resource_mgr.py
+++ b/vicn/core/resource_mgr.py
@@ -22,6 +22,7 @@ import time
# LXD workaround
from pylxd.exceptions import NotFound as LxdNotFound, LXDAPIException
+from netmodel.model.collection import Collection
from netmodel.model.filter import Filter
from netmodel.model.query import Query, ACTION_SELECT, ACTION_INSERT
from netmodel.model.query import ACTION_UPDATE, ACTION_SUBSCRIBE
@@ -33,10 +34,10 @@ from netmodel.util.meta import inheritors
from netmodel.util.singleton import Singleton
from netmodel.util.misc import is_iterable
from vicn.core.attribute import NEVER_SET
+from vicn.core.commands import ReturnValue
from vicn.core.exception import VICNException, ResourceNotFound
from vicn.core.resource_factory import ResourceFactory
from vicn.core.resource import Resource, FactoryResource, EmptyResource
-from vicn.core.sa_collections import InstrumentedList
from vicn.core.state import InstanceState, ResourceState
from vicn.core.state import AttributeState, Operations, PendingValue
from vicn.core.task import TaskManager, wait_task, task, async_task
@@ -46,12 +47,14 @@ log = logging.getLogger(__name__)
# NOTE: Do not fully reinitialize a resource after a step fails since it will
# call initialize several times, and might created spurious resources.
-ENABLE_LXD_WORKAROUND = True
+ENABLE_LXD_WORKAROUND = False
+DEFAULT_QTPLAYER_PORT = 8999
# Monitoring queries
Q_SUB_VPP = 'SUBSCRIBE SUM(*) FROM interface WHERE device_name INCLUDED [{}]'
Q_SUB_IF = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"'
+Q_SUB_VPP_IF = 'SUBSCRIBE * FROM vpp_interface WHERE device_name == "{}"'
Q_SUB_STATS = 'SUBSCRIBE * FROM stats'
Q_SUB_EMULATOR_IF = 'SUBSCRIBE * FROM interface WHERE id == "{}"'
Q_SUB_EMULATOR = 'SUBSCRIBE * FROM interface WHERE device_name == "{}"'
@@ -146,6 +149,8 @@ class ResourceManager(metaclass=Singleton):
# Monitoring
self._monitored = set()
self._pending_monitoring = set()
+ self._map_ip_interface = dict()
+ self._monitored_channels = set()
# For debug
self._committed = set()
@@ -200,6 +205,13 @@ class ResourceManager(metaclass=Singleton):
def _broadcast_packet(self, packet):
self._broadcast(packet.to_query())
+ def _on_qtplayer_packet(self, name, packet):
+ query = packet.to_query()
+ query.params['name'] = name
+ query.reply = True
+ self._ws.execute(query)
+ return None
+
def _on_ns_record(self, packet):
query = packet.to_query()
@@ -240,6 +252,35 @@ class ResourceManager(metaclass=Singleton):
return None
return None
return None
+ elif query.object_name == 'vpp_interface':
+ device_names = [value for key, op, value in query.filter.to_list()
+ if key == 'device_name']
+ if not device_names:
+ log.error('No device name in packet=', packet)
+ return
+ device_name = device_names[0]
+ node_name = query.params['node']
+ node = ResourceManager().by_name(node_name)
+ if node is None:
+ print("no node")
+ return None
+ for interface in node.interfaces:
+ if not hasattr(interface, 'vppinterface') or not interface.vppinterface:
+ continue
+ if interface.vppinterface.device_name == device_name:
+ if interface.channel:
+ f = Filter.from_list([['id', '==',
+ interface.channel._state.uuid._uuid]])
+ q = Query(ACTION_UPDATE, 'channel', filter = f,
+ params = query.params)
+ q.reply = True
+ self._ws.execute(q)
+ return None
+ print("no channel")
+ return None
+ print("no vpp interface found")
+ return None
+
return None
def _on_netmon_channel_record(self, packet):
@@ -546,14 +587,8 @@ class ResourceManager(metaclass=Singleton):
await self.wait_attr_init(resource, attribute)
return resource.get(attribute)
- async def attribute_set(self, resource, attribute_name, value,
- blocking=True):
+ async def _attribute_set(self, resource, attribute_name, value):
with await resource._state.write_lock:
- # Add the current operation to the pending list
- # NOTE: collections are unordered and can be updated concurrently
- #self._attribute_set_pending_value(resource, attribute_name)
- resource._state.dirty[attribute_name].trigger(Operations.SET,
- value)
attr_state = resource._state.attr_state[attribute_name]
if attr_state == AttributeState.CLEAN:
@@ -597,8 +632,26 @@ class ResourceManager(metaclass=Singleton):
raise RuntimeError("Resource cannot be in state".format(
resource_state))
- if blocking:
- await self.wait_attr_clean(resource, attribute_name)
+# if blocking:
+# await self.wait_attr_clean(resource, attribute_name)
+
+ def attribute_set(self, resource, attribute_name, value):
+ # Add the current operation to the pending list
+ # NOTE: collections are unordered and can be updated concurrently
+ #self._attribute_set_pending_value(resource, attribute_name)
+ resource._state.dirty[attribute_name].trigger(Operations.SET,
+ value)
+ asyncio.ensure_future(self._attribute_set(resource, attribute_name, value))
+
+ async def attribute_set_async(self, resource, attribute_name, value):
+ # Add the current operation to the pending list
+ # NOTE: collections are unordered and can be updated concurrently
+ #self._attribute_set_pending_value(resource, attribute_name)
+ resource._state.dirty[attribute_name].trigger(Operations.SET,
+ value)
+ await self._attribute_set(resource, attribute_name, value)
+
+
#---------------------------------------------------------------------------
# Resource dependency management
@@ -623,7 +676,7 @@ class ResourceManager(metaclass=Singleton):
deps = [deps]
for dep in deps:
- if attr.key:
+ if resource.has_key_attribute(attr):
if not dep.managed:
continue
dep_pfx = '{}:{}'.format(dep.get_type(), dep.get_uuid())
@@ -687,13 +740,7 @@ class ResourceManager(metaclass=Singleton):
async def _resource_wait_subresources(self, resource):
self.log(resource, S_WAIT_SRS)
- # We should accumulate subresources through the hierarchy
- sr = EmptyResource()
- for base in reversed(resource.__class__.mro()):
- if '__subresources__' not in vars(base):
- continue
- sr = sr > base.__subresources__(resource)
-
+ sr = resource.__subresources__()
if sr is not None and not isinstance(sr, EmptyResource):
resource.set_subresources(sr)
pfx_sr = '{}:{}'.format(sr.get_type(), sr.get_uuid())
@@ -713,19 +760,8 @@ class ResourceManager(metaclass=Singleton):
"""Perform action: __get__, __create__, __delete__ on the full class
hierarchy.
"""
- task = EmptyTask()
- for base in reversed(resource.__class__.mro()):
- # To avoid adding several times the same task
- if action not in vars(base):
- continue
- func = getattr(base, action, None)
- if func is None:
- continue
- t = func(resource)
-
- task = task > t
-
- return task
+ method = getattr(resource, action, None)
+ return method() if method else EmptyTask()
#--------------------------------------------------------------------------
# Resource model
@@ -788,7 +824,11 @@ class ResourceManager(metaclass=Singleton):
ret = fut.result()
resource._state.attr_change_success[attribute.name] = True
resource._state.attr_change_value[attribute.name] = ret
+ except ResourceNotFound as e:
+ resource._state.attr_change_success[attribute.name] = False
+ resource._state.attr_change_value[attribute.name] = e
except Exception as e:
+ import traceback; traceback.print_exc()
resource._state.attr_change_success[attribute.name] = False
resource._state.attr_change_value[attribute.name] = e
resource._state.attr_change_event[attribute.name].set()
@@ -899,7 +939,10 @@ class ResourceManager(metaclass=Singleton):
attrs = resource._state.attr_change_value[attribute.name]
self.attr_log(resource, attribute,
'INIT success. Value = {}'.format(attrs))
- found = self._process_attr_dict(resource, attribute, attrs)
+ if not isinstance(attrs, ReturnValue):
+ found = self._process_attr_dict(resource, attribute, attrs)
+ else:
+ found = self._process_attr_dict(resource, attribute, attrs.stdout)
if not found:
log.error('Attribute missing return attrs: {}'.format(
attrs))
@@ -923,7 +966,7 @@ class ResourceManager(metaclass=Singleton):
if resource._state.attr_change_success[attribute.name] == True:
self.attr_log(resource, attribute,
'UPDATE success. Value = {}. Attribute is CLEAN'.format(attrs))
- if attrs != NEVER_SET:
+ if not isinstance(attrs, ReturnValue) and attrs != NEVER_SET:
# None could be interpreted as the return value. Also,
# we need not to overwrite the value from get
self._process_attr_dict(resource, attribute, attrs)
@@ -931,7 +974,7 @@ class ResourceManager(metaclass=Singleton):
# We might do this for all returned attributes
cur_value = vars(resource)[attribute.name]
if attribute.is_collection:
- tmp = InstrumentedList(pending_value.value)
+ tmp = Collection(pending_value.value)
tmp._attribute = cur_value._attribute
tmp._instance = cur_value._instance
else:
@@ -974,8 +1017,25 @@ class ResourceManager(metaclass=Singleton):
return Query.from_dict(dic)
+ def _monitor_qtplayer(self, resource):
+ try:
+ ip = resource.node.hostname
+ except:
+ ip = str(resource.node.management_interface.ip4_address)
+
+ hook = functools.partial(self._on_qtplayer_packet, resource.node.name)
+ ws = self._router.add_interface('websocketclient', address=ip,
+ port = DEFAULT_QTPLAYER_PORT,
+ hook = hook)
+ q_str = 'SUBSCRIBE * FROM stats'
+ q = self.parse_query(q_str)
+ packet = Packet.from_query(q)
+ self._router._flow_table.add(packet, None, set([ws]))
+ ws.send(packet)
+
def _monitor_netmon(self, resource):
- ip = resource.node.management_interface.ip4_address
+ print("MONITOR NODE", resource.node)
+ ip = str(resource.node.management_interface.ip4_address)
if not ip:
log.error('IP of monitored Node {} is None'.format(resource.node))
import os; os._exit(1)
@@ -986,49 +1046,146 @@ class ResourceManager(metaclass=Singleton):
node = resource.node
for interface in node.interfaces:
if not interface.monitored:
+ print("non monitored interface", interface)
continue
+ print("NETMON MONITOR INTERFACE", interface)
+
+# if interface.get_type() == 'dpdkdevice' and hasattr(node,'vpp'):
+#
+# # Check if vICN has already subscribed for one interface in
+# # the channel
+# if hasattr(interface.channel,'already_subscribed'):
+# continue
+#
+# channel_id = interface.channel._state.uuid._uuid
+#
+# update_vpp = functools.partial(self._on_vpp_record,
+# pylink_id = channel_id)
+# ws_vpp = self._router.add_interface('websocketclient',
+# address=ip, hook=update_vpp)
+#
+# aggregate_interfaces = list()
+# for _interface in node.interfaces:
+# if not _interface.get_type() == 'dpdkdevice' and \
+# _interface.monitored:
+# aggregate_interfaces.append('"' +
+# _interface.device_name + '"')
+#
+# q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces))
+# q = self.parse_query(q_str)
+# packet = Packet.from_query(q)
+# self._router._flow_table.add(packet, None, ws_vpp)
+# ws_vpp.send(packet)
+#
+# # Prevent vICN to subscribe to other interfaces of the same
+# # channel
+# interface.channel.already_subscribed = True
+#
+# else:
+ if hasattr(node, 'vpp') and node.vpp is not None:
+ q_str = Q_SUB_VPP_IF.format(interface.vppinterface.device_name)
+ else:
+ q_str = Q_SUB_IF.format(interface.device_name)
+ log.warning(" -- MONITOR {}".format(q_str))
+ q = self.parse_query(q_str)
+ packet = Packet.from_query(q)
+ self._router._flow_table.add(packet, None, set([ws]))
+ ws.send(packet)
- if interface.get_type() == 'dpdkdevice' and hasattr(node,'vpp'):
+ def _monitor_vpp_interface(self, vpp_interface):
+ print("MONITOR interface", vpp_interface)
+ interface = vpp_interface.parent
+ node = interface.node
+ # XXX only monitor in the topology group
+ if node.get_type() != 'lxccontainer':
+ print("MONITOR -> Ignored: not in container")
+ return
- # Check if vICN has already subscribed for one interface in
- # the channel
- if hasattr(interface.channel,'already_subscribed'):
- continue
+ # We only monitor interfaces to provide data for wired channels
+ channel = interface.channel
+ if channel is None:
+ print("MONITOR -> Ignored: no channel")
+ return
+ if channel.has_type('emulatedchannel'):
+ print("MONITOR -> Ignored: belong to wireless channel")
+ return
- channel_id = interface.channel._state.uuid._uuid
+ # Don't monitor multiple interfaces per channel
+ if channel in self._monitored_channels:
+ print("MONITOR -> Ignored: channel already monitored")
+ return
+ self._monitored_channels.add(channel)
- update_vpp = functools.partial(self._on_vpp_record,
- pylink_id = channel_id)
- ws_vpp = self._router.add_interface('websocketclient',
- address=ip, hook=update_vpp)
+ ip = str(node.management_interface.ip4_address)
+ if not ip:
+ log.error('IP of monitored Node {} is None'.format(resource.node))
+ import os; os._exit(1)
- aggregate_interfaces = list()
- for _interface in node.interfaces:
- if not _interface.get_type() == 'dpdkdevice' and \
- _interface.monitored:
- aggregate_interfaces.append('"' +
- _interface.device_name + '"')
+ # Reuse existing websockets
+ ws = self._map_ip_interface.get(ip)
+ if not ws:
+ ws = self._router.add_interface('websocketclient', address=ip,
+ hook=self._on_netmon_record)
+ self._map_ip_interface[ip] = ws
+
+ q_str = Q_SUB_VPP_IF.format(vpp_interface.device_name)
+ print("MONITOR -> query= {}".format(q_str))
+ q = self.parse_query(q_str)
+ packet = Packet.from_query(q)
+ self._router._flow_table.add(packet, None, set([ws]))
+ ws.send(packet)
+
+ def _monitor_interface(self, interface):
+ print("MONITOR interface", interface)
+ node = interface.node
+ # XXX only monitor in the topology group
+ if node.get_type() != 'lxccontainer':
+ print("MONITOR -> Ignored: not in container")
+ return
- q_str = Q_SUB_VPP.format(','.join(aggregate_interfaces))
- q = self.parse_query(q_str)
- packet = Packet.from_query(q)
- self._router._flow_table.add(packet, None, ws_vpp)
- ws_vpp.send(packet)
+ # Only monitor vpp interfaces on vpp node
+ if hasattr(node, 'vpp') and node.vpp is not None:
+ print("MONITOR -> Ignored: non-vpp interface on vpp node")
+ return
- # Prevent vICN to subscribe to other interfaces of the same
- # channel
- interface.channel.already_subscribed = True
+ # We only monitor interfaces to provide data for wired channels
+ channel = interface.channel
+ if channel is None:
+ print("MONITOR -> Ignored: no channel")
+ return
+ if channel.has_type('emulatedchannel'):
+ print("MONITOR -> Ignored: belong to wireless channel")
+ return
- else:
- q_str = Q_SUB_IF.format(interface.device_name)
- q = self.parse_query(q_str)
- packet = Packet.from_query(q)
- self._router._flow_table.add(packet, None, ws)
- ws.send(packet)
+ # Don't monitor multiple interfaces per channel
+ if channel in self._monitored_channels:
+ print("MONITOR -> Ignored: channel already monitored")
+ return
+ self._monitored_channels.add(channel)
+
+ ip = str(node.management_interface.ip4_address)
+ if not ip:
+ log.error('IP of monitored Node {} is None'.format(resource.node))
+ import os; os._exit(1)
+
+ # Reuse existing websockets
+ ws = self._map_ip_interface.get(ip)
+ if not ws:
+ ws = self._router.add_interface('websocketclient', address=ip,
+ hook=self._on_netmon_record)
+ self._map_ip_interface[ip] = ws
+
+ q_str = Q_SUB_IF.format(interface.device_name)
+ print("MONITOR -> query= {}".format(q_str))
+ q = self.parse_query(q_str)
+ packet = Packet.from_query(q)
+ self._router._flow_table.add(packet, None, set([ws]))
+ ws.send(packet)
def _monitor_emulator(self, resource):
ns = resource
- ip = ns.node.bridge.ip4_address # management_interface.ip_address
+ # XXX UGLY, we have no management interface
+ ip = ns.node.hostname # str(ns.node.interfaces[0].ip4_address)
ws_ns = self._router.add_interface('websocketclient', address = ip,
port = ns.control_port,
@@ -1050,7 +1207,7 @@ class ResourceManager(metaclass=Singleton):
q_str = Q_SUB_EMULATOR_IF.format(identifier)
q = self.parse_query(q_str)
packet = Packet.from_query(q)
- self._router._flow_table.add(packet, None, ws_ns)
+ self._router._flow_table.add(packet, None, set([ws_ns]))
ws_ns.send(packet)
# We also need to subscribe on the node for the tap interfaces
@@ -1059,7 +1216,7 @@ class ResourceManager(metaclass=Singleton):
q_str = Q_SUB_EMULATOR.format(tap.device_name)
q = self.parse_query(q_str)
packet = Packet.from_query(q)
- self._router._flow_table.add(packet, None, ws)
+ self._router._flow_table.add(packet, None, set([ws]))
ws.send(packet)
def _monitor(self, resource):
@@ -1070,29 +1227,37 @@ class ResourceManager(metaclass=Singleton):
self._pending_monitoring.clear()
return
- central_ip = self.by_type_str('centralip')
- if not central_ip:
- raise NotImplementedError('Missing CentralIP in experiment')
- central_ip = central_ip[0]
-
uuid = resource.get_uuid()
- if central_ip._state.state != ResourceState.CLEAN:
- self._pending_monitoring.add(uuid)
- return
+ central_ip = self.by_type_str('centralip')
+ if central_ip:
+ central_ip = central_ip[0]
+
+ if central_ip._state.state != ResourceState.CLEAN:
+ self._pending_monitoring.add(uuid)
+ return
if uuid in self._monitored:
return
self._monitored.add(uuid)
- if resource.get_type() == 'netmon':
- if resource.node.get_type() != 'lxccontainer':
- return
- self._monitor_netmon(resource)
+# if resource.get_type() == 'netmon':
+# if resource.node.get_type() != 'lxccontainer':
+# return
+# self._monitor_netmon(resource)
+
+ if resource.get_type() == 'qtplayer':
+ self._monitor_qtplayer(resource)
elif resource.has_type('emulatedchannel'):
self._monitor_emulator(resource)
+ elif resource.has_type('interface'):
+ self._monitor_interface(resource)
+
+ elif resource.has_type('vppinterface'):
+ self._monitor_vpp_interface(resource)
+
async def __set_resource_state(self, resource, state):
"""Sets the resource state (no-lock version)
@@ -1128,6 +1293,9 @@ class ResourceManager(metaclass=Singleton):
ret = fut.result()
resource._state.change_success = True
resource._state.change_value = ret
+ except ResourceNotFound as e:
+ resource._state.change_success = False
+ resource._state.change_value = e
except Exception as e:
resource._state.change_success = False
resource._state.change_value = e
@@ -1148,7 +1316,7 @@ class ResourceManager(metaclass=Singleton):
for attr in resource.iter_attributes():
if resource.is_local_attribute(attr.name):
continue
- if attr.key:
+ if resource.has_key_attribute(attr):
# Those attributes are already done
continue
@@ -1182,12 +1350,14 @@ class ResourceManager(metaclass=Singleton):
# Monitor all FSM one by one and inform about errors.
futs = list()
attrs = list()
- for attr in resource.get_keys():
- if resource.is_local_attribute(attr.name):
- continue
- attrs.append(attr)
- fut = self.attribute_process(resource, attr)
- futs.append(fut)
+
+ for key in resource.get_keys():
+ for attr in key:
+ if resource.is_local_attribute(attr.name):
+ continue
+ attrs.append(attr)
+ fut = self.attribute_process(resource, attr)
+ futs.append(fut)
if not futs:
self.log(resource, 'No key attribute to update')
@@ -1277,6 +1447,7 @@ class ResourceManager(metaclass=Singleton):
print("------")
import traceback; traceback.print_tb(e.__traceback__)
log.error('Resource: {} - Exception: {}'.format(pfx, e))
+ return
import os; os._exit(1)
elif state == ResourceState.UNINITIALIZED:
pending_state = ResourceState.PENDING_DEPS
@@ -1398,8 +1569,9 @@ class ResourceManager(metaclass=Singleton):
elif pending_state == ResourceState.PENDING_GET:
if resource._state.change_success == True:
attrs = resource._state.change_value
- self.log(resource, S_INIT_DONE.format(attrs))
- self._process_attr_dict(resource, None, attrs)
+ if not isinstance(attrs, ReturnValue):
+ self.log(resource, S_INIT_DONE.format(attrs))
+ self._process_attr_dict(resource, None, attrs)
new_state = ResourceState.CREATED
else:
e = resource._state.change_value
@@ -1452,8 +1624,9 @@ class ResourceManager(metaclass=Singleton):
elif pending_state == ResourceState.PENDING_CREATE:
if resource._state.change_success == True:
attrs = resource._state.change_value
- self.log(resource, S_CREATE_OK.format(attrs))
- self._process_attr_dict(resource, None, attrs)
+ if not isinstance(attrs, ReturnValue):
+ self.log(resource, S_CREATE_OK.format(attrs))
+ self._process_attr_dict(resource, None, attrs)
new_state = ResourceState.CREATED
else:
e = resource._state.change_value