summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py90
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py209
-rwxr-xr-xscripts/automation/trex_control_plane/common/trex_stats.py45
3 files changed, 116 insertions, 228 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index b25d5cd5..9e49b852 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -9,6 +9,7 @@ except ImportError:
from client_utils.jsonrpc_client import JsonRpcClient
from client_utils.packet_builder import CTRexPktBuilder
import json
+from common.trex_stats import *
class CTRexStatelessClient(object):
@@ -17,31 +18,62 @@ class CTRexStatelessClient(object):
super(CTRexStatelessClient, self).__init__()
self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual)
self._conn_handler = {}
-
- def owned(func):
- def wrapper(self, *args, **kwargs):
- if self._conn_handler.get(kwargs.get("port_id")):
- return func(self, *args, **kwargs)
- else:
- raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned".
- format(func.__name__))
+ self._active_ports = set()
+ self._port_stats = CTRexStatsManager()
+ self._stream_stats = CTRexStatsManager()
+
+
+ # ----- decorator methods ----- #
+ def force_status(owned=True, active=False):
+ def wrapper(func):
+ def wrapper_f(self, *args, **kwargs):
+ port_ids = kwargs.get("port_id")
+ if isinstance(port_ids, int):
+ # make sure port_ids is a list
+ port_ids = [port_ids]
+ bad_ids = set()
+ for port_id in port_ids:
+ if not self._conn_handler.get(kwargs.get(port_id)):
+ bad_ids.add(port_ids)
+ if bad_ids:
+ # Some port IDs are not according to desires status
+ own_str = "owned" if owned else "not-owned"
+ act_str = "active" if active else "non-active"
+ raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not both" \
+ "{2} and {3}".format(func.__name__,
+ bad_ids,
+ own_str,
+ act_str))
+ else:
+ func(self, *args, **kwargs)
+ return wrapper_f
return wrapper
+ # def owned(func):
+ # def wrapper(self, *args, **kwargs):
+ # if self._conn_handler.get(kwargs.get("port_id")):
+ # return func(self, *args, **kwargs)
+ # else:
+ # raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned".
+ # format(func.__name__))
+ # return wrapper
+
+ # ----- user-access methods ----- #
def acquire(self, port_id, username, force=False):
params = {"port_id": port_id,
"user": username,
"force": force}
self._conn_handler[port_id] = self.transmit("acquire", params)
- return self._conn_handler
+ return self._conn_handler[port_id]
- @owned
+ @force_status(owned=True)
def release(self, port_id=None):
self._conn_handler.pop(port_id)
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
return self.transmit("release", params)
- @owned
+ @force_status(owned=True)
def add_stream(self, stream_id, stream_obj, port_id=None):
assert isinstance(stream_obj, CStream)
params = {"handler": self._conn_handler.get(port_id),
@@ -50,15 +82,15 @@ class CTRexStatelessClient(object):
"stream": stream_obj.dump()}
return self.transmit("add_stream", params)
- @owned
+ @force_status(owned=True)
def remove_stream(self, stream_id, port_id=None):
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id,
"stream_id": stream_id}
return self.transmit("remove_stream", params)
- @owned
- def get_stream_list(self, port_id=None):
+ @force_status(owned=True,active=)
+ def get_stream_id_list(self, port_id=None):
params = {"handler": self._conn_handler.get(port_id),
"port_id": port_id}
return self.transmit("get_stream_list", params)
@@ -86,21 +118,35 @@ class CTRexStatelessClient(object):
return self.transmit("get_global_stats")
@owned
- def stop_traffic(self, port_id=None):
- params = {"handler": self._conn_handler.get(port_id),
+ def get_port_stats(self, port_id=None):
+ params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed
"port_id": port_id}
- return self.transmit("stop_traffic", params)
-
-
-
+ return self.transmit("get_port_stats", params)
+ @owned
+ def get_stream_stats(self, port_id=None):
+ params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed
+ "port_id": port_id}
+ return self.transmit("get_stream_stats", params)
+ # ----- internal methods ----- #
+ def transmit(self, method_name, params={}):
+ return self.tx_link.transmit(method_name, params)
+ @staticmethod
+ def _object_decoder(obj_type, obj_data):
+ if obj_type=="global":
+ return CGlobalStats(**obj_data)
+ elif obj_type=="port":
+ return CPortStats(**obj_data)
+ elif obj_type=="stream":
+ return CStreamStats(**obj_data)
+ else:
+ # Do not serialize the data into class
+ return obj_data
- def transmit(self, method_name, params={}):
- return self.tx_link.transmit(method_name, params)
# ------ private classes ------ #
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index 51bb3a14..a5adc485 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -60,6 +60,7 @@ class JsonRpcClient(object):
return rc
+ # pretty print for JSON
def pretty_json (self, json_str, use_colors = True):
pretty_str = json.dumps(json.loads(json_str), indent = 4, separators=(',', ': '), sort_keys = True)
@@ -87,6 +88,7 @@ class JsonRpcClient(object):
print "[verbose] " + msg
+ # batch messages
def create_batch (self):
return BatchMessage(self)
@@ -114,6 +116,7 @@ class JsonRpcClient(object):
return self.send_raw_msg(msg, block)
+ # low level send of string message
def send_raw_msg (self, msg, block = False):
self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
@@ -241,209 +244,3 @@ class JsonRpcClient(object):
print "Shutting down RPC client\n"
if hasattr(self, "context"):
self.context.destroy(linger=0)
-
-# MOVE THIS TO DAN'S FILE
-class TrexStatelessClient(JsonRpcClient):
-
- def __init__ (self, server, port, user):
-
- super(TrexStatelessClient, self).__init__(server, port)
-
- self.user = user
- self.port_handlers = {}
-
- self.supported_cmds = []
- self.system_info = None
- self.server_version = None
-
-
- def whoami (self):
- return self.user
-
- def ping_rpc_server(self):
-
- return self.invoke_rpc_method("ping", block = False)
-
- def get_rpc_server_version (self):
- return self.server_version
-
- def get_system_info (self):
- return self.system_info
-
- def get_supported_cmds(self):
- return self.supported_cmds
-
- def get_port_count (self):
- if not self.system_info:
- return 0
-
- return self.system_info["port_count"]
-
- # refresh the client for transient data
- def refresh (self):
-
- # get server versionrc, msg = self.get_supported_cmds()
- rc, msg = self.invoke_rpc_method("get_version")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.server_version = msg
-
- # get supported commands
- rc, msg = self.invoke_rpc_method("get_supported_cmds")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.supported_cmds = [str(x) for x in msg if x]
-
- # get system info
- rc, msg = self.invoke_rpc_method("get_system_info")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.system_info = msg
-
- return True, ""
-
- def connect (self):
- rc, err = super(TrexStatelessClient, self).connect()
- if not rc:
- return rc, err
-
- return self.refresh()
-
-
- # take ownership over ports
- def take_ownership (self, port_id_array, force = False):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- for port_id in port_id_array:
- batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
-
- def release_ports (self, port_id_array):
- batch = self.create_batch()
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("release", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers.pop(port_id_array[i])
-
- return True, resp_list
-
- def get_owned_ports (self):
- return self.port_handlers.keys()
-
- # fetch port stats
- def get_port_stats (self, port_id_array):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- # empty list means all
- if port_id_array == []:
- port_id_array = list([x for x in xrange(0, self.system_info["port_count"])])
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
-
- return rc, resp_list
-
- # snapshot will take a snapshot of all your owned ports for streams and etc.
- def snapshot(self):
-
-
- if len(self.get_owned_ports()) == 0:
- return {}
-
- snap = {}
-
- batch = self.create_batch()
-
- for port_id in self.get_owned_ports():
-
- batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
- batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- # split the list to 2s
- index = 0
- for port_id in self.get_owned_ports():
- if not resp_list[index] or not resp_list[index + 1]:
- snap[port_id] = None
- continue
-
- # fetch the first two
- stats = resp_list[index][1]
- stream_list = resp_list[index + 1][1]
-
- port = {}
- port['status'] = stats['status']
- port['stream_list'] = []
-
- # get all the streams
- if len(stream_list) > 0:
- batch = self.create_batch()
- for stream_id in stream_list:
- batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]})
-
- rc, stream_resp_list = batch.invoke()
- if not rc:
- port = {}
-
- port['streams'] = {}
- for i, resp in enumerate(stream_resp_list):
- if resp[0]:
- port['streams'][stream_list[i]] = resp[1]
-
- snap[port_id] = port
-
- # move to next one
- index += 2
-
-
- return snap
diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py
new file mode 100755
index 00000000..62c3a890
--- /dev/null
+++ b/scripts/automation/trex_control_plane/common/trex_stats.py
@@ -0,0 +1,45 @@
+#!/router/bin/python
+import copy
+
+class CTRexStatsManager(object):
+
+ def __init__(self):
+ self._stats = {}
+ pass
+
+ def update(self, obj_id, stats_obj):
+ assert isinstance(stats_obj, CTRexStats)
+ self._stats[obj_id] = stats_obj
+
+ def get_stats(self, obj_id):
+ return copy.copy(self._stats.pop(obj_id))
+
+
+
+
+class CTRexStats(object):
+ def __init__(self, **kwargs):
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+
+
+class CGlobalStats(CTRexStats):
+ def __init__(self, **kwargs):
+ super(CGlobalStats, self).__init__(kwargs)
+ pass
+
+
+class CPortStats(CTRexStats):
+ def __init__(self, **kwargs):
+ super(CPortStats, self).__init__(kwargs)
+ pass
+
+
+class CStreamStats(CTRexStats):
+ def __init__(self, **kwargs):
+ super(CStreamStats, self).__init__(kwargs)
+ pass
+
+
+if __name__ == "__main__":
+ pass