diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/server')
8 files changed, 1059 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/server/CCustomLogger.py b/scripts/automation/trex_control_plane/server/CCustomLogger.py new file mode 100755 index 00000000..ecf7d519 --- /dev/null +++ b/scripts/automation/trex_control_plane/server/CCustomLogger.py @@ -0,0 +1,100 @@ +
+import sys
+import os
+import logging
+
+
+def setup_custom_logger(name, log_path = None):
+ # first make sure path availabe
+# if log_path is None:
+# log_path = os.getcwd()+'/trex_log.log'
+# else:
+# directory = os.path.dirname(log_path)
+# if not os.path.exists(directory):
+# os.makedirs(directory)
+ logging.basicConfig(level = logging.INFO,
+ format = '%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s',
+ datefmt = '%m-%d %H:%M')
+# filename= log_path,
+# filemode= 'w')
+#
+# # define a Handler which writes INFO messages or higher to the sys.stderr
+# consoleLogger = logging.StreamHandler()
+# consoleLogger.setLevel(logging.ERROR)
+# # set a format which is simpler for console use
+# formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
+# # tell the handler to use this format
+# consoleLogger.setFormatter(formatter)
+#
+# # add the handler to the logger
+# logging.getLogger(name).addHandler(consoleLogger)
+
+def setup_daemon_logger (name, log_path = None):
+ # first make sure path availabe
+ logging.basicConfig(level = logging.INFO,
+ format = '%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s',
+ datefmt = '%m-%d %H:%M',
+ filename= log_path,
+ filemode= 'w')
+
+class CustomLogger(object):
+
+ def __init__(self, log_filename):
+ # Store the original stdout and stderr
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ self.stdout_fd = os.dup(sys.stdout.fileno())
+ self.devnull = os.open('/dev/null', os.O_WRONLY)
+ self.log_file = open(log_filename, 'w')
+ self.silenced = False
+ self.pending_log_file_prints = 0
+
+ # silence all prints from stdout
+ def silence(self):
+ os.dup2(self.devnull, sys.stdout.fileno())
+ self.silenced = True
+
+ # restore stdout status
+ def restore(self):
+ sys.stdout.flush()
+ sys.stderr.flush()
+ # Restore normal stdout
+ os.dup2(self.stdout_fd, sys.stdout.fileno())
+ self.silenced = False
+
+ #print a message to the log (both stdout / log file)
+ def log(self, text, force = False, newline = True):
+ self.log_file.write((text + "\n") if newline else text)
+ self.pending_log_file_prints += 1
+
+ if (self.pending_log_file_prints >= 10):
+ self.log_file.flush()
+ self.pending_log_file_prints = 0
+
+ self.console(text, force, newline)
+
+ # print a message to the console alone
+ def console(self, text, force = False, newline = True):
+ _text = (text + "\n") if newline else text
+ # if we are silenced and not forced - go home
+ if self.silenced and not force:
+ return
+
+ if self.silenced:
+ os.write(self.stdout_fd, _text)
+ else:
+ sys.stdout.write(_text)
+
+ sys.stdout.flush()
+
+ # flush
+ def flush(self):
+ sys.stdout.flush()
+ self.log_file.flush()
+
+ def __exit__(self, type, value, traceback):
+ sys.stdout.flush()
+ self.log_file.flush()
+ os.close(self.devnull)
+ os.close(self.log_file)
diff --git a/scripts/automation/trex_control_plane/server/extended_daemon_runner.py b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py new file mode 100755 index 00000000..07eedd9f --- /dev/null +++ b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py @@ -0,0 +1,144 @@ +#!/usr/bin/python
+
+import outer_packages
+import lockfile
+from daemon import runner,daemon
+from daemon.runner import *
+import os, sys
+from argparse import ArgumentParser
+from trex_server import trex_parser
+try:
+ from python_lib.termstyle import termstyle
+except ImportError:
+ import termstyle
+
+
+
+def daemonize_parser (parser_obj, action_funcs, help_menu):
+ """Update the regular process parser to deal with daemon process options"""
+ parser_obj.description += " (as a daemon process)"
+ parser_obj.usage = None
+ parser_obj.add_argument("action", choices = action_funcs,
+ action="store", help = help_menu )
+ return
+
+
+class ExtendedDaemonRunner(runner.DaemonRunner):
+ """ Controller for a callable running in a separate background process.
+
+ The first command-line argument is the action to take:
+
+ * 'start': Become a daemon and call `app.run()`.
+ * 'stop': Exit the daemon process specified in the PID file.
+ * 'restart': Stop, then start.
+
+ """
+
+ help_menu = """Specify action command to be applied on server.
+ (*) start : start the application in as a daemon process.
+ (*) show : prompt a updated status of daemon process (running/ not running).
+ (*) stop : exit the daemon process.
+ (*) restart : stop, then start again the application as daemon process
+ (*) start-live : start the application in live mode (no daemon process).
+ """
+
+ def __init__ (self, app, parser_obj):
+ """ Set up the parameters of a new runner.
+ THIS METHOD INTENTIONALLY DO NOT INVOKE SUPER __init__() METHOD
+
+ :param app: The application instance; see below.
+ :return: ``None``.
+
+ The `app` argument must have the following attributes:
+
+ * `stdin_path`, `stdout_path`, `stderr_path`: Filesystem paths
+ to open and replace the existing `sys.stdin`, `sys.stdout`,
+ `sys.stderr`.
+
+ * `pidfile_path`: Absolute filesystem path to a file that will
+ be used as the PID file for the daemon. If ``None``, no PID
+ file will be used.
+
+ * `pidfile_timeout`: Used as the default acquisition timeout
+ value supplied to the runner's PID lock file.
+
+ * `run`: Callable that will be invoked when the daemon is
+ started.
+
+ """
+ super(runner.DaemonRunner, self).__init__()
+ # update action_funcs to support more operations
+ self.update_action_funcs()
+
+ daemonize_parser(parser_obj, self.action_funcs, ExtendedDaemonRunner.help_menu)
+ args = parser_obj.parse_args()
+ self.action = unicode(args.action)
+
+ self.app = app
+ self.daemon_context = daemon.DaemonContext()
+ self.daemon_context.stdin = open(app.stdin_path, 'rt')
+ self.daemon_context.stdout = open(app.stdout_path, 'w+t')
+ self.daemon_context.stderr = open(
+ app.stderr_path, 'a+t', buffering=0)
+
+ self.pidfile = None
+ if app.pidfile_path is not None:
+ self.pidfile = make_pidlockfile(app.pidfile_path, app.pidfile_timeout)
+ self.daemon_context.pidfile = self.pidfile
+
+ # mask out all arguments that aren't relevant to main app script
+
+
+ def update_action_funcs (self):
+ self.action_funcs.update({u'start-live': self._start_live, u'show': self._show}) # add key (=action), value (=desired func)
+
+ @staticmethod
+ def _start_live (self):
+ self.app.run()
+
+ @staticmethod
+ def _show (self):
+ if self.pidfile.is_locked():
+ print termstyle.red("T-Rex server daemon is running")
+ else:
+ print termstyle.red("T-Rex server daemon is NOT running")
+
+ def do_action (self):
+ self.__prevent_duplicate_runs()
+ self.__prompt_init_msg()
+ try:
+ super(ExtendedDaemonRunner, self).do_action()
+ if self.action == 'stop':
+ self.__verify_termination()
+ except runner.DaemonRunnerStopFailureError:
+ if self.action == 'restart':
+ # error means server wasn't running in the first place- so start it!
+ self.action = 'start'
+ self.do_action()
+
+
+ def __prevent_duplicate_runs (self):
+ if self.action == 'start' and self.pidfile.is_locked():
+ print termstyle.green("Server daemon is already running")
+ exit(1)
+ elif self.action == 'stop' and not self.pidfile.is_locked():
+ print termstyle.green("Server daemon is not running")
+ exit(1)
+
+ def __prompt_init_msg (self):
+ if self.action == 'start':
+ print termstyle.green("Starting daemon server...")
+ elif self.action == 'stop':
+ print termstyle.green("Stopping daemon server...")
+
+ def __verify_termination (self):
+ pass
+# import time
+# while self.pidfile.is_locked():
+# time.sleep(2)
+# self._stop()
+#
+
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/trex_control_plane/server/outer_packages.py b/scripts/automation/trex_control_plane/server/outer_packages.py new file mode 100755 index 00000000..ab25ea68 --- /dev/null +++ b/scripts/automation/trex_control_plane/server/outer_packages.py @@ -0,0 +1,66 @@ +#!/router/bin/python + +import sys,site +import platform,os +import tarfile +import errno +import pwd + +CURRENT_PATH = os.path.dirname(os.path.realpath(__file__)) +ROOT_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir)) # path to trex_control_plane directory +PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, 'python_lib')) + +SERVER_MODULES = ['enum34-1.0.4', + # 'jsonrpclib-0.1.3', + 'jsonrpclib-pelix-0.2.5', + 'zmq', + 'python-daemon-2.0.5', + 'lockfile-0.10.2', + 'termstyle' + ] + +def extract_zmq_package (): + """make sure zmq package is available""" + + os.chdir(PATH_TO_PYTHON_LIB) + if not os.path.exists('zmq'): + if os.path.exists('zmq_fedora.tar.gz'): # make sure tar file is available for extraction + try: + tar = tarfile.open("zmq_fedora.tar.gz") + # finally, extract the tarfile locally + tar.extractall() + except OSError as err: + if err.errno == errno.EACCES: + # fall back. try extracting using currently logged in user + stat_info = os.stat(PATH_TO_PYTHON_LIB) + uid = stat_info.st_uid + logged_user = pwd.getpwuid(uid).pw_name + if logged_user != 'root': + try: + os.system("sudo -u {user} tar -zxvf zmq_fedora.tar.gz".format(user = logged_user)) + except: + raise OSError(13, 'Permission denied: Please make sure that logged user have sudo access and writing privileges to `python_lib` directory.') + else: + raise OSError(13, 'Permission denied: Please make sure that logged user have sudo access and writing privileges to `python_lib` directory.') + finally: + tar.close() + else: + raise IOError("File 'zmq_fedora.tar.gz' couldn't be located at python_lib directory.") + os.chdir(CURRENT_PATH) + +def import_server_modules (): + # must be in a higher priority + sys.path.insert(0, PATH_TO_PYTHON_LIB) + sys.path.append(ROOT_PATH) + extract_zmq_package() + import_module_list(SERVER_MODULES) + +def import_module_list (modules_list): + assert(isinstance(modules_list, list)) + for p in modules_list: + full_path = os.path.join(PATH_TO_PYTHON_LIB, p) + fix_path = os.path.normcase(full_path) + site.addsitedir(full_path) + + +import_server_modules() diff --git a/scripts/automation/trex_control_plane/server/trex_daemon_server b/scripts/automation/trex_control_plane/server/trex_daemon_server new file mode 100755 index 00000000..3494e303 --- /dev/null +++ b/scripts/automation/trex_control_plane/server/trex_daemon_server @@ -0,0 +1,25 @@ +#!/usr/bin/python + +import os +import sys + +core = 0 + +if '--core' in sys.argv: + try: + idx = sys.argv.index('--core') + core = int(sys.argv[idx + 1]) + if core > 31 or core < 0: + print "Error: please provide core argument between 0 to 31" + exit(-1) + del sys.argv[idx:idx+2] + except IndexError: + print "Error: please make sure core option provided with argument" + exit(-1) + except ValueError: + print "Error: please make sure core option provided with integer argument" + exit(-1) + +str_argv = ' '.join(sys.argv[1:]) +cmd = "taskset -c {core} python automation/trex_control_plane/server/trex_daemon_server.py {argv}".format(core = core, argv = str_argv) +os.system(cmd) diff --git a/scripts/automation/trex_control_plane/server/trex_daemon_server.py b/scripts/automation/trex_control_plane/server/trex_daemon_server.py new file mode 100755 index 00000000..5032423a --- /dev/null +++ b/scripts/automation/trex_control_plane/server/trex_daemon_server.py @@ -0,0 +1,87 @@ +#!/usr/bin/python + +import outer_packages +import daemon +from trex_server import do_main_program, trex_parser +import CCustomLogger + +import logging +import time +import sys +import os, errno +import grp +import signal +from daemon import runner +from extended_daemon_runner import ExtendedDaemonRunner +import lockfile +import errno + +class TRexServerApp(object): + def __init__(self): + TRexServerApp.create_working_dirs() + self.stdin_path = '/dev/null' + self.stdout_path = '/dev/tty' # All standard prints will come up from this source. + self.stderr_path = "/var/log/trex/trex_daemon_server.log" # All log messages will come up from this source + self.pidfile_path = '/var/run/trex/trex_daemon_server.pid' + self.pidfile_timeout = 5 # timeout in seconds + + def run(self): + do_main_program() + + + @staticmethod + def create_working_dirs(): + if not os.path.exists('/var/log/trex'): + os.mkdir('/var/log/trex') + if not os.path.exists('/var/run/trex'): + os.mkdir('/var/run/trex') + + + +def main (): + + trex_app = TRexServerApp() + + # setup the logger + default_log_path = '/var/log/trex/trex_daemon_server.log' + + try: + CCustomLogger.setup_daemon_logger('TRexServer', default_log_path) + logger = logging.getLogger('TRexServer') + logger.setLevel(logging.INFO) + formatter = logging.Formatter("%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s") + handler = logging.FileHandler("/var/log/trex/trex_daemon_server.log") + logger.addHandler(handler) + except EnvironmentError, e: + if e.errno == errno.EACCES: # catching permission denied error + print "Launching user must have sudo privileges in order to run T-Rex daemon.\nTerminating daemon process." + exit(-1) + + try: + daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser) + except IOError as err: + # catch 'tty' error when launching server from remote location + if err.errno == errno.ENXIO: + trex_app.stdout_path = "/dev/null" + daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser) + else: + raise + + #This ensures that the logger file handle does not get closed during daemonization + daemon_runner.daemon_context.files_preserve=[handler.stream] + + try: + if not set(['start', 'stop']).isdisjoint(set(sys.argv)): + print "Logs are saved at: {log_path}".format( log_path = default_log_path ) + daemon_runner.do_action() + + except lockfile.LockTimeout as inst: + logger.error(inst) + print inst + print """ + Please try again once the timeout has been reached. + If this error continues, consider killing the process manually and restart the daemon.""" + + +if __name__ == "__main__": + main() diff --git a/scripts/automation/trex_control_plane/server/trex_launch_thread.py b/scripts/automation/trex_control_plane/server/trex_launch_thread.py new file mode 100755 index 00000000..b4be60a9 --- /dev/null +++ b/scripts/automation/trex_control_plane/server/trex_launch_thread.py @@ -0,0 +1,92 @@ +#!/router/bin/python
+
+
+import os
+import signal
+import socket
+from common.trex_status_e import TRexStatus
+import subprocess
+import time
+import threading
+import logging
+import CCustomLogger
+
+# setup the logger
+CCustomLogger.setup_custom_logger('TRexServer')
+logger = logging.getLogger('TRexServer')
+
+
+class AsynchronousTRexSession(threading.Thread):
+ def __init__(self, trexObj , trex_launch_path, trex_cmd_data):
+ super(AsynchronousTRexSession, self).__init__()
+ self.stoprequest = threading.Event()
+ self.terminateFlag = False
+ self.launch_path = trex_launch_path
+ self.cmd, self.export_path, self.duration = trex_cmd_data
+ self.session = None
+ self.trexObj = trexObj
+ self.time_stamps = {'start' : None, 'run_time' : None}
+ self.trexObj.zmq_dump = {}
+
+ def run (self):
+
+ with open(os.devnull, 'w') as DEVNULL:
+ self.time_stamps['start'] = self.time_stamps['run_time'] = time.time()
+ self.session = subprocess.Popen("exec "+self.cmd, cwd = self.launch_path, shell=True, stdin = DEVNULL, stderr = subprocess.PIPE, preexec_fn=os.setsid)
+ logger.info("T-Rex session initialized successfully, Parent process pid is {pid}.".format( pid = self.session.pid ))
+ while self.session.poll() is None: # subprocess is NOT finished
+ time.sleep(0.5)
+ if self.stoprequest.is_set():
+ logger.debug("Abort request received by handling thread. Terminating T-Rex session." )
+ os.killpg(self.session.pid, signal.SIGUSR1)
+ self.trexObj.set_status(TRexStatus.Idle)
+ self.trexObj.set_verbose_status("T-Rex is Idle")
+ break
+
+ self.time_stamps['run_time'] = time.time() - self.time_stamps['start']
+
+ try:
+ if self.time_stamps['run_time'] < 5:
+ logger.error("T-Rex run failed due to wrong input parameters, or due to reachability issues.")
+ self.trexObj.set_verbose_status("T-Rex run failed due to wrong input parameters, or due to reachability issues.\n\nT-Rex command: {cmd}\n\nRun output:\n{output}".format(
+ cmd = self.cmd, output = self.load_trex_output(self.export_path)))
+ self.trexObj.errcode = -11
+ elif (self.session.returncode is not None and self.session.returncode < 0) or ( (self.time_stamps['run_time'] < self.duration) and (not self.stoprequest.is_set()) ):
+ if (self.session.returncode is not None and self.session.returncode < 0):
+ logger.debug("Failed T-Rex run due to session return code ({ret_code})".format( ret_code = self.session.returncode ) )
+ elif ( (self.time_stamps['run_time'] < self.duration) and not self.stoprequest.is_set()):
+ logger.debug("Failed T-Rex run due to running time ({runtime}) combined with no-stopping request.".format( runtime = self.time_stamps['run_time'] ) )
+
+ logger.warning("T-Rex run was terminated unexpectedly by outer process or by the hosting OS")
+ self.trexObj.set_verbose_status("T-Rex run was terminated unexpectedly by outer process or by the hosting OS.\n\nRun output:\n{output}".format(
+ output = self.load_trex_output(self.export_path)))
+ self.trexObj.errcode = -15
+ else:
+ logger.info("T-Rex run session finished.")
+ self.trexObj.set_verbose_status('T-Rex finished.')
+ self.trexObj.errcode = None
+
+ finally:
+ self.trexObj.set_status(TRexStatus.Idle)
+ logger.info("TRex running state changed to 'Idle'.")
+ self.trexObj.expect_trex.clear()
+ logger.debug("Finished handling a single run of T-Rex.")
+ self.trexObj.zmq_dump = None
+
+ def join (self, timeout = None):
+ self.stoprequest.set()
+ super(AsynchronousTRexSession, self).join(timeout)
+
+ def load_trex_output (self, export_path):
+ output = None
+ with open(export_path, 'r') as f:
+ output = f.read()
+ return output
+
+
+
+
+
+if __name__ == "__main__":
+ pass
+
diff --git a/scripts/automation/trex_control_plane/server/trex_server.py b/scripts/automation/trex_control_plane/server/trex_server.py new file mode 100755 index 00000000..992a1d5f --- /dev/null +++ b/scripts/automation/trex_control_plane/server/trex_server.py @@ -0,0 +1,465 @@ +#!/usr/bin/python + + +import os +import stat +import sys +import time +import outer_packages +import zmq +from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer +import jsonrpclib +from jsonrpclib import Fault +import binascii +import socket +import errno +import signal +import binascii +from common.trex_status_e import TRexStatus +from common.trex_exceptions import * +import subprocess +from random import randrange +#import shlex +import logging +import threading +import CCustomLogger +from trex_launch_thread import AsynchronousTRexSession +from zmq_monitor_thread import ZmqMonitorSession +from argparse import ArgumentParser, RawTextHelpFormatter +from json import JSONEncoder + + +# setup the logger +CCustomLogger.setup_custom_logger('TRexServer') +logger = logging.getLogger('TRexServer') + +class CTRexServer(object): + """This class defines the server side of the RESTfull interaction with T-Rex""" + DEFAULT_TREX_PATH = '/auto/proj-pcube-b/apps/PL-b/tools/bp_sim2/v1.55/' #'/auto/proj-pcube-b/apps/PL-b/tools/nightly/trex_latest' + TREX_START_CMD = './t-rex-64' + DEFAULT_FILE_PATH = '/tmp/trex_files/' + + def __init__(self, trex_path, trex_files_path, trex_host = socket.gethostname(), trex_daemon_port = 8090, trex_zmq_port = 4500): + """ + Parameters + ---------- + trex_host : str + a string of the t-rex ip address or hostname. + default value: machine hostname as fetched from socket.gethostname() + trex_daemon_port : int + the port number on which the trex-daemon server can be reached + default value: 8090 + trex_zmq_port : int + the port number on which trex's zmq module will interact with daemon server + default value: 4500 + + Instatiate a T-Rex client object, and connecting it to listening daemon-server + """ + self.TREX_PATH = os.path.abspath(os.path.dirname(trex_path+'/')) + self.trex_files_path = os.path.abspath(os.path.dirname(trex_files_path+'/')) + self.__check_trex_path_validity() + self.__check_files_path_validity() + self.trex = CTRex() + self.trex_host = trex_host + self.trex_daemon_port = trex_daemon_port + self.trex_zmq_port = trex_zmq_port + self.trex_server_path = "http://{hostname}:{port}".format( hostname = trex_host, port = trex_daemon_port ) + self.start_lock = threading.Lock() + self.__reservation = None + self.zmq_monitor = ZmqMonitorSession(self.trex, self.trex_zmq_port) # intiate single ZMQ monitor thread for server usage + + def add(self, x, y): + print "server function add ",x,y + logger.info("Processing add function. Parameters are: {0}, {1} ".format( x, y )) + return x + y + # return Fault(-10, "") + + def push_file (self, filename, bin_data): + logger.info("Processing push_file() command.") + try: + filepath = os.path.abspath(os.path.join(self.trex_files_path, filename)) + with open(filepath, 'wb') as f: + f.write(binascii.a2b_base64(bin_data)) + logger.info("push_file() command finished. `{name}` was saved at {fpath}".format( name = filename, fpath = self.trex_files_path)) + return True + except IOError as inst: + logger.error("push_file method failed. " + str(inst)) + return False + + def connectivity_check (self): + logger.info("Processing connectivity_check function.") + return True + + def start(self): + """This method fires up the daemon server based on initialized parameters of the class""" + # initialize the server instance with given reasources + try: + print "Firing up T-Rex REST daemon @ port {trex_port} ...\n".format( trex_port = self.trex_daemon_port ) + logger.info("Firing up T-Rex REST daemon @ port {trex_port} ...".format( trex_port = self.trex_daemon_port )) + logger.info("current working dir is: {0}".format(self.TREX_PATH) ) + logger.info("current files dir is : {0}".format(self.trex_files_path) ) + logger.debug("Starting TRex server. Registering methods to process.") + self.server = SimpleJSONRPCServer( (self.trex_host, self.trex_daemon_port) ) + except socket.error as e: + if e.errno == errno.EADDRINUSE: + logger.error("T-Rex server requested address already in use. Aborting server launching.") + print "T-Rex server requested address already in use. Aborting server launching." + raise socket.error(errno.EADDRINUSE, "T-Rex daemon requested address already in use. Server launch aborted. Please make sure no other process is using the desired server properties.") + + # set further functionality and peripherals to server instance + try: + self.server.register_function(self.add) + self.server.register_function(self.connectivity_check) + self.server.register_function(self.start_trex) + self.server.register_function(self.stop_trex) + self.server.register_function(self.wait_until_kickoff_finish) + self.server.register_function(self.get_running_status) + self.server.register_function(self.is_running) + self.server.register_function(self.get_running_info) + self.server.register_function(self.is_reserved) + self.server.register_function(self.get_files_path) + self.server.register_function(self.push_file) + self.server.register_function(self.reserve_trex) + self.server.register_function(self.cancel_reservation) + self.server.register_function(self.force_trex_kill) + signal.signal(signal.SIGTSTP, self.stop_handler) + signal.signal(signal.SIGTERM, self.stop_handler) + self.zmq_monitor.start() + self.server.serve_forever() + except KeyboardInterrupt: + logger.info("Daemon shutdown request detected." ) + finally: + self.zmq_monitor.join() # close ZMQ monitor thread reasources + self.server.shutdown() + pass + + def stop_handler (self, signum, frame): + logger.info("Daemon STOP request detected.") + if self.is_running(): + # in case T-Rex process is currently running, stop it before terminating server process + self.stop_trex(self.trex.get_seq()) + sys.exit(0) + + def is_running (self): + run_status = self.trex.get_status() + logger.info("Processing is_running() command. Running status is: {stat}".format(stat = run_status) ) + if run_status==TRexStatus.Running: + return True + else: + return False + + def is_reserved (self): + logger.info("Processing is_reserved() command.") + return bool(self.__reservation) + + def get_running_status (self): + run_status = self.trex.get_status() + logger.info("Processing get_running_status() command. Running status is: {stat}".format(stat = run_status) ) + return { 'state' : run_status.value, 'verbose' : self.trex.get_verbose_status() } + + def get_files_path (self): + logger.info("Processing get_files_path() command." ) + return self.trex_files_path + + def reserve_trex (self, user): + if user == "": + logger.info("T-Rex reservation cannot apply to empty string user. Request denied.") + return Fault(-33, "T-Rex reservation cannot apply to empty string user. Request denied.") + + with self.start_lock: + logger.info("Processing reserve_trex() command.") + if self.is_reserved(): + if user == self.__reservation['user']: + # return True is the same user is asking and already has the resrvation + logger.info("the same user is asking and already has the resrvation. Re-reserving T-Rex.") + return True + + logger.info("T-Rex is already reserved to another user ({res_user}), cannot reserve to another user.".format( res_user = self.__reservation['user'] )) + return Fault(-33, "T-Rex is already reserved to another user ({res_user}). Please make sure T-Rex is free before reserving it.".format( + res_user = self.__reservation['user']) ) # raise at client TRexInUseError + elif self.trex.get_status() != TRexStatus.Idle: + logger.info("T-Rex is currently running, cannot reserve T-Rex unless in Idle state.") + return Fault(-13, 'T-Rex is currently running, cannot reserve T-Rex unless in Idle state. Please try again when T-Rex run finished.') # raise at client TRexInUseError + else: + logger.info("T-Rex is now reserved for user ({res_user}).".format( res_user = user )) + self.__reservation = {'user' : user, 'since' : time.ctime()} + logger.debug("Reservation details: "+ str(self.__reservation)) + return True + + def cancel_reservation (self, user): + with self.start_lock: + logger.info("Processing cancel_reservation() command.") + if self.is_reserved(): + if self.__reservation['user'] == user: + logger.info("T-Rex reservation to {res_user} has been canceled successfully.".format(res_user = self.__reservation['user'])) + self.__reservation = None + return True + else: + logger.warning("T-Rex is reserved to different user than the provided one. Reservation wasn't canceled.") + return Fault(-33, "Cancel reservation request is available to the user that holds the reservation. Request denied") # raise at client TRexRequestDenied + + else: + logger.info("T-Rex is not reserved to anyone. No need to cancel anything") + assert(self.__reservation is None) + return False + + + def start_trex(self, trex_cmd_options, user, block_to_success = True, timeout = 30): + with self.start_lock: + logger.info("Processing start_trex() command.") + if self.is_reserved(): + # check if this is not the user to which T-Rex is reserved + if self.__reservation['user'] != user: + logger.info("T-Rex is reserved to another user ({res_user}). Only that user is allowed to initiate new runs.".format(res_user = self.__reservation['user'])) + return Fault(-33, "T-Rex is reserved to another user ({res_user}). Only that user is allowed to initiate new runs.".format(res_user = self.__reservation['user'])) # raise at client TRexRequestDenied + elif self.trex.get_status() != TRexStatus.Idle: + logger.info("T-Rex is already taken, cannot create another run until done.") + return Fault(-13, '') # raise at client TRexInUseError + + try: + server_cmd_data = self.generate_run_cmd(**trex_cmd_options) + self.zmq_monitor.first_dump = True + self.trex.start_trex(self.TREX_PATH, server_cmd_data) + logger.info("T-Rex session has been successfully initiated.") + if block_to_success: + # delay server response until T-Rex is at 'Running' state. + start_time = time.time() + trex_state = None + while (time.time() - start_time) < timeout : + trex_state = self.trex.get_status() + if trex_state != TRexStatus.Starting: + break + else: + time.sleep(0.5) + + # check for T-Rex run started normally + if trex_state == TRexStatus.Starting: # reached timeout + logger.warning("TimeoutError: T-Rex initiation outcome could not be obtained, since T-Rex stays at Starting state beyond defined timeout.") + return Fault(-12, 'TimeoutError: T-Rex initiation outcome could not be obtained, since T-Rex stays at Starting state beyond defined timeout.') # raise at client TRexWarning + elif trex_state == TRexStatus.Idle: + return Fault(-11, self.trex.get_verbose_status()) # raise at client TRexError + + # reach here only if T-Rex is at 'Running' state + self.trex.gen_seq() + return self.trex.get_seq() # return unique seq number to client + + except TypeError as e: + logger.error("T-Rex command generation failed, probably because either -f (traffic generation .yaml file) and -c (num of cores) was not specified correctly.\nReceived params: {params}".format( params = trex_cmd_options) ) + raise TypeError('T-Rex -f (traffic generation .yaml file) and -c (num of cores) must be specified.') + + + def stop_trex(self, seq): + logger.info("Processing stop_trex() command.") + if self.trex.get_seq()== seq: + logger.debug("Abort request legit since seq# match") + return self.trex.stop_trex() + else: + if self.trex.get_status() != TRexStatus.Idle: + logger.warning("Abort request is only allowed to process initiated the run. Request denied.") + + return Fault(-33, 'Abort request is only allowed to process initiated the run. Request denied.') # raise at client TRexRequestDenied + else: + return False + + def force_trex_kill (self): + logger.info("Processing force_trex_kill() command. --> Killing T-Rex session indiscriminately.") + return self.trex.stop_trex() + + def wait_until_kickoff_finish (self, timeout = 40): + # block until T-Rex exits Starting state + logger.info("Processing wait_until_kickoff_finish() command.") + trex_state = None + start_time = time.time() + while (time.time() - start_time) < timeout : + trex_state = self.trex.get_status() + if trex_state != TRexStatus.Starting: + return + return Fault(-12, 'TimeoutError: T-Rex initiation outcome could not be obtained, since T-Rex stays at Starting state beyond defined timeout.') # raise at client TRexWarning + + def get_running_info (self): + logger.info("Processing get_running_info() command.") + return self.trex.get_running_info() + + def generate_run_cmd (self, f, d, iom = 0, export_path="/tmp/trex.txt", **kwargs): + """ generate_run_cmd(self, trex_cmd_options, export_path) -> str + + Generates a custom running command for the kick-off of the T-Rex traffic generator. + Returns a tuple of command (string) and export path (string) to be issued on the trex server + + Parameters + ---------- + trex_cmd_options : str + Defines the exact command to run on the t-rex + Example: "-c 2 -m 0.500000 -d 100 -f cap2/sfr.yaml --nc -p -l 1000" + export_path : str + a full system path to which the results of the trex-run will be logged. + + """ + if 'results_file_path' in kwargs: + export_path = kwargs['results_file_path'] + del kwargs['results_file_path'] + + + # adding additional options to the command + trex_cmd_options = '' + for key, value in kwargs.iteritems(): + tmp_key = key.replace('_','-') + dash = ' -' if (len(key)==1) else ' --' + if (value == True) and (str(value) != '1'): # checking also int(value) to excape from situation that 1 translates by python to 'True' + trex_cmd_options += (dash + tmp_key) + else: + trex_cmd_options += (dash + '{k} {val}'.format( k = tmp_key, val = value )) + + cmd = "{run_command} -f {gen_file} -d {duration} --iom {io} {cmd_options} --no-key > {export}".format( # -- iom 0 disables the periodic log to the screen (not needed) + run_command = self.TREX_START_CMD, + gen_file = f, + duration = d, + cmd_options = trex_cmd_options, + io = iom, + export = export_path ) + + logger.info("T-REX FULL COMMAND: {command}".format(command = cmd) ) + + return (cmd, export_path, long(d)) + + def __check_trex_path_validity(self): + # check for executable existance + if not os.path.exists(self.TREX_PATH+'/t-rex-64'): + print "The provided T-Rex path do not contain an executable T-Rex file.\nPlease check the path and retry." + logger.error("The provided T-Rex path do not contain an executable T-Rex file") + exit(-1) + # check for executable permissions + st = os.stat(self.TREX_PATH+'/t-rex-64') + if not bool(st.st_mode & (stat.S_IXUSR ) ): + print "The provided T-Rex path do not contain an T-Rex file with execution privileges.\nPlease check the files permissions and retry." + logger.error("The provided T-Rex path do not contain an T-Rex file with execution privileges") + exit(-1) + else: + return + + def __check_files_path_validity(self): + # first, check for path existance. otherwise, try creating it with appropriate credentials + if not os.path.exists(self.trex_files_path): + try: + os.makedirs(self.trex_files_path, 0660) + return + except os.error as inst: + print "The provided files path does not exist and cannot be created with needed access credentials using root user.\nPlease check the path's permissions and retry." + logger.error("The provided files path does not exist and cannot be created with needed access credentials using root user.") + exit(-1) + elif os.access(self.trex_files_path, os.W_OK): + return + else: + print "The provided files path has insufficient access credentials for root user.\nPlease check the path's permissions and retry." + logger.error("The provided files path has insufficient access credentials for root user") + exit(-1) + +class CTRex(object): + def __init__(self): + self.status = TRexStatus.Idle + self.verbose_status = 'T-Rex is Idle' + self.errcode = None + self.session = None + self.zmq_monitor = None + self.zmq_dump = None + self.seq = None + self.expect_trex = threading.Event() + self.encoder = JSONEncoder() + + def get_status(self): + return self.status + + def set_status(self, new_status): + self.status = new_status + + def get_verbose_status(self): + return self.verbose_status + + def set_verbose_status(self, new_status): + self.verbose_status = new_status + + def gen_seq (self): + self.seq = randrange(1,1000) + + def get_seq (self): + return self.seq + + def get_running_info (self): + if self.status == TRexStatus.Running: + return self.encoder.encode(self.zmq_dump) + else: + logger.info("T-Rex isn't running. Running information isn't available.") + if self.status == TRexStatus.Idle: + if self.errcode is not None: # some error occured + logger.info("T-Rex is in Idle state, with errors. returning fault") + return Fault(self.errcode, self.verbose_status) # raise at client relevant exception, depending on the reason the error occured + else: + logger.info("T-Rex is in Idle state, no errors. returning {}") + return u'{}' + + return Fault(-12, self.verbose_status) # raise at client TRexWarning, indicating T-Rex is back to Idle state or still in Starting state + + def stop_trex(self): + if self.status == TRexStatus.Idle: + # t-rex isn't running, nothing to abort + logger.info("T-Rex isn't running. No need to stop anything.") + if self.errcode is not None: # some error occured, notify client despite T-Rex already stopped + return Fault(self.errcode, self.verbose_status) # raise at client relevant exception, depending on the reason the error occured + return False + else: + # handle stopping t-rex's run + self.session.join() + logger.info("T-Rex session has been successfully aborted.") + return True + + def start_trex(self, trex_launch_path, trex_cmd): + self.set_status(TRexStatus.Starting) + logger.info("TRex running state changed to 'Starting'.") + self.set_verbose_status('T-Rex is starting (data is not available yet)') + + self.errcode = None + self.session = AsynchronousTRexSession(self, trex_launch_path, trex_cmd) + self.session.start() + self.expect_trex.set() +# self.zmq_monitor= ZmqMonitorSession(self, zmq_port) +# self.zmq_monitor.start() + + + +def generate_trex_parser (): + default_path = os.path.abspath(os.path.join(outer_packages.CURRENT_PATH, os.pardir, os.pardir, os.pardir)) + default_files_path = os.path.abspath(CTRexServer.DEFAULT_FILE_PATH) + + parser = ArgumentParser(description = 'Run server application for T-Rex traffic generator', + formatter_class = RawTextHelpFormatter, + usage = """ +trex_daemon_server [options] +""" ) + + parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.0') + parser.add_argument("-p", "--daemon-port", type=int, default = 8090, metavar="PORT", dest="daemon_port", + help="Select port on which the daemon runs.\nDefault port is 8090.", action="store") + parser.add_argument("-z", "--zmq-port", dest="zmq_port", type=int, + action="store", help="Select port on which the ZMQ module listens to T-Rex.\nDefault port is 4500.", metavar="PORT", + default = 4500) + parser.add_argument("-t", "--trex-path", dest="trex_path", + action="store", help="Specify the compiled T-Rex directory from which T-Rex would run.\nDefault path is: {def_path}.".format( def_path = default_path ), + metavar="PATH", default = default_path ) + parser.add_argument("-f", "--files-path", dest="files_path", + action="store", help="Specify a path to directory on which pushed files will be saved at.\nDefault path is: {def_path}.".format( def_path = default_files_path ), + metavar="PATH", default = default_files_path ) + return parser + +trex_parser = generate_trex_parser() + +def do_main_program (): + + args = trex_parser.parse_args() + + server = CTRexServer(trex_daemon_port = args.daemon_port, trex_zmq_port = args.zmq_port, trex_path = args.trex_path, trex_files_path = args.files_path) + server.start() + + +if __name__ == "__main__": + do_main_program() + diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py new file mode 100755 index 00000000..28e154ee --- /dev/null +++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py @@ -0,0 +1,80 @@ +#!/router/bin/python
+
+import os
+import outer_packages
+import zmq
+import threading
+import logging
+import CCustomLogger
+from json import JSONDecoder
+from common.trex_status_e import TRexStatus
+
+# setup the logger
+CCustomLogger.setup_custom_logger('TRexServer')
+logger = logging.getLogger('TRexServer')
+
+class ZmqMonitorSession(threading.Thread):
+ def __init__(self, trexObj , zmq_port):
+ super(ZmqMonitorSession, self).__init__()
+ self.stoprequest = threading.Event()
+# self.terminateFlag = False
+ self.first_dump = True
+ self.zmq_port = zmq_port
+ self.zmq_publisher = "tcp://localhost:{port}".format( port = self.zmq_port )
+# self.context = zmq.Context()
+# self.socket = self.context.socket(zmq.SUB)
+ self.trexObj = trexObj
+ self.expect_trex = self.trexObj.expect_trex # used to signal if T-Rex is expected to run and if data should be considered
+ self.decoder = JSONDecoder()
+ logger.info("ZMQ monitor initialization finished")
+
+ def run (self):
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+ logger.info("ZMQ monitor started listening @ {pub}".format( pub = self.zmq_publisher ) )
+ self.socket.connect(self.zmq_publisher)
+ self.socket.setsockopt(zmq.SUBSCRIBE, '')
+
+ while not self.stoprequest.is_set():
+ try:
+ zmq_dump = self.socket.recv() # This call is BLOCKING until data received!
+ if self.expect_trex.is_set():
+ self.parse_and_update_zmq_dump(zmq_dump)
+ logger.debug("ZMQ dump received on socket, and saved to trexObject.")
+ except Exception as e:
+ if self.stoprequest.is_set():
+ # allow this exception since it comes from ZMQ monitor termination
+ pass
+ else:
+ logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex = e))
+ raise
+
+ def join (self, timeout = None):
+ self.stoprequest.set()
+ logger.debug("Handling termination of ZMQ monitor thread")
+ self.socket.close()
+ self.context.term()
+ logger.info("ZMQ monitor resources has been freed.")
+ super(ZmqMonitorSession, self).join(timeout)
+
+ def parse_and_update_zmq_dump (self, zmq_dump):
+ try:
+ dict_obj = self.decoder.decode(zmq_dump)
+ except ValueError:
+ logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump = zmq_dump))
+ dict_obj = None
+
+ # add to trex_obj zmq latest dump, based on its 'name' header
+ if dict_obj is not None and dict_obj!={}:
+ self.trexObj.zmq_dump[dict_obj['name']] = dict_obj
+ if self.first_dump:
+ # change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully
+ self.first_dump = False
+ self.trexObj.set_status(TRexStatus.Running)
+ self.trexObj.set_verbose_status("T-Rex is Running")
+ logger.info("First ZMQ dump received and successfully parsed. TRex running state changed to 'Running'.")
+
+
+if __name__ == "__main__":
+ pass
+
|