From 89b608ae766705950efc5f4914b01b9a32b6a0e7 Mon Sep 17 00:00:00 2001 From: Yaroslav Brustinov Date: Fri, 13 May 2016 20:11:51 +0300 Subject: add master daemon --- .../trex_control_plane/server/trex_server.py | 137 +++++++++++++++------ 1 file changed, 98 insertions(+), 39 deletions(-) (limited to 'scripts/automation/trex_control_plane/server/trex_server.py') diff --git a/scripts/automation/trex_control_plane/server/trex_server.py b/scripts/automation/trex_control_plane/server/trex_server.py index e32fc9d1..3dcb3e97 100755 --- a/scripts/automation/trex_control_plane/server/trex_server.py +++ b/scripts/automation/trex_control_plane/server/trex_server.py @@ -27,6 +27,8 @@ from zmq_monitor_thread import ZmqMonitorSession from argparse import ArgumentParser, RawTextHelpFormatter from json import JSONEncoder import re +import shlex +import tempfile # setup the logger @@ -52,6 +54,8 @@ class CTRexServer(object): trex_zmq_port : int the port number on which trex's zmq module will interact with daemon server default value: 4500 + nice: int + priority of the TRex process Instantiate a TRex client object, and connecting it to listening daemon-server """ @@ -76,7 +80,6 @@ class CTRexServer(object): raise Exception(err) def add(self, x, y): - print "server function add ",x,y logger.info("Processing add function. Parameters are: {0}, {1} ".format( x, y )) return x + y # return Fault(-10, "") @@ -123,38 +126,49 @@ class CTRexServer(object): raise # set further functionality and peripherals to server instance + self.server.register_function(self.add) + self.server.register_function(self.cancel_reservation) + self.server.register_function(self.connectivity_check) + self.server.register_function(self.force_trex_kill) + self.server.register_function(self.get_file) + self.server.register_function(self.get_files_list) + self.server.register_function(self.get_files_path) + self.server.register_function(self.get_running_info) + self.server.register_function(self.get_running_status) + self.server.register_function(self.get_trex_cmds) + self.server.register_function(self.get_trex_daemon_log) + self.server.register_function(self.get_trex_log) + self.server.register_function(self.get_trex_version) + self.server.register_function(self.is_reserved) + self.server.register_function(self.is_running) + self.server.register_function(self.kill_all_trexes) + self.server.register_function(self.push_file) + self.server.register_function(self.reserve_trex) + self.server.register_function(self.start_trex) + self.server.register_function(self.stop_trex) + self.server.register_function(self.wait_until_kickoff_finish) + signal.signal(signal.SIGTSTP, self.stop_handler) + signal.signal(signal.SIGTERM, self.stop_handler) try: - self.server.register_function(self.add) - self.server.register_function(self.cancel_reservation) - self.server.register_function(self.connectivity_check) - self.server.register_function(self.force_trex_kill) - self.server.register_function(self.get_file) - self.server.register_function(self.get_files_list) - self.server.register_function(self.get_files_path) - self.server.register_function(self.get_running_info) - self.server.register_function(self.get_running_status) - self.server.register_function(self.get_trex_daemon_log) - self.server.register_function(self.get_trex_log) - self.server.register_function(self.get_trex_version) - self.server.register_function(self.is_reserved) - self.server.register_function(self.is_running) - self.server.register_function(self.push_file) - self.server.register_function(self.reserve_trex) - self.server.register_function(self.start_trex) - self.server.register_function(self.stop_trex) - self.server.register_function(self.wait_until_kickoff_finish) - signal.signal(signal.SIGTSTP, self.stop_handler) - signal.signal(signal.SIGTERM, self.stop_handler) self.zmq_monitor.start() self.server.serve_forever() except KeyboardInterrupt: logger.info("Daemon shutdown request detected." ) - except Exception as e: - logger.error(e) finally: self.zmq_monitor.join() # close ZMQ monitor thread resources self.server.shutdown() - pass + #self.server.server_close() + + def _run_command(self, command, timeout = 15, cwd = None): + if timeout: + command = 'timeout %s %s' % (timeout, command) + # pipes might stuck, even with timeout + with tempfile.TemporaryFile() as stdout_file, tempfile.TemporaryFile() as stderr_file: + proc = subprocess.Popen(shlex.split(command), stdout=stdout_file, stderr=stderr_file, cwd = cwd) + proc.wait() + stdout_file.seek(0) + stderr_file.seek(0) + return (proc.returncode, stdout_file.read().decode(errors = 'replace'), stderr_file.read().decode(errors = 'replace')) # get files from Trex server and return their content (mainly for logs) @staticmethod @@ -215,8 +229,7 @@ class CTRexServer(object): try: logger.info("Processing get_trex_version() command.") if not self.trex_version: - help_print = subprocess.Popen(['./t-rex-64', '--help'], cwd = self.TREX_PATH, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (stdout, stderr) = help_print.communicate() + ret_code, stdout, stderr = self._run_command('./t-rex-64 --help', cwd = self.TREX_PATH, timeout = 0) search_result = re.search('\n\s*(Version\s*:.+)', stdout, re.DOTALL) if not search_result: raise Exception('Could not determine version from ./t-rex-64 --help') @@ -226,7 +239,7 @@ class CTRexServer(object): else: return binascii.a2b_base64(self.trex_version) except Exception as e: - err_str = "Can't get trex version, error: {0}".format(e) + err_str = "Can't get trex version, error: %s" % e logger.error(err_str) return Fault(-33, err_str) @@ -301,7 +314,7 @@ class CTRexServer(object): return False - def start_trex(self, trex_cmd_options, user, block_to_success = True, timeout = 40): + def start_trex(self, trex_cmd_options, user, block_to_success = True, timeout = 40, stateless = False): with self.start_lock: logger.info("Processing start_trex() command.") if self.is_reserved(): @@ -313,8 +326,8 @@ class CTRexServer(object): logger.info("TRex is already taken, cannot create another run until done.") return Fault(-13, '') # raise at client TRexInUseError - try: - server_cmd_data = self.generate_run_cmd(**trex_cmd_options) + try: + server_cmd_data = self.generate_run_cmd(stateless = stateless, **trex_cmd_options) self.zmq_monitor.first_dump = True self.trex.start_trex(self.TREX_PATH, server_cmd_data) logger.info("TRex session has been successfully initiated.") @@ -342,7 +355,7 @@ class CTRexServer(object): except TypeError as e: logger.error("TRex command generation failed, probably because either -f (traffic generation .yaml file) and -c (num of cores) was not specified correctly.\nReceived params: {params}".format( params = trex_cmd_options) ) - raise TypeError('TRex -f (traffic generation .yaml file) and -c (num of cores) must be specified.') + raise TypeError('TRex -f (traffic generation .yaml file) and -c (num of cores) must be specified. %s' % e) def stop_trex(self, seq): @@ -362,6 +375,41 @@ class CTRexServer(object): logger.info("Processing force_trex_kill() command. --> Killing TRex session indiscriminately.") return self.trex.stop_trex() + # returns list of tuples (pid, command line) of running TRex(es) + def get_trex_cmds(self): + logger.info('Processing get_trex_cmds() command.') + ret_code, stdout, stderr = self._run_command('ps -u root --format pid,comm,cmd') + if ret_code: + raise Exception('Failed to determine running processes, stderr: %s' % stderr) + trex_cmds_list = [] + for line in stdout.splitlines(): + pid, proc_name, full_cmd = line.strip().split(' ', 2) + pid = pid.strip() + full_cmd = full_cmd.strip() + if proc_name.find('t-rex-64') >= 0: + trex_cmds_list.append((pid, full_cmd)) + return trex_cmds_list + + + def kill_all_trexes(self): + logger.info('Processing kill_all_trexes() command.') + trex_cmds_list = self.get_trex_cmds() + if not trex_cmds_list: + return False + for pid, cmd in trex_cmds_list: + logger.info('Killing process %s %s' % (pid, cmd)) + self._run_command('kill %s' % pid) + ret_code_ps, _, _ = self._run_command('ps -p %s' % pid) + if not ret_code_ps: + logger.info('Killing with -9.') + self._run_command('kill -9 %s' % pid) + ret_code_ps, _, _ = self._run_command('ps -p %s' % pid) + if not ret_code_ps: + logger.info('Could not kill process.') + raise Exception('Could not kill process %s %s' % (pid, cmd)) + return True + + def wait_until_kickoff_finish (self, timeout = 40): # block until TRex exits Starting state logger.info("Processing wait_until_kickoff_finish() command.") @@ -377,14 +425,19 @@ class CTRexServer(object): logger.info("Processing get_running_info() command.") return self.trex.get_running_info() - def generate_run_cmd (self, f, d, iom = 0, export_path="/tmp/trex.txt", **kwargs): - """ generate_run_cmd(self, trex_cmd_options, export_path) -> str + + def generate_run_cmd (self, iom = 0, export_path="/tmp/trex.txt", stateless = False, **kwargs): + """ generate_run_cmd(self, iom, export_path, kwargs) -> str Generates a custom running command for the kick-off of the TRex traffic generator. Returns a tuple of command (string) and export path (string) to be issued on the trex server Parameters ---------- + iom: int + 0 = don't print stats screen to log, 1 = print stats (can generate huge logs) + stateless: boolean + True = run as stateless, False = require -f and -d arguments kwargs: dictionary Dictionary of parameters for trex. For example: (c=1, nc=True, l_pkt_mode=3). Notice that when sending command line parameters that has -, you need to replace it with _. @@ -396,7 +449,8 @@ class CTRexServer(object): if 'results_file_path' in kwargs: export_path = kwargs['results_file_path'] del kwargs['results_file_path'] - + if stateless: + kwargs['i'] = True # adding additional options to the command trex_cmd_options = '' @@ -408,18 +462,23 @@ class CTRexServer(object): else: trex_cmd_options += (dash + '{k} {val}'.format( k = tmp_key, val = value )) - cmd = "{nice}{run_command} -f {gen_file} -d {duration} --iom {io} {cmd_options} --no-key > {export}".format( # -- iom 0 disables the periodic log to the screen (not needed) + if not stateless: + if 'f' not in kwargs: + raise Exception('Argument -f should be specified in stateful command') + if 'd' not in kwargs: + raise Exception('Argument -d should be specified in stateful command') + + cmd = "{nice}{run_command} --iom {io} {cmd_options} --no-key > {export}".format( # -- iom 0 disables the periodic log to the screen (not needed) nice = '' if self.trex_nice == 0 else 'nice -n %s ' % self.trex_nice, run_command = self.TREX_START_CMD, - gen_file = f, - duration = d, cmd_options = trex_cmd_options, io = iom, export = export_path ) logger.info("TREX FULL COMMAND: {command}".format(command = cmd) ) - return (cmd, export_path, long(d)) + return (cmd, export_path, kwargs.get('d', 0)) + def __check_trex_path_validity(self): # check for executable existance -- cgit 1.2.3-korg