diff options
3 files changed, 116 insertions, 228 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index b25d5cd5..9e49b852 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -9,6 +9,7 @@ except ImportError: from client_utils.jsonrpc_client import JsonRpcClient from client_utils.packet_builder import CTRexPktBuilder import json +from common.trex_stats import * class CTRexStatelessClient(object): @@ -17,31 +18,62 @@ class CTRexStatelessClient(object): super(CTRexStatelessClient, self).__init__() self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) self._conn_handler = {} - - def owned(func): - def wrapper(self, *args, **kwargs): - if self._conn_handler.get(kwargs.get("port_id")): - return func(self, *args, **kwargs) - else: - raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned". - format(func.__name__)) + self._active_ports = set() + self._port_stats = CTRexStatsManager() + self._stream_stats = CTRexStatsManager() + + + # ----- decorator methods ----- # + def force_status(owned=True, active=False): + def wrapper(func): + def wrapper_f(self, *args, **kwargs): + port_ids = kwargs.get("port_id") + if isinstance(port_ids, int): + # make sure port_ids is a list + port_ids = [port_ids] + bad_ids = set() + for port_id in port_ids: + if not self._conn_handler.get(kwargs.get(port_id)): + bad_ids.add(port_ids) + if bad_ids: + # Some port IDs are not according to desires status + own_str = "owned" if owned else "not-owned" + act_str = "active" if active else "non-active" + raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not both" \ + "{2} and {3}".format(func.__name__, + bad_ids, + own_str, + act_str)) + else: + func(self, *args, **kwargs) + return wrapper_f return wrapper + # def owned(func): + # def wrapper(self, *args, **kwargs): + # if self._conn_handler.get(kwargs.get("port_id")): + # return func(self, *args, **kwargs) + # else: + # raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned". + # format(func.__name__)) + # return wrapper + + # ----- user-access methods ----- # def acquire(self, port_id, username, force=False): params = {"port_id": port_id, "user": username, "force": force} self._conn_handler[port_id] = self.transmit("acquire", params) - return self._conn_handler + return self._conn_handler[port_id] - @owned + @force_status(owned=True) def release(self, port_id=None): self._conn_handler.pop(port_id) params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} return self.transmit("release", params) - @owned + @force_status(owned=True) def add_stream(self, stream_id, stream_obj, port_id=None): assert isinstance(stream_obj, CStream) params = {"handler": self._conn_handler.get(port_id), @@ -50,15 +82,15 @@ class CTRexStatelessClient(object): "stream": stream_obj.dump()} return self.transmit("add_stream", params) - @owned + @force_status(owned=True) def remove_stream(self, stream_id, port_id=None): params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, "stream_id": stream_id} return self.transmit("remove_stream", params) - @owned - def get_stream_list(self, port_id=None): + @force_status(owned=True,active=) + def get_stream_id_list(self, port_id=None): params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} return self.transmit("get_stream_list", params) @@ -86,21 +118,35 @@ class CTRexStatelessClient(object): return self.transmit("get_global_stats") @owned - def stop_traffic(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), + def get_port_stats(self, port_id=None): + params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed "port_id": port_id} - return self.transmit("stop_traffic", params) - - - + return self.transmit("get_port_stats", params) + @owned + def get_stream_stats(self, port_id=None): + params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed + "port_id": port_id} + return self.transmit("get_stream_stats", params) + # ----- internal methods ----- # + def transmit(self, method_name, params={}): + return self.tx_link.transmit(method_name, params) + @staticmethod + def _object_decoder(obj_type, obj_data): + if obj_type=="global": + return CGlobalStats(**obj_data) + elif obj_type=="port": + return CPortStats(**obj_data) + elif obj_type=="stream": + return CStreamStats(**obj_data) + else: + # Do not serialize the data into class + return obj_data - def transmit(self, method_name, params={}): - return self.tx_link.transmit(method_name, params) # ------ private classes ------ # diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index 51bb3a14..a5adc485 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -60,6 +60,7 @@ class JsonRpcClient(object): return rc + # pretty print for JSON def pretty_json (self, json_str, use_colors = True): pretty_str = json.dumps(json.loads(json_str), indent = 4, separators=(',', ': '), sort_keys = True) @@ -87,6 +88,7 @@ class JsonRpcClient(object): print "[verbose] " + msg + # batch messages def create_batch (self): return BatchMessage(self) @@ -114,6 +116,7 @@ class JsonRpcClient(object): return self.send_raw_msg(msg, block) + # low level send of string message def send_raw_msg (self, msg, block = False): self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n") @@ -241,209 +244,3 @@ class JsonRpcClient(object): print "Shutting down RPC client\n" if hasattr(self, "context"): self.context.destroy(linger=0) - -# MOVE THIS TO DAN'S FILE -class TrexStatelessClient(JsonRpcClient): - - def __init__ (self, server, port, user): - - super(TrexStatelessClient, self).__init__(server, port) - - self.user = user - self.port_handlers = {} - - self.supported_cmds = [] - self.system_info = None - self.server_version = None - - - def whoami (self): - return self.user - - def ping_rpc_server(self): - - return self.invoke_rpc_method("ping", block = False) - - def get_rpc_server_version (self): - return self.server_version - - def get_system_info (self): - return self.system_info - - def get_supported_cmds(self): - return self.supported_cmds - - def get_port_count (self): - if not self.system_info: - return 0 - - return self.system_info["port_count"] - - # refresh the client for transient data - def refresh (self): - - # get server versionrc, msg = self.get_supported_cmds() - rc, msg = self.invoke_rpc_method("get_version") - if not rc: - self.disconnect() - return rc, msg - - self.server_version = msg - - # get supported commands - rc, msg = self.invoke_rpc_method("get_supported_cmds") - if not rc: - self.disconnect() - return rc, msg - - self.supported_cmds = [str(x) for x in msg if x] - - # get system info - rc, msg = self.invoke_rpc_method("get_system_info") - if not rc: - self.disconnect() - return rc, msg - - self.system_info = msg - - return True, "" - - def connect (self): - rc, err = super(TrexStatelessClient, self).connect() - if not rc: - return rc, err - - return self.refresh() - - - # take ownership over ports - def take_ownership (self, port_id_array, force = False): - if not self.connected: - return False, "Not connected to server" - - batch = self.create_batch() - - for port_id in port_id_array: - batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force}) - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - for i, rc in enumerate(resp_list): - if rc[0]: - self.port_handlers[port_id_array[i]] = rc[1] - - return True, resp_list - - - def release_ports (self, port_id_array): - batch = self.create_batch() - - for port_id in port_id_array: - - # let the server handle un-acquired errors - if self.port_handlers.get(port_id): - handler = self.port_handlers[port_id] - else: - handler = "" - - batch.add("release", params = {"port_id":port_id, "handler":handler}) - - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - for i, rc in enumerate(resp_list): - if rc[0]: - self.port_handlers.pop(port_id_array[i]) - - return True, resp_list - - def get_owned_ports (self): - return self.port_handlers.keys() - - # fetch port stats - def get_port_stats (self, port_id_array): - if not self.connected: - return False, "Not connected to server" - - batch = self.create_batch() - - # empty list means all - if port_id_array == []: - port_id_array = list([x for x in xrange(0, self.system_info["port_count"])]) - - for port_id in port_id_array: - - # let the server handle un-acquired errors - if self.port_handlers.get(port_id): - handler = self.port_handlers[port_id] - else: - handler = "" - - batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler}) - - - rc, resp_list = batch.invoke() - - return rc, resp_list - - # snapshot will take a snapshot of all your owned ports for streams and etc. - def snapshot(self): - - - if len(self.get_owned_ports()) == 0: - return {} - - snap = {} - - batch = self.create_batch() - - for port_id in self.get_owned_ports(): - - batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) - batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - # split the list to 2s - index = 0 - for port_id in self.get_owned_ports(): - if not resp_list[index] or not resp_list[index + 1]: - snap[port_id] = None - continue - - # fetch the first two - stats = resp_list[index][1] - stream_list = resp_list[index + 1][1] - - port = {} - port['status'] = stats['status'] - port['stream_list'] = [] - - # get all the streams - if len(stream_list) > 0: - batch = self.create_batch() - for stream_id in stream_list: - batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]}) - - rc, stream_resp_list = batch.invoke() - if not rc: - port = {} - - port['streams'] = {} - for i, resp in enumerate(stream_resp_list): - if resp[0]: - port['streams'][stream_list[i]] = resp[1] - - snap[port_id] = port - - # move to next one - index += 2 - - - return snap diff --git a/scripts/automation/trex_control_plane/common/trex_stats.py b/scripts/automation/trex_control_plane/common/trex_stats.py new file mode 100755 index 00000000..62c3a890 --- /dev/null +++ b/scripts/automation/trex_control_plane/common/trex_stats.py @@ -0,0 +1,45 @@ +#!/router/bin/python +import copy + +class CTRexStatsManager(object): + + def __init__(self): + self._stats = {} + pass + + def update(self, obj_id, stats_obj): + assert isinstance(stats_obj, CTRexStats) + self._stats[obj_id] = stats_obj + + def get_stats(self, obj_id): + return copy.copy(self._stats.pop(obj_id)) + + + + +class CTRexStats(object): + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + + +class CGlobalStats(CTRexStats): + def __init__(self, **kwargs): + super(CGlobalStats, self).__init__(kwargs) + pass + + +class CPortStats(CTRexStats): + def __init__(self, **kwargs): + super(CPortStats, self).__init__(kwargs) + pass + + +class CStreamStats(CTRexStats): + def __init__(self, **kwargs): + super(CStreamStats, self).__init__(kwargs) + pass + + +if __name__ == "__main__": + pass |