summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/server
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2015-06-24 14:03:29 +0300
committerHanoh Haim <hhaim@cisco.com>2015-06-24 14:03:29 +0300
commit8b52a31ed2c299b759f330c4f976b9c70f5765f4 (patch)
tree9d6da5438b5b56b1d2d57e6c13494b4e65d000e7 /scripts/automation/trex_control_plane/server
first version
Diffstat (limited to 'scripts/automation/trex_control_plane/server')
-rwxr-xr-xscripts/automation/trex_control_plane/server/CCustomLogger.py100
-rwxr-xr-xscripts/automation/trex_control_plane/server/extended_daemon_runner.py144
-rwxr-xr-xscripts/automation/trex_control_plane/server/outer_packages.py66
-rwxr-xr-xscripts/automation/trex_control_plane/server/trex_daemon_server25
-rwxr-xr-xscripts/automation/trex_control_plane/server/trex_daemon_server.py87
-rwxr-xr-xscripts/automation/trex_control_plane/server/trex_launch_thread.py92
-rwxr-xr-xscripts/automation/trex_control_plane/server/trex_server.py465
-rwxr-xr-xscripts/automation/trex_control_plane/server/zmq_monitor_thread.py80
8 files changed, 1059 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/server/CCustomLogger.py b/scripts/automation/trex_control_plane/server/CCustomLogger.py
new file mode 100755
index 00000000..ecf7d519
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/CCustomLogger.py
@@ -0,0 +1,100 @@
+
+import sys
+import os
+import logging
+
+
+def setup_custom_logger(name, log_path = None):
+ # first make sure path availabe
+# if log_path is None:
+# log_path = os.getcwd()+'/trex_log.log'
+# else:
+# directory = os.path.dirname(log_path)
+# if not os.path.exists(directory):
+# os.makedirs(directory)
+ logging.basicConfig(level = logging.INFO,
+ format = '%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s',
+ datefmt = '%m-%d %H:%M')
+# filename= log_path,
+# filemode= 'w')
+#
+# # define a Handler which writes INFO messages or higher to the sys.stderr
+# consoleLogger = logging.StreamHandler()
+# consoleLogger.setLevel(logging.ERROR)
+# # set a format which is simpler for console use
+# formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
+# # tell the handler to use this format
+# consoleLogger.setFormatter(formatter)
+#
+# # add the handler to the logger
+# logging.getLogger(name).addHandler(consoleLogger)
+
+def setup_daemon_logger (name, log_path = None):
+ # first make sure path availabe
+ logging.basicConfig(level = logging.INFO,
+ format = '%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s',
+ datefmt = '%m-%d %H:%M',
+ filename= log_path,
+ filemode= 'w')
+
+class CustomLogger(object):
+
+ def __init__(self, log_filename):
+ # Store the original stdout and stderr
+ sys.stdout.flush()
+ sys.stderr.flush()
+
+ self.stdout_fd = os.dup(sys.stdout.fileno())
+ self.devnull = os.open('/dev/null', os.O_WRONLY)
+ self.log_file = open(log_filename, 'w')
+ self.silenced = False
+ self.pending_log_file_prints = 0
+
+ # silence all prints from stdout
+ def silence(self):
+ os.dup2(self.devnull, sys.stdout.fileno())
+ self.silenced = True
+
+ # restore stdout status
+ def restore(self):
+ sys.stdout.flush()
+ sys.stderr.flush()
+ # Restore normal stdout
+ os.dup2(self.stdout_fd, sys.stdout.fileno())
+ self.silenced = False
+
+ #print a message to the log (both stdout / log file)
+ def log(self, text, force = False, newline = True):
+ self.log_file.write((text + "\n") if newline else text)
+ self.pending_log_file_prints += 1
+
+ if (self.pending_log_file_prints >= 10):
+ self.log_file.flush()
+ self.pending_log_file_prints = 0
+
+ self.console(text, force, newline)
+
+ # print a message to the console alone
+ def console(self, text, force = False, newline = True):
+ _text = (text + "\n") if newline else text
+ # if we are silenced and not forced - go home
+ if self.silenced and not force:
+ return
+
+ if self.silenced:
+ os.write(self.stdout_fd, _text)
+ else:
+ sys.stdout.write(_text)
+
+ sys.stdout.flush()
+
+ # flush
+ def flush(self):
+ sys.stdout.flush()
+ self.log_file.flush()
+
+ def __exit__(self, type, value, traceback):
+ sys.stdout.flush()
+ self.log_file.flush()
+ os.close(self.devnull)
+ os.close(self.log_file)
diff --git a/scripts/automation/trex_control_plane/server/extended_daemon_runner.py b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py
new file mode 100755
index 00000000..07eedd9f
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/extended_daemon_runner.py
@@ -0,0 +1,144 @@
+#!/usr/bin/python
+
+import outer_packages
+import lockfile
+from daemon import runner,daemon
+from daemon.runner import *
+import os, sys
+from argparse import ArgumentParser
+from trex_server import trex_parser
+try:
+ from python_lib.termstyle import termstyle
+except ImportError:
+ import termstyle
+
+
+
+def daemonize_parser (parser_obj, action_funcs, help_menu):
+ """Update the regular process parser to deal with daemon process options"""
+ parser_obj.description += " (as a daemon process)"
+ parser_obj.usage = None
+ parser_obj.add_argument("action", choices = action_funcs,
+ action="store", help = help_menu )
+ return
+
+
+class ExtendedDaemonRunner(runner.DaemonRunner):
+ """ Controller for a callable running in a separate background process.
+
+ The first command-line argument is the action to take:
+
+ * 'start': Become a daemon and call `app.run()`.
+ * 'stop': Exit the daemon process specified in the PID file.
+ * 'restart': Stop, then start.
+
+ """
+
+ help_menu = """Specify action command to be applied on server.
+ (*) start : start the application in as a daemon process.
+ (*) show : prompt a updated status of daemon process (running/ not running).
+ (*) stop : exit the daemon process.
+ (*) restart : stop, then start again the application as daemon process
+ (*) start-live : start the application in live mode (no daemon process).
+ """
+
+ def __init__ (self, app, parser_obj):
+ """ Set up the parameters of a new runner.
+ THIS METHOD INTENTIONALLY DO NOT INVOKE SUPER __init__() METHOD
+
+ :param app: The application instance; see below.
+ :return: ``None``.
+
+ The `app` argument must have the following attributes:
+
+ * `stdin_path`, `stdout_path`, `stderr_path`: Filesystem paths
+ to open and replace the existing `sys.stdin`, `sys.stdout`,
+ `sys.stderr`.
+
+ * `pidfile_path`: Absolute filesystem path to a file that will
+ be used as the PID file for the daemon. If ``None``, no PID
+ file will be used.
+
+ * `pidfile_timeout`: Used as the default acquisition timeout
+ value supplied to the runner's PID lock file.
+
+ * `run`: Callable that will be invoked when the daemon is
+ started.
+
+ """
+ super(runner.DaemonRunner, self).__init__()
+ # update action_funcs to support more operations
+ self.update_action_funcs()
+
+ daemonize_parser(parser_obj, self.action_funcs, ExtendedDaemonRunner.help_menu)
+ args = parser_obj.parse_args()
+ self.action = unicode(args.action)
+
+ self.app = app
+ self.daemon_context = daemon.DaemonContext()
+ self.daemon_context.stdin = open(app.stdin_path, 'rt')
+ self.daemon_context.stdout = open(app.stdout_path, 'w+t')
+ self.daemon_context.stderr = open(
+ app.stderr_path, 'a+t', buffering=0)
+
+ self.pidfile = None
+ if app.pidfile_path is not None:
+ self.pidfile = make_pidlockfile(app.pidfile_path, app.pidfile_timeout)
+ self.daemon_context.pidfile = self.pidfile
+
+ # mask out all arguments that aren't relevant to main app script
+
+
+ def update_action_funcs (self):
+ self.action_funcs.update({u'start-live': self._start_live, u'show': self._show}) # add key (=action), value (=desired func)
+
+ @staticmethod
+ def _start_live (self):
+ self.app.run()
+
+ @staticmethod
+ def _show (self):
+ if self.pidfile.is_locked():
+ print termstyle.red("T-Rex server daemon is running")
+ else:
+ print termstyle.red("T-Rex server daemon is NOT running")
+
+ def do_action (self):
+ self.__prevent_duplicate_runs()
+ self.__prompt_init_msg()
+ try:
+ super(ExtendedDaemonRunner, self).do_action()
+ if self.action == 'stop':
+ self.__verify_termination()
+ except runner.DaemonRunnerStopFailureError:
+ if self.action == 'restart':
+ # error means server wasn't running in the first place- so start it!
+ self.action = 'start'
+ self.do_action()
+
+
+ def __prevent_duplicate_runs (self):
+ if self.action == 'start' and self.pidfile.is_locked():
+ print termstyle.green("Server daemon is already running")
+ exit(1)
+ elif self.action == 'stop' and not self.pidfile.is_locked():
+ print termstyle.green("Server daemon is not running")
+ exit(1)
+
+ def __prompt_init_msg (self):
+ if self.action == 'start':
+ print termstyle.green("Starting daemon server...")
+ elif self.action == 'stop':
+ print termstyle.green("Stopping daemon server...")
+
+ def __verify_termination (self):
+ pass
+# import time
+# while self.pidfile.is_locked():
+# time.sleep(2)
+# self._stop()
+#
+
+
+if __name__ == "__main__":
+ pass
diff --git a/scripts/automation/trex_control_plane/server/outer_packages.py b/scripts/automation/trex_control_plane/server/outer_packages.py
new file mode 100755
index 00000000..ab25ea68
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/outer_packages.py
@@ -0,0 +1,66 @@
+#!/router/bin/python
+
+import sys,site
+import platform,os
+import tarfile
+import errno
+import pwd
+
+CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
+ROOT_PATH = os.path.abspath(os.path.join(CURRENT_PATH, os.pardir)) # path to trex_control_plane directory
+PATH_TO_PYTHON_LIB = os.path.abspath(os.path.join(ROOT_PATH, 'python_lib'))
+
+SERVER_MODULES = ['enum34-1.0.4',
+ # 'jsonrpclib-0.1.3',
+ 'jsonrpclib-pelix-0.2.5',
+ 'zmq',
+ 'python-daemon-2.0.5',
+ 'lockfile-0.10.2',
+ 'termstyle'
+ ]
+
+def extract_zmq_package ():
+ """make sure zmq package is available"""
+
+ os.chdir(PATH_TO_PYTHON_LIB)
+ if not os.path.exists('zmq'):
+ if os.path.exists('zmq_fedora.tar.gz'): # make sure tar file is available for extraction
+ try:
+ tar = tarfile.open("zmq_fedora.tar.gz")
+ # finally, extract the tarfile locally
+ tar.extractall()
+ except OSError as err:
+ if err.errno == errno.EACCES:
+ # fall back. try extracting using currently logged in user
+ stat_info = os.stat(PATH_TO_PYTHON_LIB)
+ uid = stat_info.st_uid
+ logged_user = pwd.getpwuid(uid).pw_name
+ if logged_user != 'root':
+ try:
+ os.system("sudo -u {user} tar -zxvf zmq_fedora.tar.gz".format(user = logged_user))
+ except:
+ raise OSError(13, 'Permission denied: Please make sure that logged user have sudo access and writing privileges to `python_lib` directory.')
+ else:
+ raise OSError(13, 'Permission denied: Please make sure that logged user have sudo access and writing privileges to `python_lib` directory.')
+ finally:
+ tar.close()
+ else:
+ raise IOError("File 'zmq_fedora.tar.gz' couldn't be located at python_lib directory.")
+ os.chdir(CURRENT_PATH)
+
+def import_server_modules ():
+ # must be in a higher priority
+ sys.path.insert(0, PATH_TO_PYTHON_LIB)
+ sys.path.append(ROOT_PATH)
+ extract_zmq_package()
+ import_module_list(SERVER_MODULES)
+
+def import_module_list (modules_list):
+ assert(isinstance(modules_list, list))
+ for p in modules_list:
+ full_path = os.path.join(PATH_TO_PYTHON_LIB, p)
+ fix_path = os.path.normcase(full_path)
+ site.addsitedir(full_path)
+
+
+import_server_modules()
diff --git a/scripts/automation/trex_control_plane/server/trex_daemon_server b/scripts/automation/trex_control_plane/server/trex_daemon_server
new file mode 100755
index 00000000..3494e303
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/trex_daemon_server
@@ -0,0 +1,25 @@
+#!/usr/bin/python
+
+import os
+import sys
+
+core = 0
+
+if '--core' in sys.argv:
+ try:
+ idx = sys.argv.index('--core')
+ core = int(sys.argv[idx + 1])
+ if core > 31 or core < 0:
+ print "Error: please provide core argument between 0 to 31"
+ exit(-1)
+ del sys.argv[idx:idx+2]
+ except IndexError:
+ print "Error: please make sure core option provided with argument"
+ exit(-1)
+ except ValueError:
+ print "Error: please make sure core option provided with integer argument"
+ exit(-1)
+
+str_argv = ' '.join(sys.argv[1:])
+cmd = "taskset -c {core} python automation/trex_control_plane/server/trex_daemon_server.py {argv}".format(core = core, argv = str_argv)
+os.system(cmd)
diff --git a/scripts/automation/trex_control_plane/server/trex_daemon_server.py b/scripts/automation/trex_control_plane/server/trex_daemon_server.py
new file mode 100755
index 00000000..5032423a
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/trex_daemon_server.py
@@ -0,0 +1,87 @@
+#!/usr/bin/python
+
+import outer_packages
+import daemon
+from trex_server import do_main_program, trex_parser
+import CCustomLogger
+
+import logging
+import time
+import sys
+import os, errno
+import grp
+import signal
+from daemon import runner
+from extended_daemon_runner import ExtendedDaemonRunner
+import lockfile
+import errno
+
+class TRexServerApp(object):
+ def __init__(self):
+ TRexServerApp.create_working_dirs()
+ self.stdin_path = '/dev/null'
+ self.stdout_path = '/dev/tty' # All standard prints will come up from this source.
+ self.stderr_path = "/var/log/trex/trex_daemon_server.log" # All log messages will come up from this source
+ self.pidfile_path = '/var/run/trex/trex_daemon_server.pid'
+ self.pidfile_timeout = 5 # timeout in seconds
+
+ def run(self):
+ do_main_program()
+
+
+ @staticmethod
+ def create_working_dirs():
+ if not os.path.exists('/var/log/trex'):
+ os.mkdir('/var/log/trex')
+ if not os.path.exists('/var/run/trex'):
+ os.mkdir('/var/run/trex')
+
+
+
+def main ():
+
+ trex_app = TRexServerApp()
+
+ # setup the logger
+ default_log_path = '/var/log/trex/trex_daemon_server.log'
+
+ try:
+ CCustomLogger.setup_daemon_logger('TRexServer', default_log_path)
+ logger = logging.getLogger('TRexServer')
+ logger.setLevel(logging.INFO)
+ formatter = logging.Formatter("%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s")
+ handler = logging.FileHandler("/var/log/trex/trex_daemon_server.log")
+ logger.addHandler(handler)
+ except EnvironmentError, e:
+ if e.errno == errno.EACCES: # catching permission denied error
+ print "Launching user must have sudo privileges in order to run T-Rex daemon.\nTerminating daemon process."
+ exit(-1)
+
+ try:
+ daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser)
+ except IOError as err:
+ # catch 'tty' error when launching server from remote location
+ if err.errno == errno.ENXIO:
+ trex_app.stdout_path = "/dev/null"
+ daemon_runner = ExtendedDaemonRunner(trex_app, trex_parser)
+ else:
+ raise
+
+ #This ensures that the logger file handle does not get closed during daemonization
+ daemon_runner.daemon_context.files_preserve=[handler.stream]
+
+ try:
+ if not set(['start', 'stop']).isdisjoint(set(sys.argv)):
+ print "Logs are saved at: {log_path}".format( log_path = default_log_path )
+ daemon_runner.do_action()
+
+ except lockfile.LockTimeout as inst:
+ logger.error(inst)
+ print inst
+ print """
+ Please try again once the timeout has been reached.
+ If this error continues, consider killing the process manually and restart the daemon."""
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/automation/trex_control_plane/server/trex_launch_thread.py b/scripts/automation/trex_control_plane/server/trex_launch_thread.py
new file mode 100755
index 00000000..b4be60a9
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/trex_launch_thread.py
@@ -0,0 +1,92 @@
+#!/router/bin/python
+
+
+import os
+import signal
+import socket
+from common.trex_status_e import TRexStatus
+import subprocess
+import time
+import threading
+import logging
+import CCustomLogger
+
+# setup the logger
+CCustomLogger.setup_custom_logger('TRexServer')
+logger = logging.getLogger('TRexServer')
+
+
+class AsynchronousTRexSession(threading.Thread):
+ def __init__(self, trexObj , trex_launch_path, trex_cmd_data):
+ super(AsynchronousTRexSession, self).__init__()
+ self.stoprequest = threading.Event()
+ self.terminateFlag = False
+ self.launch_path = trex_launch_path
+ self.cmd, self.export_path, self.duration = trex_cmd_data
+ self.session = None
+ self.trexObj = trexObj
+ self.time_stamps = {'start' : None, 'run_time' : None}
+ self.trexObj.zmq_dump = {}
+
+ def run (self):
+
+ with open(os.devnull, 'w') as DEVNULL:
+ self.time_stamps['start'] = self.time_stamps['run_time'] = time.time()
+ self.session = subprocess.Popen("exec "+self.cmd, cwd = self.launch_path, shell=True, stdin = DEVNULL, stderr = subprocess.PIPE, preexec_fn=os.setsid)
+ logger.info("T-Rex session initialized successfully, Parent process pid is {pid}.".format( pid = self.session.pid ))
+ while self.session.poll() is None: # subprocess is NOT finished
+ time.sleep(0.5)
+ if self.stoprequest.is_set():
+ logger.debug("Abort request received by handling thread. Terminating T-Rex session." )
+ os.killpg(self.session.pid, signal.SIGUSR1)
+ self.trexObj.set_status(TRexStatus.Idle)
+ self.trexObj.set_verbose_status("T-Rex is Idle")
+ break
+
+ self.time_stamps['run_time'] = time.time() - self.time_stamps['start']
+
+ try:
+ if self.time_stamps['run_time'] < 5:
+ logger.error("T-Rex run failed due to wrong input parameters, or due to reachability issues.")
+ self.trexObj.set_verbose_status("T-Rex run failed due to wrong input parameters, or due to reachability issues.\n\nT-Rex command: {cmd}\n\nRun output:\n{output}".format(
+ cmd = self.cmd, output = self.load_trex_output(self.export_path)))
+ self.trexObj.errcode = -11
+ elif (self.session.returncode is not None and self.session.returncode < 0) or ( (self.time_stamps['run_time'] < self.duration) and (not self.stoprequest.is_set()) ):
+ if (self.session.returncode is not None and self.session.returncode < 0):
+ logger.debug("Failed T-Rex run due to session return code ({ret_code})".format( ret_code = self.session.returncode ) )
+ elif ( (self.time_stamps['run_time'] < self.duration) and not self.stoprequest.is_set()):
+ logger.debug("Failed T-Rex run due to running time ({runtime}) combined with no-stopping request.".format( runtime = self.time_stamps['run_time'] ) )
+
+ logger.warning("T-Rex run was terminated unexpectedly by outer process or by the hosting OS")
+ self.trexObj.set_verbose_status("T-Rex run was terminated unexpectedly by outer process or by the hosting OS.\n\nRun output:\n{output}".format(
+ output = self.load_trex_output(self.export_path)))
+ self.trexObj.errcode = -15
+ else:
+ logger.info("T-Rex run session finished.")
+ self.trexObj.set_verbose_status('T-Rex finished.')
+ self.trexObj.errcode = None
+
+ finally:
+ self.trexObj.set_status(TRexStatus.Idle)
+ logger.info("TRex running state changed to 'Idle'.")
+ self.trexObj.expect_trex.clear()
+ logger.debug("Finished handling a single run of T-Rex.")
+ self.trexObj.zmq_dump = None
+
+ def join (self, timeout = None):
+ self.stoprequest.set()
+ super(AsynchronousTRexSession, self).join(timeout)
+
+ def load_trex_output (self, export_path):
+ output = None
+ with open(export_path, 'r') as f:
+ output = f.read()
+ return output
+
+
+
+
+
+if __name__ == "__main__":
+ pass
+
diff --git a/scripts/automation/trex_control_plane/server/trex_server.py b/scripts/automation/trex_control_plane/server/trex_server.py
new file mode 100755
index 00000000..992a1d5f
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/trex_server.py
@@ -0,0 +1,465 @@
+#!/usr/bin/python
+
+
+import os
+import stat
+import sys
+import time
+import outer_packages
+import zmq
+from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
+import jsonrpclib
+from jsonrpclib import Fault
+import binascii
+import socket
+import errno
+import signal
+import binascii
+from common.trex_status_e import TRexStatus
+from common.trex_exceptions import *
+import subprocess
+from random import randrange
+#import shlex
+import logging
+import threading
+import CCustomLogger
+from trex_launch_thread import AsynchronousTRexSession
+from zmq_monitor_thread import ZmqMonitorSession
+from argparse import ArgumentParser, RawTextHelpFormatter
+from json import JSONEncoder
+
+
+# setup the logger
+CCustomLogger.setup_custom_logger('TRexServer')
+logger = logging.getLogger('TRexServer')
+
+class CTRexServer(object):
+ """This class defines the server side of the RESTfull interaction with T-Rex"""
+ DEFAULT_TREX_PATH = '/auto/proj-pcube-b/apps/PL-b/tools/bp_sim2/v1.55/' #'/auto/proj-pcube-b/apps/PL-b/tools/nightly/trex_latest'
+ TREX_START_CMD = './t-rex-64'
+ DEFAULT_FILE_PATH = '/tmp/trex_files/'
+
+ def __init__(self, trex_path, trex_files_path, trex_host = socket.gethostname(), trex_daemon_port = 8090, trex_zmq_port = 4500):
+ """
+ Parameters
+ ----------
+ trex_host : str
+ a string of the t-rex ip address or hostname.
+ default value: machine hostname as fetched from socket.gethostname()
+ trex_daemon_port : int
+ the port number on which the trex-daemon server can be reached
+ default value: 8090
+ trex_zmq_port : int
+ the port number on which trex's zmq module will interact with daemon server
+ default value: 4500
+
+ Instatiate a T-Rex client object, and connecting it to listening daemon-server
+ """
+ self.TREX_PATH = os.path.abspath(os.path.dirname(trex_path+'/'))
+ self.trex_files_path = os.path.abspath(os.path.dirname(trex_files_path+'/'))
+ self.__check_trex_path_validity()
+ self.__check_files_path_validity()
+ self.trex = CTRex()
+ self.trex_host = trex_host
+ self.trex_daemon_port = trex_daemon_port
+ self.trex_zmq_port = trex_zmq_port
+ self.trex_server_path = "http://{hostname}:{port}".format( hostname = trex_host, port = trex_daemon_port )
+ self.start_lock = threading.Lock()
+ self.__reservation = None
+ self.zmq_monitor = ZmqMonitorSession(self.trex, self.trex_zmq_port) # intiate single ZMQ monitor thread for server usage
+
+ def add(self, x, y):
+ print "server function add ",x,y
+ logger.info("Processing add function. Parameters are: {0}, {1} ".format( x, y ))
+ return x + y
+ # return Fault(-10, "")
+
+ def push_file (self, filename, bin_data):
+ logger.info("Processing push_file() command.")
+ try:
+ filepath = os.path.abspath(os.path.join(self.trex_files_path, filename))
+ with open(filepath, 'wb') as f:
+ f.write(binascii.a2b_base64(bin_data))
+ logger.info("push_file() command finished. `{name}` was saved at {fpath}".format( name = filename, fpath = self.trex_files_path))
+ return True
+ except IOError as inst:
+ logger.error("push_file method failed. " + str(inst))
+ return False
+
+ def connectivity_check (self):
+ logger.info("Processing connectivity_check function.")
+ return True
+
+ def start(self):
+ """This method fires up the daemon server based on initialized parameters of the class"""
+ # initialize the server instance with given reasources
+ try:
+ print "Firing up T-Rex REST daemon @ port {trex_port} ...\n".format( trex_port = self.trex_daemon_port )
+ logger.info("Firing up T-Rex REST daemon @ port {trex_port} ...".format( trex_port = self.trex_daemon_port ))
+ logger.info("current working dir is: {0}".format(self.TREX_PATH) )
+ logger.info("current files dir is : {0}".format(self.trex_files_path) )
+ logger.debug("Starting TRex server. Registering methods to process.")
+ self.server = SimpleJSONRPCServer( (self.trex_host, self.trex_daemon_port) )
+ except socket.error as e:
+ if e.errno == errno.EADDRINUSE:
+ logger.error("T-Rex server requested address already in use. Aborting server launching.")
+ print "T-Rex server requested address already in use. Aborting server launching."
+ raise socket.error(errno.EADDRINUSE, "T-Rex daemon requested address already in use. Server launch aborted. Please make sure no other process is using the desired server properties.")
+
+ # set further functionality and peripherals to server instance
+ try:
+ self.server.register_function(self.add)
+ self.server.register_function(self.connectivity_check)
+ self.server.register_function(self.start_trex)
+ self.server.register_function(self.stop_trex)
+ self.server.register_function(self.wait_until_kickoff_finish)
+ self.server.register_function(self.get_running_status)
+ self.server.register_function(self.is_running)
+ self.server.register_function(self.get_running_info)
+ self.server.register_function(self.is_reserved)
+ self.server.register_function(self.get_files_path)
+ self.server.register_function(self.push_file)
+ self.server.register_function(self.reserve_trex)
+ self.server.register_function(self.cancel_reservation)
+ self.server.register_function(self.force_trex_kill)
+ signal.signal(signal.SIGTSTP, self.stop_handler)
+ signal.signal(signal.SIGTERM, self.stop_handler)
+ self.zmq_monitor.start()
+ self.server.serve_forever()
+ except KeyboardInterrupt:
+ logger.info("Daemon shutdown request detected." )
+ finally:
+ self.zmq_monitor.join() # close ZMQ monitor thread reasources
+ self.server.shutdown()
+ pass
+
+ def stop_handler (self, signum, frame):
+ logger.info("Daemon STOP request detected.")
+ if self.is_running():
+ # in case T-Rex process is currently running, stop it before terminating server process
+ self.stop_trex(self.trex.get_seq())
+ sys.exit(0)
+
+ def is_running (self):
+ run_status = self.trex.get_status()
+ logger.info("Processing is_running() command. Running status is: {stat}".format(stat = run_status) )
+ if run_status==TRexStatus.Running:
+ return True
+ else:
+ return False
+
+ def is_reserved (self):
+ logger.info("Processing is_reserved() command.")
+ return bool(self.__reservation)
+
+ def get_running_status (self):
+ run_status = self.trex.get_status()
+ logger.info("Processing get_running_status() command. Running status is: {stat}".format(stat = run_status) )
+ return { 'state' : run_status.value, 'verbose' : self.trex.get_verbose_status() }
+
+ def get_files_path (self):
+ logger.info("Processing get_files_path() command." )
+ return self.trex_files_path
+
+ def reserve_trex (self, user):
+ if user == "":
+ logger.info("T-Rex reservation cannot apply to empty string user. Request denied.")
+ return Fault(-33, "T-Rex reservation cannot apply to empty string user. Request denied.")
+
+ with self.start_lock:
+ logger.info("Processing reserve_trex() command.")
+ if self.is_reserved():
+ if user == self.__reservation['user']:
+ # return True is the same user is asking and already has the resrvation
+ logger.info("the same user is asking and already has the resrvation. Re-reserving T-Rex.")
+ return True
+
+ logger.info("T-Rex is already reserved to another user ({res_user}), cannot reserve to another user.".format( res_user = self.__reservation['user'] ))
+ return Fault(-33, "T-Rex is already reserved to another user ({res_user}). Please make sure T-Rex is free before reserving it.".format(
+ res_user = self.__reservation['user']) ) # raise at client TRexInUseError
+ elif self.trex.get_status() != TRexStatus.Idle:
+ logger.info("T-Rex is currently running, cannot reserve T-Rex unless in Idle state.")
+ return Fault(-13, 'T-Rex is currently running, cannot reserve T-Rex unless in Idle state. Please try again when T-Rex run finished.') # raise at client TRexInUseError
+ else:
+ logger.info("T-Rex is now reserved for user ({res_user}).".format( res_user = user ))
+ self.__reservation = {'user' : user, 'since' : time.ctime()}
+ logger.debug("Reservation details: "+ str(self.__reservation))
+ return True
+
+ def cancel_reservation (self, user):
+ with self.start_lock:
+ logger.info("Processing cancel_reservation() command.")
+ if self.is_reserved():
+ if self.__reservation['user'] == user:
+ logger.info("T-Rex reservation to {res_user} has been canceled successfully.".format(res_user = self.__reservation['user']))
+ self.__reservation = None
+ return True
+ else:
+ logger.warning("T-Rex is reserved to different user than the provided one. Reservation wasn't canceled.")
+ return Fault(-33, "Cancel reservation request is available to the user that holds the reservation. Request denied") # raise at client TRexRequestDenied
+
+ else:
+ logger.info("T-Rex is not reserved to anyone. No need to cancel anything")
+ assert(self.__reservation is None)
+ return False
+
+
+ def start_trex(self, trex_cmd_options, user, block_to_success = True, timeout = 30):
+ with self.start_lock:
+ logger.info("Processing start_trex() command.")
+ if self.is_reserved():
+ # check if this is not the user to which T-Rex is reserved
+ if self.__reservation['user'] != user:
+ logger.info("T-Rex is reserved to another user ({res_user}). Only that user is allowed to initiate new runs.".format(res_user = self.__reservation['user']))
+ return Fault(-33, "T-Rex is reserved to another user ({res_user}). Only that user is allowed to initiate new runs.".format(res_user = self.__reservation['user'])) # raise at client TRexRequestDenied
+ elif self.trex.get_status() != TRexStatus.Idle:
+ logger.info("T-Rex is already taken, cannot create another run until done.")
+ return Fault(-13, '') # raise at client TRexInUseError
+
+ try:
+ server_cmd_data = self.generate_run_cmd(**trex_cmd_options)
+ self.zmq_monitor.first_dump = True
+ self.trex.start_trex(self.TREX_PATH, server_cmd_data)
+ logger.info("T-Rex session has been successfully initiated.")
+ if block_to_success:
+ # delay server response until T-Rex is at 'Running' state.
+ start_time = time.time()
+ trex_state = None
+ while (time.time() - start_time) < timeout :
+ trex_state = self.trex.get_status()
+ if trex_state != TRexStatus.Starting:
+ break
+ else:
+ time.sleep(0.5)
+
+ # check for T-Rex run started normally
+ if trex_state == TRexStatus.Starting: # reached timeout
+ logger.warning("TimeoutError: T-Rex initiation outcome could not be obtained, since T-Rex stays at Starting state beyond defined timeout.")
+ return Fault(-12, 'TimeoutError: T-Rex initiation outcome could not be obtained, since T-Rex stays at Starting state beyond defined timeout.') # raise at client TRexWarning
+ elif trex_state == TRexStatus.Idle:
+ return Fault(-11, self.trex.get_verbose_status()) # raise at client TRexError
+
+ # reach here only if T-Rex is at 'Running' state
+ self.trex.gen_seq()
+ return self.trex.get_seq() # return unique seq number to client
+
+ except TypeError as e:
+ logger.error("T-Rex command generation failed, probably because either -f (traffic generation .yaml file) and -c (num of cores) was not specified correctly.\nReceived params: {params}".format( params = trex_cmd_options) )
+ raise TypeError('T-Rex -f (traffic generation .yaml file) and -c (num of cores) must be specified.')
+
+
+ def stop_trex(self, seq):
+ logger.info("Processing stop_trex() command.")
+ if self.trex.get_seq()== seq:
+ logger.debug("Abort request legit since seq# match")
+ return self.trex.stop_trex()
+ else:
+ if self.trex.get_status() != TRexStatus.Idle:
+ logger.warning("Abort request is only allowed to process initiated the run. Request denied.")
+
+ return Fault(-33, 'Abort request is only allowed to process initiated the run. Request denied.') # raise at client TRexRequestDenied
+ else:
+ return False
+
+ def force_trex_kill (self):
+ logger.info("Processing force_trex_kill() command. --> Killing T-Rex session indiscriminately.")
+ return self.trex.stop_trex()
+
+ def wait_until_kickoff_finish (self, timeout = 40):
+ # block until T-Rex exits Starting state
+ logger.info("Processing wait_until_kickoff_finish() command.")
+ trex_state = None
+ start_time = time.time()
+ while (time.time() - start_time) < timeout :
+ trex_state = self.trex.get_status()
+ if trex_state != TRexStatus.Starting:
+ return
+ return Fault(-12, 'TimeoutError: T-Rex initiation outcome could not be obtained, since T-Rex stays at Starting state beyond defined timeout.') # raise at client TRexWarning
+
+ def get_running_info (self):
+ logger.info("Processing get_running_info() command.")
+ return self.trex.get_running_info()
+
+ def generate_run_cmd (self, f, d, iom = 0, export_path="/tmp/trex.txt", **kwargs):
+ """ generate_run_cmd(self, trex_cmd_options, export_path) -> str
+
+ Generates a custom running command for the kick-off of the T-Rex traffic generator.
+ Returns a tuple of command (string) and export path (string) to be issued on the trex server
+
+ Parameters
+ ----------
+ trex_cmd_options : str
+ Defines the exact command to run on the t-rex
+ Example: "-c 2 -m 0.500000 -d 100 -f cap2/sfr.yaml --nc -p -l 1000"
+ export_path : str
+ a full system path to which the results of the trex-run will be logged.
+
+ """
+ if 'results_file_path' in kwargs:
+ export_path = kwargs['results_file_path']
+ del kwargs['results_file_path']
+
+
+ # adding additional options to the command
+ trex_cmd_options = ''
+ for key, value in kwargs.iteritems():
+ tmp_key = key.replace('_','-')
+ dash = ' -' if (len(key)==1) else ' --'
+ if (value == True) and (str(value) != '1'): # checking also int(value) to excape from situation that 1 translates by python to 'True'
+ trex_cmd_options += (dash + tmp_key)
+ else:
+ trex_cmd_options += (dash + '{k} {val}'.format( k = tmp_key, val = value ))
+
+ cmd = "{run_command} -f {gen_file} -d {duration} --iom {io} {cmd_options} --no-key > {export}".format( # -- iom 0 disables the periodic log to the screen (not needed)
+ run_command = self.TREX_START_CMD,
+ gen_file = f,
+ duration = d,
+ cmd_options = trex_cmd_options,
+ io = iom,
+ export = export_path )
+
+ logger.info("T-REX FULL COMMAND: {command}".format(command = cmd) )
+
+ return (cmd, export_path, long(d))
+
+ def __check_trex_path_validity(self):
+ # check for executable existance
+ if not os.path.exists(self.TREX_PATH+'/t-rex-64'):
+ print "The provided T-Rex path do not contain an executable T-Rex file.\nPlease check the path and retry."
+ logger.error("The provided T-Rex path do not contain an executable T-Rex file")
+ exit(-1)
+ # check for executable permissions
+ st = os.stat(self.TREX_PATH+'/t-rex-64')
+ if not bool(st.st_mode & (stat.S_IXUSR ) ):
+ print "The provided T-Rex path do not contain an T-Rex file with execution privileges.\nPlease check the files permissions and retry."
+ logger.error("The provided T-Rex path do not contain an T-Rex file with execution privileges")
+ exit(-1)
+ else:
+ return
+
+ def __check_files_path_validity(self):
+ # first, check for path existance. otherwise, try creating it with appropriate credentials
+ if not os.path.exists(self.trex_files_path):
+ try:
+ os.makedirs(self.trex_files_path, 0660)
+ return
+ except os.error as inst:
+ print "The provided files path does not exist and cannot be created with needed access credentials using root user.\nPlease check the path's permissions and retry."
+ logger.error("The provided files path does not exist and cannot be created with needed access credentials using root user.")
+ exit(-1)
+ elif os.access(self.trex_files_path, os.W_OK):
+ return
+ else:
+ print "The provided files path has insufficient access credentials for root user.\nPlease check the path's permissions and retry."
+ logger.error("The provided files path has insufficient access credentials for root user")
+ exit(-1)
+
+class CTRex(object):
+ def __init__(self):
+ self.status = TRexStatus.Idle
+ self.verbose_status = 'T-Rex is Idle'
+ self.errcode = None
+ self.session = None
+ self.zmq_monitor = None
+ self.zmq_dump = None
+ self.seq = None
+ self.expect_trex = threading.Event()
+ self.encoder = JSONEncoder()
+
+ def get_status(self):
+ return self.status
+
+ def set_status(self, new_status):
+ self.status = new_status
+
+ def get_verbose_status(self):
+ return self.verbose_status
+
+ def set_verbose_status(self, new_status):
+ self.verbose_status = new_status
+
+ def gen_seq (self):
+ self.seq = randrange(1,1000)
+
+ def get_seq (self):
+ return self.seq
+
+ def get_running_info (self):
+ if self.status == TRexStatus.Running:
+ return self.encoder.encode(self.zmq_dump)
+ else:
+ logger.info("T-Rex isn't running. Running information isn't available.")
+ if self.status == TRexStatus.Idle:
+ if self.errcode is not None: # some error occured
+ logger.info("T-Rex is in Idle state, with errors. returning fault")
+ return Fault(self.errcode, self.verbose_status) # raise at client relevant exception, depending on the reason the error occured
+ else:
+ logger.info("T-Rex is in Idle state, no errors. returning {}")
+ return u'{}'
+
+ return Fault(-12, self.verbose_status) # raise at client TRexWarning, indicating T-Rex is back to Idle state or still in Starting state
+
+ def stop_trex(self):
+ if self.status == TRexStatus.Idle:
+ # t-rex isn't running, nothing to abort
+ logger.info("T-Rex isn't running. No need to stop anything.")
+ if self.errcode is not None: # some error occured, notify client despite T-Rex already stopped
+ return Fault(self.errcode, self.verbose_status) # raise at client relevant exception, depending on the reason the error occured
+ return False
+ else:
+ # handle stopping t-rex's run
+ self.session.join()
+ logger.info("T-Rex session has been successfully aborted.")
+ return True
+
+ def start_trex(self, trex_launch_path, trex_cmd):
+ self.set_status(TRexStatus.Starting)
+ logger.info("TRex running state changed to 'Starting'.")
+ self.set_verbose_status('T-Rex is starting (data is not available yet)')
+
+ self.errcode = None
+ self.session = AsynchronousTRexSession(self, trex_launch_path, trex_cmd)
+ self.session.start()
+ self.expect_trex.set()
+# self.zmq_monitor= ZmqMonitorSession(self, zmq_port)
+# self.zmq_monitor.start()
+
+
+
+def generate_trex_parser ():
+ default_path = os.path.abspath(os.path.join(outer_packages.CURRENT_PATH, os.pardir, os.pardir, os.pardir))
+ default_files_path = os.path.abspath(CTRexServer.DEFAULT_FILE_PATH)
+
+ parser = ArgumentParser(description = 'Run server application for T-Rex traffic generator',
+ formatter_class = RawTextHelpFormatter,
+ usage = """
+trex_daemon_server [options]
+""" )
+
+ parser.add_argument('-v', '--version', action='version', version='%(prog)s 1.0')
+ parser.add_argument("-p", "--daemon-port", type=int, default = 8090, metavar="PORT", dest="daemon_port",
+ help="Select port on which the daemon runs.\nDefault port is 8090.", action="store")
+ parser.add_argument("-z", "--zmq-port", dest="zmq_port", type=int,
+ action="store", help="Select port on which the ZMQ module listens to T-Rex.\nDefault port is 4500.", metavar="PORT",
+ default = 4500)
+ parser.add_argument("-t", "--trex-path", dest="trex_path",
+ action="store", help="Specify the compiled T-Rex directory from which T-Rex would run.\nDefault path is: {def_path}.".format( def_path = default_path ),
+ metavar="PATH", default = default_path )
+ parser.add_argument("-f", "--files-path", dest="files_path",
+ action="store", help="Specify a path to directory on which pushed files will be saved at.\nDefault path is: {def_path}.".format( def_path = default_files_path ),
+ metavar="PATH", default = default_files_path )
+ return parser
+
+trex_parser = generate_trex_parser()
+
+def do_main_program ():
+
+ args = trex_parser.parse_args()
+
+ server = CTRexServer(trex_daemon_port = args.daemon_port, trex_zmq_port = args.zmq_port, trex_path = args.trex_path, trex_files_path = args.files_path)
+ server.start()
+
+
+if __name__ == "__main__":
+ do_main_program()
+
diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
new file mode 100755
index 00000000..28e154ee
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
@@ -0,0 +1,80 @@
+#!/router/bin/python
+
+import os
+import outer_packages
+import zmq
+import threading
+import logging
+import CCustomLogger
+from json import JSONDecoder
+from common.trex_status_e import TRexStatus
+
+# setup the logger
+CCustomLogger.setup_custom_logger('TRexServer')
+logger = logging.getLogger('TRexServer')
+
+class ZmqMonitorSession(threading.Thread):
+ def __init__(self, trexObj , zmq_port):
+ super(ZmqMonitorSession, self).__init__()
+ self.stoprequest = threading.Event()
+# self.terminateFlag = False
+ self.first_dump = True
+ self.zmq_port = zmq_port
+ self.zmq_publisher = "tcp://localhost:{port}".format( port = self.zmq_port )
+# self.context = zmq.Context()
+# self.socket = self.context.socket(zmq.SUB)
+ self.trexObj = trexObj
+ self.expect_trex = self.trexObj.expect_trex # used to signal if T-Rex is expected to run and if data should be considered
+ self.decoder = JSONDecoder()
+ logger.info("ZMQ monitor initialization finished")
+
+ def run (self):
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+ logger.info("ZMQ monitor started listening @ {pub}".format( pub = self.zmq_publisher ) )
+ self.socket.connect(self.zmq_publisher)
+ self.socket.setsockopt(zmq.SUBSCRIBE, '')
+
+ while not self.stoprequest.is_set():
+ try:
+ zmq_dump = self.socket.recv() # This call is BLOCKING until data received!
+ if self.expect_trex.is_set():
+ self.parse_and_update_zmq_dump(zmq_dump)
+ logger.debug("ZMQ dump received on socket, and saved to trexObject.")
+ except Exception as e:
+ if self.stoprequest.is_set():
+ # allow this exception since it comes from ZMQ monitor termination
+ pass
+ else:
+ logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex = e))
+ raise
+
+ def join (self, timeout = None):
+ self.stoprequest.set()
+ logger.debug("Handling termination of ZMQ monitor thread")
+ self.socket.close()
+ self.context.term()
+ logger.info("ZMQ monitor resources has been freed.")
+ super(ZmqMonitorSession, self).join(timeout)
+
+ def parse_and_update_zmq_dump (self, zmq_dump):
+ try:
+ dict_obj = self.decoder.decode(zmq_dump)
+ except ValueError:
+ logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump = zmq_dump))
+ dict_obj = None
+
+ # add to trex_obj zmq latest dump, based on its 'name' header
+ if dict_obj is not None and dict_obj!={}:
+ self.trexObj.zmq_dump[dict_obj['name']] = dict_obj
+ if self.first_dump:
+ # change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully
+ self.first_dump = False
+ self.trexObj.set_status(TRexStatus.Running)
+ self.trexObj.set_verbose_status("T-Rex is Running")
+ logger.info("First ZMQ dump received and successfully parsed. TRex running state changed to 'Running'.")
+
+
+if __name__ == "__main__":
+ pass
+