summaryrefslogtreecommitdiffstats
path: root/scripts
diff options
context:
space:
mode:
authorDan Klein <danklei@cisco.com>2015-08-24 17:28:17 +0300
committerDan Klein <danklei@cisco.com>2015-08-24 17:28:17 +0300
commit7d3be8c612e295820649779335288c197b80ccb2 (patch)
tree78e9636bc8780dedc919c30378a621f425e1cbfc /scripts
parentdab741a80699f86e86c91718872a052cca9bbb25 (diff)
Changes location of console and fixed dependencies
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/general_utils.py27
-rw-r--r--scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py186
-rw-r--r--scripts/automation/trex_control_plane/client_utils/outer_packages.py30
-rw-r--r--scripts/automation/trex_control_plane/console/trex_console.py252
-rw-r--r--scripts/automation/trex_control_plane/console/trex_root_path.py15
-rw-r--r--scripts/automation/trex_control_plane/console/trex_status.py212
-rwxr-xr-xscripts/automation/trex_control_plane/server/outer_packages.py1
-rwxr-xr-xscripts/automation/trex_control_plane/server/zmq_monitor_thread.py20
8 files changed, 729 insertions, 14 deletions
diff --git a/scripts/automation/trex_control_plane/client_utils/general_utils.py b/scripts/automation/trex_control_plane/client_utils/general_utils.py
index 5544eabc..b5912628 100755
--- a/scripts/automation/trex_control_plane/client_utils/general_utils.py
+++ b/scripts/automation/trex_control_plane/client_utils/general_utils.py
@@ -1,6 +1,9 @@
#!/router/bin/python
-import sys,site
+import sys
+import site
+import string
+import random
import os
try:
@@ -50,7 +53,27 @@ def find_path_to_pardir (pardir, base_path = os.getcwd() ):
"""
components = base_path.split(os.sep)
return str.join(os.sep, components[:components.index(pardir)+1])
-
+
+
+def random_id_gen(length=8):
+ """
+ A generator for creating a random chars id of specific length
+
+ :parameters:
+ length : int
+ the desired length of the generated id
+
+ default: 8
+
+ :return:
+ a random id with each next() request.
+ """
+ id_chars = string.ascii_lowercase + string.digits
+ while True:
+ return_id = ''
+ for i in range(length):
+ return_id += random.choice(id_chars)
+ yield return_id
if __name__ == "__main__":
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
new file mode 100644
index 00000000..1631c494
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -0,0 +1,186 @@
+#!/router/bin/python
+
+import outer_packages
+import zmq
+import json
+import general_utils
+from time import sleep
+
+class JsonRpcClient(object):
+
+ def __init__ (self, default_server, default_port):
+ self.verbose = False
+ self.connected = False
+
+ # default values
+ self.port = default_port
+ self.server = default_server
+ self.id_gen = general_utils.random_id_gen()
+
+ def get_connection_details (self):
+ rc = {}
+ rc['server'] = self.server
+ rc['port'] = self.port
+
+ return rc
+
+ def pretty_json (self, json_str):
+ return json.dumps(json.loads(json_str), indent = 4, separators=(',', ': '), sort_keys = True)
+
+ def verbose_msg (self, msg):
+ if not self.verbose:
+ return
+
+ print "[verbose] " + msg
+
+
+ def create_jsonrpc_v2 (self, method_name, params = {}, id = None):
+ msg = {}
+ msg["jsonrpc"] = "2.0"
+ msg["method"] = method_name
+
+ msg["params"] = params
+
+ msg["id"] = id
+
+ return json.dumps(msg)
+
+ def invoke_rpc_method (self, method_name, params = {}, block = False):
+ rc, msg = self._invoke_rpc_method(method_name, params, block)
+ if not rc:
+ self.disconnect()
+
+ return rc, msg
+
+ def _invoke_rpc_method (self, method_name, params = {}, block = False):
+ if not self.connected:
+ return False, "Not connected to server"
+
+ id = self.id_gen.next()
+ msg = self.create_jsonrpc_v2(method_name, params, id = id)
+
+ self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
+
+ if block:
+ self.socket.send(msg)
+ else:
+ try:
+ self.socket.send(msg, flags = zmq.NOBLOCK)
+ except zmq.error.ZMQError as e:
+ return False, "Failed To Get Send Message"
+
+ got_response = False
+
+ if block:
+ response = self.socket.recv()
+ got_response = True
+ else:
+ for i in xrange(0 ,10):
+ try:
+ response = self.socket.recv(flags = zmq.NOBLOCK)
+ got_response = True
+ break
+ except zmq.Again:
+ sleep(0.2)
+
+ if not got_response:
+ return False, "Failed To Get Server Response"
+
+ self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
+
+ # decode
+ response_json = json.loads(response)
+
+ if (response_json.get("jsonrpc") != "2.0"):
+ return False, "Malfromed Response ({0})".format(str(response))
+
+ if (response_json.get("id") != id):
+ return False, "Server Replied With Bad ID ({0})".format(str(response))
+
+ # error reported by server
+ if ("error" in response_json):
+ return True, response_json["error"]["message"]
+
+ # if no error there should be a result
+ if ("result" not in response_json):
+ return False, "Malfromed Response ({0})".format(str(response))
+
+ return True, response_json["result"]
+
+
+ def ping_rpc_server(self):
+
+ return self.invoke_rpc_method("ping", block = False)
+
+ def get_rpc_server_status (self):
+ return self.invoke_rpc_method("get_status")
+
+ def query_rpc_server(self):
+ return self.invoke_rpc_method("get_reg_cmds")
+
+
+ def set_verbose(self, mode):
+ self.verbose = mode
+
+ def disconnect (self):
+ if self.connected:
+ self.socket.close(linger = 0)
+ self.context.destroy(linger = 0)
+ self.connected = False
+ return True, ""
+ else:
+ return False, "Not connected to server"
+
+ def connect(self, server = None, port = None):
+ if self.connected:
+ self.disconnect()
+
+ self.context = zmq.Context()
+
+ self.server = (server if server else self.server)
+ self.port = (port if port else self.port)
+
+ # Socket to talk to server
+ self.transport = "tcp://{0}:{1}".format(self.server, self.port)
+
+ print "\nConnecting To RPC Server On {0}".format(self.transport)
+
+ self.socket = self.context.socket(zmq.REQ)
+ try:
+ self.socket.connect(self.transport)
+ except zmq.error.ZMQError as e:
+ return False, "ZMQ Error: Bad server or port name: " + str(e)
+
+
+ self.connected = True
+
+ # ping the server
+ rc, err = self.ping_rpc_server()
+ if not rc:
+ self.disconnect()
+ return rc, err
+
+ return True, ""
+
+
+ def reconnect(self):
+ # connect using current values
+ return self.connect()
+
+ if not self.connected:
+ return False, "Not connected to server"
+
+ # reconnect
+ return self.connect(self.server, self.port)
+
+
+ def is_connected(self):
+ return self.connected
+
+
+ def __del__(self):
+ print "Shutting down RPC client\n"
+ self.context.destroy(linger=0)
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/trex_control_plane/client_utils/outer_packages.py b/scripts/automation/trex_control_plane/client_utils/outer_packages.py
new file mode 100644
index 00000000..53cce991
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client_utils/outer_packages.py
@@ -0,0 +1,30 @@
+#!/router/bin/python
+
+import sys
+import site
+import os
+
+CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
+ROOT_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir)) # path to trex_control_plane directory
+PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, os.pardir, os.pardir,
+ os.pardir, 'external_libs', 'python'))
+
+CLIENT_UTILS_MODULES = ['zmq']
+
+
+def import_client_utils_modules():
+ # must be in a higher priority
+ sys.path.insert(0, PATH_TO_PYTHON_LIB)
+ sys.path.append(ROOT_PATH)
+ import_module_list(CLIENT_UTILS_MODULES)
+
+
+def import_module_list(modules_list):
+ assert(isinstance(modules_list, list))
+ for p in modules_list:
+ full_path = os.path.join(PATH_TO_PYTHON_LIB, p)
+ fix_path = os.path.normcase(full_path)
+ site.addsitedir(full_path)
+
+import_client_utils_modules()
+
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
new file mode 100644
index 00000000..6514a51c
--- /dev/null
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -0,0 +1,252 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import cmd
+import json
+import ast
+import argparse
+import sys
+import trex_root_path
+from client_utils.jsonrpc_client import JsonRpcClient
+import trex_status
+
+class TrexConsole(cmd.Cmd):
+ """Trex Console"""
+
+ def __init__(self, rpc_client):
+ cmd.Cmd.__init__(self)
+
+ self.rpc_client = rpc_client
+
+ self.do_connect("")
+
+ self.intro = "\n-=TRex Console V1.0=-\n"
+ self.intro += "\nType 'help' or '?' for supported actions\n"
+
+ self.verbose = False
+
+ self.postcmd(False, "")
+
+
+ # a cool hack - i stole this function and added space
+ def completenames(self, text, *ignored):
+ dotext = 'do_'+text
+ return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)]
+
+ # set verbose on / off
+ def do_verbose (self, line):
+ '''shows or set verbose mode\n'''
+ if line == "":
+ print "\nverbose is " + ("on\n" if self.verbose else "off\n")
+
+ elif line == "on":
+ self.verbose = True
+ self.rpc_client.set_verbose(True)
+ print "\nverbose set to on\n"
+
+ elif line == "off":
+ self.verbose = False
+ self.rpc_client.set_verbose(False)
+ print "\nverbose set to off\n"
+
+ else:
+ print "\nplease specify 'on' or 'off'\n"
+
+ # query the server for registered commands
+ def do_query_server(self, line):
+ '''query the RPC server for supported remote commands\n'''
+
+ rc, msg = self.rpc_client.query_rpc_server()
+ if not rc:
+ print "\n*** " + msg + "\n"
+ return
+
+ print "\nRPC server supports the following commands: \n\n"
+ for func in msg:
+ if func:
+ print func
+ print "\n"
+
+ def do_ping (self, line):
+ '''Pings the RPC server\n'''
+
+ print "\n-> Pinging RPC server"
+
+ rc, msg = self.rpc_client.ping_rpc_server()
+ if rc:
+ print "[SUCCESS]\n"
+ else:
+ print "\n*** " + msg + "\n"
+ return
+
+ def do_connect (self, line):
+ '''Connects to the server\n'''
+
+ if line == "":
+ rc, msg = self.rpc_client.connect()
+ else:
+ sp = line.split()
+ if (len(sp) != 2):
+ print "\n[usage] connect [server] [port] or without parameters\n"
+ return
+
+ rc, msg = self.rpc_client.connect(sp[0], sp[1])
+
+ if rc:
+ print "[SUCCESS]\n"
+ else:
+ print "\n*** " + msg + "\n"
+ return
+
+ rc, msg = self.rpc_client.query_rpc_server()
+
+ if rc:
+ self.supported_rpc = [str(x) for x in msg if x]
+
+ def do_rpc (self, line):
+ '''Launches a RPC on the server\n'''
+
+ if line == "":
+ print "\nUsage: [method name] [param dict as string]\n"
+ print "Example: rpc test_add {'x': 12, 'y': 17}\n"
+ return
+
+ sp = line.split(' ', 1)
+ method = sp[0]
+
+ params = None
+ bad_parse = False
+ if len(sp) > 1:
+
+ try:
+ params = ast.literal_eval(sp[1])
+ if not isinstance(params, dict):
+ bad_parse = True
+
+ except ValueError as e1:
+ bad_parse = True
+ except SyntaxError as e2:
+ bad_parse = True
+
+ if bad_parse:
+ print "\nValue should be a valid dict: '{0}'".format(sp[1])
+ print "\nUsage: [method name] [param dict as string]\n"
+ print "Example: rpc test_add {'x': 12, 'y': 17}\n"
+ return
+
+ rc, msg = self.rpc_client.invoke_rpc_method(method, params)
+ if rc:
+ print "\nServer Response:\n\n" + json.dumps(msg) + "\n"
+ else:
+ print "\n*** " + msg + "\n"
+ #print "Please try 'reconnect' to reconnect to server"
+
+
+ def complete_rpc (self, text, line, begidx, endidx):
+ return [x for x in self.supported_rpc if x.startswith(text)]
+
+ def do_status (self, line):
+ '''Shows a graphical console\n'''
+
+ self.do_verbose('off')
+ trex_status.show_trex_status(self.rpc_client)
+
+ def do_quit(self, line):
+ '''exit the client\n'''
+ return True
+
+ def do_disconnect (self, line):
+ '''Disconnect from the server\n'''
+ if not self.rpc_client.is_connected():
+ print "Not connected to server\n"
+ return
+
+ rc, msg = self.rpc_client.disconnect()
+ if rc:
+ print "[SUCCESS]\n"
+ else:
+ print msg + "\n"
+
+ def postcmd(self, stop, line):
+ if self.rpc_client.is_connected():
+ self.prompt = "TRex > "
+ else:
+ self.supported_rpc = None
+ self.prompt = "TRex (offline) > "
+
+ return stop
+
+ def default(self, line):
+ print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line)
+
+ def do_help (self, line):
+ '''Shows This Help Screen\n'''
+ if line:
+ try:
+ func = getattr(self, 'help_' + line)
+ except AttributeError:
+ try:
+ doc = getattr(self, 'do_' + line).__doc__
+ if doc:
+ self.stdout.write("%s\n"%str(doc))
+ return
+ except AttributeError:
+ pass
+ self.stdout.write("%s\n"%str(self.nohelp % (line,)))
+ return
+ func()
+ return
+
+ print "\nSupported Console Commands:"
+ print "----------------------------\n"
+
+ cmds = [x[3:] for x in self.get_names() if x.startswith("do_")]
+ for cmd in cmds:
+ if cmd == "EOF":
+ continue
+
+ try:
+ doc = getattr(self, 'do_' + cmd).__doc__
+ if doc:
+ help = str(doc)
+ else:
+ help = "*** Undocumented Function ***\n"
+ except AttributeError:
+ help = "*** Undocumented Function ***\n"
+
+ print "{:<30} {:<30}".format(cmd + " - ", help)
+
+
+ # aliasing
+ do_exit = do_EOF = do_q = do_quit
+
+def setParserOptions ():
+ parser = argparse.ArgumentParser(prog="trex_console.py")
+
+ parser.add_argument("-s", "--server", help = "T-Rex Server [default is localhost]",
+ default = "localhost",
+ type = str)
+
+ parser.add_argument("-p", "--port", help = "T-Rex Server Port [default is 5050]\n",
+ default = 5050,
+ type = int)
+
+ return parser
+
+def main ():
+ parser = setParserOptions()
+ options = parser.parse_args(sys.argv[1:])
+
+ # RPC client
+ rpc_client = JsonRpcClient(options.server, options.port)
+
+ # console
+ try:
+ console = TrexConsole(rpc_client)
+ console.cmdloop()
+ except KeyboardInterrupt as e:
+ print "\n\n*** Caught Ctrl + C... Exiting...\n\n"
+ return
+
+if __name__ == '__main__':
+ main()
+
diff --git a/scripts/automation/trex_control_plane/console/trex_root_path.py b/scripts/automation/trex_control_plane/console/trex_root_path.py
new file mode 100644
index 00000000..de4ec03b
--- /dev/null
+++ b/scripts/automation/trex_control_plane/console/trex_root_path.py
@@ -0,0 +1,15 @@
+#!/router/bin/python
+
+import os
+import sys
+
+def add_root_to_path ():
+ """adds trex_control_plane root dir to script path, up to `depth` parent dirs"""
+ root_dirname = 'trex_control_plane'
+ file_path = os.path.dirname(os.path.realpath(__file__))
+
+ components = file_path.split(os.sep)
+ sys.path.append( str.join(os.sep, components[:components.index(root_dirname)+1]) )
+ return
+
+add_root_to_path()
diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py
new file mode 100644
index 00000000..8ee669b5
--- /dev/null
+++ b/scripts/automation/trex_control_plane/console/trex_status.py
@@ -0,0 +1,212 @@
+from time import sleep
+
+import os
+
+import curses
+from curses import panel
+import random
+import collections
+import operator
+import datetime
+
+g_curses_active = False
+
+#
+def percentage (a, total):
+ x = int ((float(a) / total) * 100)
+ return str(x) + "%"
+
+# panel object
+class TrexStatusPanel():
+ def __init__ (self, h, l, y, x, headline):
+ self.h = h
+ self.l = l
+ self.y = y
+ self.x = x
+ self.headline = headline
+
+ self.win = curses.newwin(h, l, y, x)
+ self.win.erase()
+ self.win.box()
+
+ self.win.addstr(1, 2, headline, curses.A_UNDERLINE)
+ self.win.refresh()
+
+ panel.new_panel(self.win)
+ self.panel = panel.new_panel(self.win)
+ self.panel.top()
+
+ def clear (self):
+ self.win.erase()
+ self.win.box()
+ self.win.addstr(1, 2, self.headline, curses.A_UNDERLINE)
+
+ def getwin (self):
+ return self.win
+
+def float_to_human_readable (size, suffix = "bps"):
+ for unit in ['','K','M','G']:
+ if abs(size) < 1024.0:
+ return "%3.1f %s%s" % (size, unit, suffix)
+ size /= 1024.0
+ return "NaN"
+
+# status object
+class TrexStatus():
+ def __init__ (self, stdscr, rpc_client):
+ self.stdscr = stdscr
+ self.log = []
+ self.rpc_client = rpc_client
+
+ self.get_server_info()
+
+ def get_server_info (self):
+ rc, msg = self.rpc_client.get_rpc_server_status()
+
+ if rc:
+ self.server_status = msg
+ else:
+ self.server_status = None
+
+ def add_log_event (self, msg):
+ self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg))
+
+ def add_panel (self, h, l, y, x, headline):
+ win = curses.newwin(h, l, y, x)
+ win.erase()
+ win.box()
+
+ win.addstr(1, 2, headline)
+ win.refresh()
+
+ panel.new_panel(win)
+ panel1 = panel.new_panel(win)
+ panel1.top()
+
+ return win, panel1
+
+ # static info panel
+ def update_info (self):
+ if self.server_status == None:
+ return
+
+ self.info_panel.clear()
+
+ connection_details = self.rpc_client.get_connection_details()
+
+ self.info_panel.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:", connection_details['server'] + ":" + str(connection_details['port'])))
+ self.info_panel.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.server_status["general"]["version"]))
+ self.info_panel.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:",
+ self.server_status["general"]["build_date"] + " @ " + self.server_status["general"]["build_time"] + " by " + self.server_status["general"]["version_user"]))
+
+ self.info_panel.getwin().addstr(6, 2, "{:<30} {:30}".format("Server Uptime:", self.server_status["general"]["uptime"]))
+
+ # general stats
+ def update_general (self, gen_stats):
+ pass
+
+ # control panel
+ def update_control (self):
+ self.control_panel.clear()
+
+ self.control_panel.getwin().addstr(1, 2, "'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit")
+
+ index = 3
+
+ cut = len(self.log) - 4
+ if cut < 0:
+ cut = 0
+
+ for l in self.log[cut:]:
+ self.control_panel.getwin().addstr(index, 2, l)
+ index += 1
+
+ def generate_layout (self):
+ self.max_y = self.stdscr.getmaxyx()[0]
+ self.max_x = self.stdscr.getmaxyx()[1]
+
+ # create cls panel
+ self.main_panel = TrexStatusPanel(int(self.max_y * 0.8), self.max_x / 2, 0,0, "Trex Activity:")
+
+ self.general_panel = TrexStatusPanel(int(self.max_y * 0.6), self.max_x / 2, 0, self.max_x /2, "General Statistics:")
+
+ self.info_panel = TrexStatusPanel(int(self.max_y * 0.2), self.max_x / 2, int(self.max_y * 0.6), self.max_x /2, "Server Info:")
+
+ self.control_panel = TrexStatusPanel(int(self.max_y * 0.2), self.max_x , int(self.max_y * 0.8), 0, "")
+
+ panel.update_panels(); self.stdscr.refresh()
+
+ def wait_for_key_input (self):
+ ch = self.stdscr.getch()
+
+ if (ch != curses.ERR):
+ # stop/start status
+ if (ch == ord('f')):
+ self.update_active = not self.update_active
+ self.add_log_event("Update continued" if self.update_active else "Update stopped")
+
+ elif (ch == ord('p')):
+ self.add_log_event("Pinging RPC server")
+ rc, msg = self.rpc_client.ping_rpc_server()
+ if rc:
+ self.add_log_event("Server replied: '{0}'".format(msg))
+ else:
+ self.add_log_event("Failed to get reply")
+
+ # c - clear stats
+ elif (ch == ord('c')):
+ self.add_log_event("Statistics cleared")
+
+ elif (ch == ord('q')):
+ return False
+ else:
+ self.add_log_event("Unknown key pressed {0}".format("'" + chr(ch) + "'" if chr(ch).isalpha() else ""))
+
+ return True
+
+ # main run entry point
+ def run (self):
+ try:
+ curses.curs_set(0)
+ except:
+ pass
+
+ curses.use_default_colors()
+ self.stdscr.nodelay(1)
+ curses.nonl()
+ curses.noecho()
+
+ self.generate_layout()
+
+ self.update_active = True
+ while (True):
+
+ rc = self.wait_for_key_input()
+ if not rc:
+ break
+
+ self.update_control()
+ self.update_info()
+
+ panel.update_panels();
+ self.stdscr.refresh()
+ sleep(0.1)
+
+
+def show_trex_status_internal (stdscr, rpc_client):
+ trex_status = TrexStatus(stdscr, rpc_client)
+ trex_status.run()
+
+def show_trex_status (rpc_client):
+
+ try:
+ curses.wrapper(show_trex_status_internal, rpc_client)
+ except KeyboardInterrupt:
+ curses.endwin()
+
+def cleanup ():
+ try:
+ curses.endwin()
+ except:
+ pass
+
diff --git a/scripts/automation/trex_control_plane/server/outer_packages.py b/scripts/automation/trex_control_plane/server/outer_packages.py
index 730934bf..f07ed59a 100755
--- a/scripts/automation/trex_control_plane/server/outer_packages.py
+++ b/scripts/automation/trex_control_plane/server/outer_packages.py
@@ -12,7 +12,6 @@ PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, os.pardir, os.pard
SERVER_MODULES = ['enum34-1.0.4',
'jsonrpclib-pelix-0.2.5',
'zmq',
- 'pyzmq-14.7.0',
'python-daemon-2.0.5',
'lockfile-0.10.2',
'termstyle'
diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
index 28e154ee..7a278af8 100755
--- a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
+++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
@@ -13,25 +13,23 @@ from common.trex_status_e import TRexStatus
CCustomLogger.setup_custom_logger('TRexServer')
logger = logging.getLogger('TRexServer')
+
class ZmqMonitorSession(threading.Thread):
def __init__(self, trexObj , zmq_port):
super(ZmqMonitorSession, self).__init__()
self.stoprequest = threading.Event()
-# self.terminateFlag = False
self.first_dump = True
self.zmq_port = zmq_port
- self.zmq_publisher = "tcp://localhost:{port}".format( port = self.zmq_port )
-# self.context = zmq.Context()
-# self.socket = self.context.socket(zmq.SUB)
+ self.zmq_publisher = "tcp://localhost:{port}".format(port=self.zmq_port)
self.trexObj = trexObj
self.expect_trex = self.trexObj.expect_trex # used to signal if T-Rex is expected to run and if data should be considered
self.decoder = JSONDecoder()
logger.info("ZMQ monitor initialization finished")
- def run (self):
+ def run(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
- logger.info("ZMQ monitor started listening @ {pub}".format( pub = self.zmq_publisher ) )
+ logger.info("ZMQ monitor started listening @ {pub}".format(pub=self.zmq_publisher))
self.socket.connect(self.zmq_publisher)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
@@ -46,10 +44,10 @@ class ZmqMonitorSession(threading.Thread):
# allow this exception since it comes from ZMQ monitor termination
pass
else:
- logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex = e))
+ logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex=e))
raise
- def join (self, timeout = None):
+ def join(self, timeout=None):
self.stoprequest.set()
logger.debug("Handling termination of ZMQ monitor thread")
self.socket.close()
@@ -57,15 +55,15 @@ class ZmqMonitorSession(threading.Thread):
logger.info("ZMQ monitor resources has been freed.")
super(ZmqMonitorSession, self).join(timeout)
- def parse_and_update_zmq_dump (self, zmq_dump):
+ def parse_and_update_zmq_dump(self, zmq_dump):
try:
dict_obj = self.decoder.decode(zmq_dump)
except ValueError:
- logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump = zmq_dump))
+ logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump))
dict_obj = None
# add to trex_obj zmq latest dump, based on its 'name' header
- if dict_obj is not None and dict_obj!={}:
+ if dict_obj is not None and dict_obj != {}:
self.trexObj.zmq_dump[dict_obj['name']] = dict_obj
if self.first_dump:
# change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully