From 7d3be8c612e295820649779335288c197b80ccb2 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Mon, 24 Aug 2015 17:28:17 +0300 Subject: Changes location of console and fixed dependencies --- .../client_utils/general_utils.py | 27 ++- .../client_utils/jsonrpc_client.py | 186 +++++++++++++++ .../client_utils/outer_packages.py | 30 +++ .../trex_control_plane/console/trex_console.py | 252 +++++++++++++++++++++ .../trex_control_plane/console/trex_root_path.py | 15 ++ .../trex_control_plane/console/trex_status.py | 212 +++++++++++++++++ .../trex_control_plane/server/outer_packages.py | 1 - .../server/zmq_monitor_thread.py | 20 +- 8 files changed, 729 insertions(+), 14 deletions(-) create mode 100644 scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py create mode 100644 scripts/automation/trex_control_plane/client_utils/outer_packages.py create mode 100644 scripts/automation/trex_control_plane/console/trex_console.py create mode 100644 scripts/automation/trex_control_plane/console/trex_root_path.py create mode 100644 scripts/automation/trex_control_plane/console/trex_status.py (limited to 'scripts/automation/trex_control_plane') diff --git a/scripts/automation/trex_control_plane/client_utils/general_utils.py b/scripts/automation/trex_control_plane/client_utils/general_utils.py index 5544eabc..b5912628 100755 --- a/scripts/automation/trex_control_plane/client_utils/general_utils.py +++ b/scripts/automation/trex_control_plane/client_utils/general_utils.py @@ -1,6 +1,9 @@ #!/router/bin/python -import sys,site +import sys +import site +import string +import random import os try: @@ -50,7 +53,27 @@ def find_path_to_pardir (pardir, base_path = os.getcwd() ): """ components = base_path.split(os.sep) return str.join(os.sep, components[:components.index(pardir)+1]) - + + +def random_id_gen(length=8): + """ + A generator for creating a random chars id of specific length + + :parameters: + length : int + the desired length of the generated id + + default: 8 + + :return: + a random id with each next() request. + """ + id_chars = string.ascii_lowercase + string.digits + while True: + return_id = '' + for i in range(length): + return_id += random.choice(id_chars) + yield return_id if __name__ == "__main__": diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py new file mode 100644 index 00000000..1631c494 --- /dev/null +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -0,0 +1,186 @@ +#!/router/bin/python + +import outer_packages +import zmq +import json +import general_utils +from time import sleep + +class JsonRpcClient(object): + + def __init__ (self, default_server, default_port): + self.verbose = False + self.connected = False + + # default values + self.port = default_port + self.server = default_server + self.id_gen = general_utils.random_id_gen() + + def get_connection_details (self): + rc = {} + rc['server'] = self.server + rc['port'] = self.port + + return rc + + def pretty_json (self, json_str): + return json.dumps(json.loads(json_str), indent = 4, separators=(',', ': '), sort_keys = True) + + def verbose_msg (self, msg): + if not self.verbose: + return + + print "[verbose] " + msg + + + def create_jsonrpc_v2 (self, method_name, params = {}, id = None): + msg = {} + msg["jsonrpc"] = "2.0" + msg["method"] = method_name + + msg["params"] = params + + msg["id"] = id + + return json.dumps(msg) + + def invoke_rpc_method (self, method_name, params = {}, block = False): + rc, msg = self._invoke_rpc_method(method_name, params, block) + if not rc: + self.disconnect() + + return rc, msg + + def _invoke_rpc_method (self, method_name, params = {}, block = False): + if not self.connected: + return False, "Not connected to server" + + id = self.id_gen.next() + msg = self.create_jsonrpc_v2(method_name, params, id = id) + + self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n") + + if block: + self.socket.send(msg) + else: + try: + self.socket.send(msg, flags = zmq.NOBLOCK) + except zmq.error.ZMQError as e: + return False, "Failed To Get Send Message" + + got_response = False + + if block: + response = self.socket.recv() + got_response = True + else: + for i in xrange(0 ,10): + try: + response = self.socket.recv(flags = zmq.NOBLOCK) + got_response = True + break + except zmq.Again: + sleep(0.2) + + if not got_response: + return False, "Failed To Get Server Response" + + self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") + + # decode + response_json = json.loads(response) + + if (response_json.get("jsonrpc") != "2.0"): + return False, "Malfromed Response ({0})".format(str(response)) + + if (response_json.get("id") != id): + return False, "Server Replied With Bad ID ({0})".format(str(response)) + + # error reported by server + if ("error" in response_json): + return True, response_json["error"]["message"] + + # if no error there should be a result + if ("result" not in response_json): + return False, "Malfromed Response ({0})".format(str(response)) + + return True, response_json["result"] + + + def ping_rpc_server(self): + + return self.invoke_rpc_method("ping", block = False) + + def get_rpc_server_status (self): + return self.invoke_rpc_method("get_status") + + def query_rpc_server(self): + return self.invoke_rpc_method("get_reg_cmds") + + + def set_verbose(self, mode): + self.verbose = mode + + def disconnect (self): + if self.connected: + self.socket.close(linger = 0) + self.context.destroy(linger = 0) + self.connected = False + return True, "" + else: + return False, "Not connected to server" + + def connect(self, server = None, port = None): + if self.connected: + self.disconnect() + + self.context = zmq.Context() + + self.server = (server if server else self.server) + self.port = (port if port else self.port) + + # Socket to talk to server + self.transport = "tcp://{0}:{1}".format(self.server, self.port) + + print "\nConnecting To RPC Server On {0}".format(self.transport) + + self.socket = self.context.socket(zmq.REQ) + try: + self.socket.connect(self.transport) + except zmq.error.ZMQError as e: + return False, "ZMQ Error: Bad server or port name: " + str(e) + + + self.connected = True + + # ping the server + rc, err = self.ping_rpc_server() + if not rc: + self.disconnect() + return rc, err + + return True, "" + + + def reconnect(self): + # connect using current values + return self.connect() + + if not self.connected: + return False, "Not connected to server" + + # reconnect + return self.connect(self.server, self.port) + + + def is_connected(self): + return self.connected + + + def __del__(self): + print "Shutting down RPC client\n" + self.context.destroy(linger=0) + +if __name__ == "__main__": + pass diff --git a/scripts/automation/trex_control_plane/client_utils/outer_packages.py b/scripts/automation/trex_control_plane/client_utils/outer_packages.py new file mode 100644 index 00000000..53cce991 --- /dev/null +++ b/scripts/automation/trex_control_plane/client_utils/outer_packages.py @@ -0,0 +1,30 @@ +#!/router/bin/python + +import sys +import site +import os + +CURRENT_PATH = os.path.dirname(os.path.realpath(__file__)) +ROOT_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir)) # path to trex_control_plane directory +PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, os.pardir, os.pardir, + os.pardir, 'external_libs', 'python')) + +CLIENT_UTILS_MODULES = ['zmq'] + + +def import_client_utils_modules(): + # must be in a higher priority + sys.path.insert(0, PATH_TO_PYTHON_LIB) + sys.path.append(ROOT_PATH) + import_module_list(CLIENT_UTILS_MODULES) + + +def import_module_list(modules_list): + assert(isinstance(modules_list, list)) + for p in modules_list: + full_path = os.path.join(PATH_TO_PYTHON_LIB, p) + fix_path = os.path.normcase(full_path) + site.addsitedir(full_path) + +import_client_utils_modules() + diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py new file mode 100644 index 00000000..6514a51c --- /dev/null +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import cmd +import json +import ast +import argparse +import sys +import trex_root_path +from client_utils.jsonrpc_client import JsonRpcClient +import trex_status + +class TrexConsole(cmd.Cmd): + """Trex Console""" + + def __init__(self, rpc_client): + cmd.Cmd.__init__(self) + + self.rpc_client = rpc_client + + self.do_connect("") + + self.intro = "\n-=TRex Console V1.0=-\n" + self.intro += "\nType 'help' or '?' for supported actions\n" + + self.verbose = False + + self.postcmd(False, "") + + + # a cool hack - i stole this function and added space + def completenames(self, text, *ignored): + dotext = 'do_'+text + return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)] + + # set verbose on / off + def do_verbose (self, line): + '''shows or set verbose mode\n''' + if line == "": + print "\nverbose is " + ("on\n" if self.verbose else "off\n") + + elif line == "on": + self.verbose = True + self.rpc_client.set_verbose(True) + print "\nverbose set to on\n" + + elif line == "off": + self.verbose = False + self.rpc_client.set_verbose(False) + print "\nverbose set to off\n" + + else: + print "\nplease specify 'on' or 'off'\n" + + # query the server for registered commands + def do_query_server(self, line): + '''query the RPC server for supported remote commands\n''' + + rc, msg = self.rpc_client.query_rpc_server() + if not rc: + print "\n*** " + msg + "\n" + return + + print "\nRPC server supports the following commands: \n\n" + for func in msg: + if func: + print func + print "\n" + + def do_ping (self, line): + '''Pings the RPC server\n''' + + print "\n-> Pinging RPC server" + + rc, msg = self.rpc_client.ping_rpc_server() + if rc: + print "[SUCCESS]\n" + else: + print "\n*** " + msg + "\n" + return + + def do_connect (self, line): + '''Connects to the server\n''' + + if line == "": + rc, msg = self.rpc_client.connect() + else: + sp = line.split() + if (len(sp) != 2): + print "\n[usage] connect [server] [port] or without parameters\n" + return + + rc, msg = self.rpc_client.connect(sp[0], sp[1]) + + if rc: + print "[SUCCESS]\n" + else: + print "\n*** " + msg + "\n" + return + + rc, msg = self.rpc_client.query_rpc_server() + + if rc: + self.supported_rpc = [str(x) for x in msg if x] + + def do_rpc (self, line): + '''Launches a RPC on the server\n''' + + if line == "": + print "\nUsage: [method name] [param dict as string]\n" + print "Example: rpc test_add {'x': 12, 'y': 17}\n" + return + + sp = line.split(' ', 1) + method = sp[0] + + params = None + bad_parse = False + if len(sp) > 1: + + try: + params = ast.literal_eval(sp[1]) + if not isinstance(params, dict): + bad_parse = True + + except ValueError as e1: + bad_parse = True + except SyntaxError as e2: + bad_parse = True + + if bad_parse: + print "\nValue should be a valid dict: '{0}'".format(sp[1]) + print "\nUsage: [method name] [param dict as string]\n" + print "Example: rpc test_add {'x': 12, 'y': 17}\n" + return + + rc, msg = self.rpc_client.invoke_rpc_method(method, params) + if rc: + print "\nServer Response:\n\n" + json.dumps(msg) + "\n" + else: + print "\n*** " + msg + "\n" + #print "Please try 'reconnect' to reconnect to server" + + + def complete_rpc (self, text, line, begidx, endidx): + return [x for x in self.supported_rpc if x.startswith(text)] + + def do_status (self, line): + '''Shows a graphical console\n''' + + self.do_verbose('off') + trex_status.show_trex_status(self.rpc_client) + + def do_quit(self, line): + '''exit the client\n''' + return True + + def do_disconnect (self, line): + '''Disconnect from the server\n''' + if not self.rpc_client.is_connected(): + print "Not connected to server\n" + return + + rc, msg = self.rpc_client.disconnect() + if rc: + print "[SUCCESS]\n" + else: + print msg + "\n" + + def postcmd(self, stop, line): + if self.rpc_client.is_connected(): + self.prompt = "TRex > " + else: + self.supported_rpc = None + self.prompt = "TRex (offline) > " + + return stop + + def default(self, line): + print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line) + + def do_help (self, line): + '''Shows This Help Screen\n''' + if line: + try: + func = getattr(self, 'help_' + line) + except AttributeError: + try: + doc = getattr(self, 'do_' + line).__doc__ + if doc: + self.stdout.write("%s\n"%str(doc)) + return + except AttributeError: + pass + self.stdout.write("%s\n"%str(self.nohelp % (line,))) + return + func() + return + + print "\nSupported Console Commands:" + print "----------------------------\n" + + cmds = [x[3:] for x in self.get_names() if x.startswith("do_")] + for cmd in cmds: + if cmd == "EOF": + continue + + try: + doc = getattr(self, 'do_' + cmd).__doc__ + if doc: + help = str(doc) + else: + help = "*** Undocumented Function ***\n" + except AttributeError: + help = "*** Undocumented Function ***\n" + + print "{:<30} {:<30}".format(cmd + " - ", help) + + + # aliasing + do_exit = do_EOF = do_q = do_quit + +def setParserOptions (): + parser = argparse.ArgumentParser(prog="trex_console.py") + + parser.add_argument("-s", "--server", help = "T-Rex Server [default is localhost]", + default = "localhost", + type = str) + + parser.add_argument("-p", "--port", help = "T-Rex Server Port [default is 5050]\n", + default = 5050, + type = int) + + return parser + +def main (): + parser = setParserOptions() + options = parser.parse_args(sys.argv[1:]) + + # RPC client + rpc_client = JsonRpcClient(options.server, options.port) + + # console + try: + console = TrexConsole(rpc_client) + console.cmdloop() + except KeyboardInterrupt as e: + print "\n\n*** Caught Ctrl + C... Exiting...\n\n" + return + +if __name__ == '__main__': + main() + diff --git a/scripts/automation/trex_control_plane/console/trex_root_path.py b/scripts/automation/trex_control_plane/console/trex_root_path.py new file mode 100644 index 00000000..de4ec03b --- /dev/null +++ b/scripts/automation/trex_control_plane/console/trex_root_path.py @@ -0,0 +1,15 @@ +#!/router/bin/python + +import os +import sys + +def add_root_to_path (): + """adds trex_control_plane root dir to script path, up to `depth` parent dirs""" + root_dirname = 'trex_control_plane' + file_path = os.path.dirname(os.path.realpath(__file__)) + + components = file_path.split(os.sep) + sys.path.append( str.join(os.sep, components[:components.index(root_dirname)+1]) ) + return + +add_root_to_path() diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py new file mode 100644 index 00000000..8ee669b5 --- /dev/null +++ b/scripts/automation/trex_control_plane/console/trex_status.py @@ -0,0 +1,212 @@ +from time import sleep + +import os + +import curses +from curses import panel +import random +import collections +import operator +import datetime + +g_curses_active = False + +# +def percentage (a, total): + x = int ((float(a) / total) * 100) + return str(x) + "%" + +# panel object +class TrexStatusPanel(): + def __init__ (self, h, l, y, x, headline): + self.h = h + self.l = l + self.y = y + self.x = x + self.headline = headline + + self.win = curses.newwin(h, l, y, x) + self.win.erase() + self.win.box() + + self.win.addstr(1, 2, headline, curses.A_UNDERLINE) + self.win.refresh() + + panel.new_panel(self.win) + self.panel = panel.new_panel(self.win) + self.panel.top() + + def clear (self): + self.win.erase() + self.win.box() + self.win.addstr(1, 2, self.headline, curses.A_UNDERLINE) + + def getwin (self): + return self.win + +def float_to_human_readable (size, suffix = "bps"): + for unit in ['','K','M','G']: + if abs(size) < 1024.0: + return "%3.1f %s%s" % (size, unit, suffix) + size /= 1024.0 + return "NaN" + +# status object +class TrexStatus(): + def __init__ (self, stdscr, rpc_client): + self.stdscr = stdscr + self.log = [] + self.rpc_client = rpc_client + + self.get_server_info() + + def get_server_info (self): + rc, msg = self.rpc_client.get_rpc_server_status() + + if rc: + self.server_status = msg + else: + self.server_status = None + + def add_log_event (self, msg): + self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) + + def add_panel (self, h, l, y, x, headline): + win = curses.newwin(h, l, y, x) + win.erase() + win.box() + + win.addstr(1, 2, headline) + win.refresh() + + panel.new_panel(win) + panel1 = panel.new_panel(win) + panel1.top() + + return win, panel1 + + # static info panel + def update_info (self): + if self.server_status == None: + return + + self.info_panel.clear() + + connection_details = self.rpc_client.get_connection_details() + + self.info_panel.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:", connection_details['server'] + ":" + str(connection_details['port']))) + self.info_panel.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.server_status["general"]["version"])) + self.info_panel.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:", + self.server_status["general"]["build_date"] + " @ " + self.server_status["general"]["build_time"] + " by " + self.server_status["general"]["version_user"])) + + self.info_panel.getwin().addstr(6, 2, "{:<30} {:30}".format("Server Uptime:", self.server_status["general"]["uptime"])) + + # general stats + def update_general (self, gen_stats): + pass + + # control panel + def update_control (self): + self.control_panel.clear() + + self.control_panel.getwin().addstr(1, 2, "'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit") + + index = 3 + + cut = len(self.log) - 4 + if cut < 0: + cut = 0 + + for l in self.log[cut:]: + self.control_panel.getwin().addstr(index, 2, l) + index += 1 + + def generate_layout (self): + self.max_y = self.stdscr.getmaxyx()[0] + self.max_x = self.stdscr.getmaxyx()[1] + + # create cls panel + self.main_panel = TrexStatusPanel(int(self.max_y * 0.8), self.max_x / 2, 0,0, "Trex Activity:") + + self.general_panel = TrexStatusPanel(int(self.max_y * 0.6), self.max_x / 2, 0, self.max_x /2, "General Statistics:") + + self.info_panel = TrexStatusPanel(int(self.max_y * 0.2), self.max_x / 2, int(self.max_y * 0.6), self.max_x /2, "Server Info:") + + self.control_panel = TrexStatusPanel(int(self.max_y * 0.2), self.max_x , int(self.max_y * 0.8), 0, "") + + panel.update_panels(); self.stdscr.refresh() + + def wait_for_key_input (self): + ch = self.stdscr.getch() + + if (ch != curses.ERR): + # stop/start status + if (ch == ord('f')): + self.update_active = not self.update_active + self.add_log_event("Update continued" if self.update_active else "Update stopped") + + elif (ch == ord('p')): + self.add_log_event("Pinging RPC server") + rc, msg = self.rpc_client.ping_rpc_server() + if rc: + self.add_log_event("Server replied: '{0}'".format(msg)) + else: + self.add_log_event("Failed to get reply") + + # c - clear stats + elif (ch == ord('c')): + self.add_log_event("Statistics cleared") + + elif (ch == ord('q')): + return False + else: + self.add_log_event("Unknown key pressed {0}".format("'" + chr(ch) + "'" if chr(ch).isalpha() else "")) + + return True + + # main run entry point + def run (self): + try: + curses.curs_set(0) + except: + pass + + curses.use_default_colors() + self.stdscr.nodelay(1) + curses.nonl() + curses.noecho() + + self.generate_layout() + + self.update_active = True + while (True): + + rc = self.wait_for_key_input() + if not rc: + break + + self.update_control() + self.update_info() + + panel.update_panels(); + self.stdscr.refresh() + sleep(0.1) + + +def show_trex_status_internal (stdscr, rpc_client): + trex_status = TrexStatus(stdscr, rpc_client) + trex_status.run() + +def show_trex_status (rpc_client): + + try: + curses.wrapper(show_trex_status_internal, rpc_client) + except KeyboardInterrupt: + curses.endwin() + +def cleanup (): + try: + curses.endwin() + except: + pass + diff --git a/scripts/automation/trex_control_plane/server/outer_packages.py b/scripts/automation/trex_control_plane/server/outer_packages.py index 730934bf..f07ed59a 100755 --- a/scripts/automation/trex_control_plane/server/outer_packages.py +++ b/scripts/automation/trex_control_plane/server/outer_packages.py @@ -12,7 +12,6 @@ PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, os.pardir, os.pard SERVER_MODULES = ['enum34-1.0.4', 'jsonrpclib-pelix-0.2.5', 'zmq', - 'pyzmq-14.7.0', 'python-daemon-2.0.5', 'lockfile-0.10.2', 'termstyle' diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py index 28e154ee..7a278af8 100755 --- a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py +++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py @@ -13,25 +13,23 @@ from common.trex_status_e import TRexStatus CCustomLogger.setup_custom_logger('TRexServer') logger = logging.getLogger('TRexServer') + class ZmqMonitorSession(threading.Thread): def __init__(self, trexObj , zmq_port): super(ZmqMonitorSession, self).__init__() self.stoprequest = threading.Event() -# self.terminateFlag = False self.first_dump = True self.zmq_port = zmq_port - self.zmq_publisher = "tcp://localhost:{port}".format( port = self.zmq_port ) -# self.context = zmq.Context() -# self.socket = self.context.socket(zmq.SUB) + self.zmq_publisher = "tcp://localhost:{port}".format(port=self.zmq_port) self.trexObj = trexObj self.expect_trex = self.trexObj.expect_trex # used to signal if T-Rex is expected to run and if data should be considered self.decoder = JSONDecoder() logger.info("ZMQ monitor initialization finished") - def run (self): + def run(self): self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) - logger.info("ZMQ monitor started listening @ {pub}".format( pub = self.zmq_publisher ) ) + logger.info("ZMQ monitor started listening @ {pub}".format(pub=self.zmq_publisher)) self.socket.connect(self.zmq_publisher) self.socket.setsockopt(zmq.SUBSCRIBE, '') @@ -46,10 +44,10 @@ class ZmqMonitorSession(threading.Thread): # allow this exception since it comes from ZMQ monitor termination pass else: - logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex = e)) + logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex=e)) raise - def join (self, timeout = None): + def join(self, timeout=None): self.stoprequest.set() logger.debug("Handling termination of ZMQ monitor thread") self.socket.close() @@ -57,15 +55,15 @@ class ZmqMonitorSession(threading.Thread): logger.info("ZMQ monitor resources has been freed.") super(ZmqMonitorSession, self).join(timeout) - def parse_and_update_zmq_dump (self, zmq_dump): + def parse_and_update_zmq_dump(self, zmq_dump): try: dict_obj = self.decoder.decode(zmq_dump) except ValueError: - logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump = zmq_dump)) + logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump)) dict_obj = None # add to trex_obj zmq latest dump, based on its 'name' header - if dict_obj is not None and dict_obj!={}: + if dict_obj is not None and dict_obj != {}: self.trexObj.zmq_dump[dict_obj['name']] = dict_obj if self.first_dump: # change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully -- cgit 1.2.3-korg