diff options
-rwxr-xr-x | scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py | 270 |
1 files changed, 255 insertions, 15 deletions
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index 163c6923..8c8987b6 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -6,9 +6,6 @@ import json import general_utils import re from time import sleep -from collections import namedtuple - -CmdResponse = namedtuple('CmdResponse', ['success', 'data']) class bcolors: BLUE = '\033[94m' @@ -26,12 +23,12 @@ class BatchMessage(object): self.rpc_client = rpc_client self.batch_list = [] - def add (self, method_name, params={}): + def add (self, method_name, params = {}): id, msg = self.rpc_client.create_jsonrpc_v2(method_name, params, encode = False) self.batch_list.append(msg) - def invoke(self, block = False): + def invoke (self, block = False): if not self.rpc_client.connected: return False, "Not connected to server" @@ -39,9 +36,9 @@ class BatchMessage(object): rc, resp_list = self.rpc_client.send_raw_msg(msg, block = False) if len(self.batch_list) == 1: - return CmdResponse(True, [CmdResponse(rc, resp_list)]) + return True, [(rc, resp_list)] else: - return CmdResponse(rc, resp_list) + return rc, resp_list # JSON RPC v2.0 client @@ -130,7 +127,7 @@ class JsonRpcClient(object): self.socket.send(msg, flags = zmq.NOBLOCK) except zmq.error.ZMQError as e: self.disconnect() - return CmdResponse(False, "Failed To Get Send Message") + return False, "Failed To Get Send Message" got_response = False @@ -148,7 +145,7 @@ class JsonRpcClient(object): if not got_response: self.disconnect() - return CmdResponse(False, "Failed To Get Server Response") + return False, "Failed To Get Server Response" self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") @@ -162,19 +159,19 @@ class JsonRpcClient(object): for single_response in response_json: rc, msg = self.process_single_response(single_response) - rc_list.append( CmdResponse(rc, msg) ) + rc_list.append( (rc, msg) ) - return CmdResponse(True, rc_list) + return True, rc_list else: rc, msg = self.process_single_response(response_json) - return CmdResponse(rc, msg) + return rc, msg def process_single_response (self, response_json): if (response_json.get("jsonrpc") != "2.0"): - return False, "Malformed Response ({0})".format(str(response)) + return False, "Malfromed Response ({0})".format(str(response)) # error reported by server if ("error" in response_json): @@ -185,7 +182,7 @@ class JsonRpcClient(object): # if no error there should be a result if ("result" not in response_json): - return False, "Malformed Response ({0})".format(str(response)) + return False, "Malfromed Response ({0})".format(str(response)) return True, response_json["result"] @@ -194,7 +191,7 @@ class JsonRpcClient(object): def set_verbose(self, mode): self.verbose = mode - def disconnect(self): + def disconnect (self): if self.connected: self.socket.close(linger = 0) self.context.destroy(linger = 0) @@ -247,3 +244,246 @@ class JsonRpcClient(object): print "Shutting down RPC client\n" if hasattr(self, "context"): self.context.destroy(linger=0) + +# MOVE THIS TO DAN'S FILE +class TrexStatelessClient(JsonRpcClient): + + def __init__ (self, server, port, user): + + super(TrexStatelessClient, self).__init__(server, port) + + self.user = user + self.port_handlers = {} + + self.supported_cmds = [] + self.system_info = None + self.server_version = None + + + def whoami (self): + return self.user + + def ping_rpc_server(self): + + return self.invoke_rpc_method("ping", block = False) + + def get_rpc_server_version (self): + return self.server_version + + def get_system_info (self): + if not self.system_info: + return {} + + return self.system_info + + def get_supported_cmds(self): + if not self.supported_cmds: + return {} + + return self.supported_cmds + + def get_port_count (self): + if not self.system_info: + return 0 + + return self.system_info["port_count"] + + # sync the client with all the server required data + def sync (self): + + # get server version + rc, msg = self.invoke_rpc_method("get_version") + if not rc: + self.disconnect() + return rc, msg + + self.server_version = msg + + # get supported commands + rc, msg = self.invoke_rpc_method("get_supported_cmds") + if not rc: + self.disconnect() + return rc, msg + + self.supported_cmds = [str(x) for x in msg if x] + + # get system info + rc, msg = self.invoke_rpc_method("get_system_info") + if not rc: + self.disconnect() + return rc, msg + + self.system_info = msg + + return True, "" + + def connect (self): + rc, err = super(TrexStatelessClient, self).connect() + if not rc: + return rc, err + + return self.sync() + + + # take ownership over ports + def take_ownership (self, port_id_array, force = False): + if not self.connected: + return False, "Not connected to server" + + batch = self.create_batch() + + for port_id in port_id_array: + batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force}) + + rc, resp_list = batch.invoke() + if not rc: + return rc, resp_list + + for i, rc in enumerate(resp_list): + if rc[0]: + self.port_handlers[port_id_array[i]] = rc[1] + + return True, resp_list + + + def release_ports (self, port_id_array): + batch = self.create_batch() + + for port_id in port_id_array: + + # let the server handle un-acquired errors + if self.port_handlers.get(port_id): + handler = self.port_handlers[port_id] + else: + handler = "" + + batch.add("release", params = {"port_id":port_id, "handler":handler}) + + + rc, resp_list = batch.invoke() + if not rc: + return rc, resp_list + + for i, rc in enumerate(resp_list): + if rc[0]: + self.port_handlers.pop(port_id_array[i]) + + return True, resp_list + + def get_owned_ports (self): + return self.port_handlers.keys() + + # fetch port stats + def get_port_stats (self, port_id_array): + if not self.connected: + return False, "Not connected to server" + + batch = self.create_batch() + + # empty list means all + if port_id_array == []: + port_id_array = list([x for x in xrange(0, self.system_info["port_count"])]) + + for port_id in port_id_array: + + # let the server handle un-acquired errors + if self.port_handlers.get(port_id): + handler = self.port_handlers[port_id] + else: + handler = "" + + batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler}) + + + rc, resp_list = batch.invoke() + + return rc, resp_list + + # snapshot will take a snapshot of all your owned ports for streams and etc. + def snapshot(self): + + + if len(self.get_owned_ports()) == 0: + return {} + + snap = {} + + batch = self.create_batch() + + for port_id in self.get_owned_ports(): + + batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) + batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) + + rc, resp_list = batch.invoke() + if not rc: + return rc, resp_list + + # split the list to 2s + index = 0 + for port_id in self.get_owned_ports(): + if not resp_list[index] or not resp_list[index + 1]: + snap[port_id] = None + continue + + # fetch the first two + stats = resp_list[index][1] + stream_list = resp_list[index + 1][1] + + port = {} + port['status'] = stats['status'] + port['stream_list'] = [] + + # get all the streams + if len(stream_list) > 0: + batch = self.create_batch() + for stream_id in stream_list: + batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]}) + + rc, stream_resp_list = batch.invoke() + if not rc: + port = {} + + port['streams'] = {} + for i, resp in enumerate(stream_resp_list): + if resp[0]: + port['streams'][stream_list[i]] = resp[1] + + snap[port_id] = port + + # move to next one + index += 2 + + + return snap + + # add stream + def add_stream (self, port_id, stream_id, isg, next_stream_id, packet): + if not port_id in self.get_owned_ports(): + return False, "Port {0} is not owned... please take ownership before adding streams".format(port_id) + + handler = self.port_handlers[port_id] + + stream = {} + stream['enabled'] = True + stream['self_start'] = True + stream['isg'] = isg + stream['next_stream_id'] = next_stream_id + stream['packet'] = {} + stream['packet']['binary'] = packet + stream['packet']['meta'] = "" + stream['vm'] = [] + stream['rx_stats'] = {} + stream['rx_stats']['enabled'] = False + + stream['mode'] = {} + stream['mode']['type'] = 'continuous' + stream['mode']['pps'] = 10.0 + + params = {} + params['handler'] = handler + params['stream'] = stream + params['port_id'] = port_id + params['stream_id'] = stream_id + + return self.invoke_rpc_method('add_stream', params = params) |