summary | refs | log | tree | commit | diff | stats
path: root/scripts/automation/trex_control_plane/server
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/server')
-rwxr-xr-x scripts/automation/trex_control_plane/server/CCustomLogger.py 11
-rwxr-xr-x scripts/automation/trex_control_plane/server/singleton_daemon.py 95
-rwxr-xr-x scripts/automation/trex_control_plane/server/trex_launch_thread.py 89
-rwxr-xr-x scripts/automation/trex_control_plane/server/trex_server.py 52
4 files changed, 125 insertions, 122 deletions
diff --git a/scripts/automation/trex_control_plane/server/CCustomLogger.py b/scripts/automation/trex_control_plane/server/CCustomLogger.py
index a8823cea..6d3974a6 100755
--- a/scripts/automation/trex_control_plane/server/CCustomLogger.py
+++ b/scripts/automation/trex_control_plane/server/CCustomLogger.py
@@ -3,15 +3,13 @@ import sys
import os
import logging
+def prepare_dir(log_path):
+ log_dir = os.path.dirname(log_path)
+ if not os.path.exists(log_dir):
+ os.makedirs(log_dir)
def setup_custom_logger(name, log_path = None):
# first make sure path availabe
-# if log_path is None:
-# log_path = os.getcwd()+'/trex_log.log'
-# else:
-# directory = os.path.dirname(log_path)
-# if not os.path.exists(directory):
-# os.makedirs(directory)
logging.basicConfig(level = logging.INFO,
format = '%(asctime)s %(name)-10s %(module)-20s %(levelname)-8s %(message)s',
datefmt = '%m-%d %H:%M')
@@ -31,6 +29,7 @@ def setup_custom_logger(name, log_path = None):
def setup_daemon_logger (name, log_path = None):
# first make sure path availabe
+ prepare_dir(log_path)
try:
os.unlink(log_path)
except:
diff --git a/scripts/automation/trex_control_plane/server/singleton_daemon.py b/scripts/automation/trex_control_plane/server/singleton_daemon.py
index 0a3b9c09..1784cc42 100755
--- a/scripts/automation/trex_control_plane/server/singleton_daemon.py
+++ b/scripts/automation/trex_control_plane/server/singleton_daemon.py
@@ -2,14 +2,17 @@ import errno
import os
import shlex
import socket
+import signal
import tempfile
import types
from subprocess import Popen
from time import sleep
+import outer_packages
+import jsonrpclib
# uses Unix sockets for determine running process.
# (assumes used daemons will register proper socket)
-# all daemons should use -p argument as listening tcp port
+# all daemons should use -p argument as listening tcp port and check_connectivity RPC method
class SingletonDaemon(object):
# run_cmd can be function of how to run daemon or a str to run at subprocess
@@ -28,14 +31,14 @@ class SingletonDaemon(object):
# returns True if daemon is running
def is_running(self):
- lock_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
- lock_socket.bind('\0' + self.tag) # the check is ~200000 faster and more reliable than checking via 'netstat' or 'ps' etc.
+ lock_socket = register_socket(self.tag) # the check is ~200000 faster and more reliable than checking via 'netstat' or 'ps' etc.
+ lock_socket.shutdown(socket.SHUT_RDWR)
lock_socket.close()
except socket.error: # Unix socket in use
return True
# Unix socket is not used, but maybe it's old version of daemon not using socket
- return bool(self.get_pid())
+ return bool(self.get_pid_by_listening_port())
# get pid of running daemon by registered Unix socket (most robust way)
@@ -71,59 +74,70 @@ class SingletonDaemon(object):
if pid:
return pid
-
- # kill daemon
- def kill(self, timeout = 5):
- pid = self.get_pid()
- if not pid:
- return False
- ret_code, stdout, stderr = run_command('kill %s' % pid) # usual kill
- if ret_code:
- raise Exception('Failed to run kill command for %s: %s' % (self.name, [ret_code, stdout, stderr]))
+ def kill_by_signal(self, pid, signal_name, timeout):
+ os.kill(pid, signal_name)
poll_rate = 0.1
for i in range(int(timeout / poll_rate)):
if not self.is_running():
return True
sleep(poll_rate)
- ret_code, stdout, stderr = run_command('kill -9 %s' % pid) # unconditional kill
- if ret_code:
- raise Exception('Failed to run kill -9 command for %s: %s' % (self.name, [ret_code, stdout, stderr]))
- for i in range(int(timeout / poll_rate)):
- if not self.is_running():
+
+ # kill daemon, with verification
+ def kill(self, timeout = 15):
+ pid = self.get_pid()
+ if not pid:
+ raise Exception('%s is not running' % self.name)
+ # try Ctrl+C, usual kill, kill -9
+ for signal_name in [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]:
+ if self.kill_by_signal(pid, signal_name, timeout):
return True
- sleep(poll_rate)
raise Exception('Could not kill %s, even with -9' % self.name)
+ # try connection as RPC client, return True upon success, False if fail
+ def check_connectivity(self, timeout = 15):
+ daemon = jsonrpclib.Server('http://127.0.0.1:%s/' % self.port)
+ poll_rate = 0.1
+ for i in range(int(timeout/poll_rate)):
+ try:
+ daemon.check_connectivity()
+ return True
+ except socket.error: # daemon is not up yet
+ sleep(poll_rate)
+ return False
# start daemon
# returns True if success, False if already running
- def start(self, timeout = 5):
+ def start(self, timeout = 20):
if self.is_running():
raise Exception('%s is already running' % self.name)
if not self.run_cmd:
raise Exception('No starting command registered for %s' % self.name)
if type(self.run_cmd) is types.FunctionType:
self.run_cmd()
- else:
- with tempfile.TemporaryFile() as stdout_file, tempfile.TemporaryFile() as stderr_file:
- proc = Popen(shlex.split('%s -p %s' % (self.run_cmd, self.port)), cwd = self.dir, close_fds = True,
- stdout = stdout_file, stderr = stderr_file)
- if timeout > 0:
- poll_rate = 0.1
- for i in range(int(timeout/poll_rate)):
- sleep(poll_rate)
- if bool(proc.poll()): # process ended with error
- stdout_file.seek(0)
- stderr_file.seek(0)
- raise Exception('Run of %s ended unexpectfully: %s' % (self.name, [proc.returncode, stdout_file.read().decode(errors = 'replace'), stderr_file.read().decode(errors = 'replace')]))
- elif proc.poll() == 0: # process runs other process, and ended
- break
- if self.is_running():
- return True
- raise Exception('%s failed to run.' % self.name)
+ return
+ with tempfile.TemporaryFile() as stdout_file, tempfile.TemporaryFile() as stderr_file:
+ proc = Popen(shlex.split('%s -p %s' % (self.run_cmd, self.port)), cwd = self.dir, close_fds = True,
+ stdout = stdout_file, stderr = stderr_file)
+ if timeout > 0:
+ poll_rate = 0.1
+ for i in range(int(timeout/poll_rate)):
+ if self.is_running():
+ break
+ sleep(poll_rate)
+ if bool(proc.poll()): # process ended with error
+ stdout_file.seek(0)
+ stderr_file.seek(0)
+ raise Exception('Run of %s ended unexpectfully: %s' % (self.name, [proc.returncode, stdout_file.read().decode(errors = 'replace'), stderr_file.read().decode(errors = 'replace')]))
+ elif proc.poll() == 0: # process runs other process, and ended
+ break
+ if self.is_running():
+ if self.check_connectivity():
+ return True
+ raise Exception('Daemon process is running, but no connectivity')
+ raise Exception('%s failed to run.' % self.name)
# restart the daemon
- def restart(self, timeout = 5):
+ def restart(self, timeout = 15):
if self.is_running():
self.kill(timeout)
return self.start(timeout)
@@ -135,8 +149,9 @@ def register_socket(tag):
lock_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
lock_socket.bind('\0%s' % tag)
+ return lock_socket
except socket.error:
- raise Exception('Error: process with tag %s is already running.' % tag)
+ raise socket.error('Error: process with tag %s is already running.' % tag)
# runs command
def run_command(command, timeout = 15, cwd = None):
@@ -152,6 +167,8 @@ def run_command(command, timeout = 15, cwd = None):
if proc.poll() is None:
proc.kill() # timeout
return (errno.ETIME, '', 'Timeout on running: %s' % command)
+ else:
+ proc.wait()
stdout_file.seek(0)
stderr_file.seek(0)
return (proc.returncode, stdout_file.read().decode(errors = 'replace'), stderr_file.read().decode(errors = 'replace'))
diff --git a/scripts/automation/trex_control_plane/server/trex_launch_thread.py b/scripts/automation/trex_control_plane/server/trex_launch_thread.py
index 74ce1750..22606753 100755
--- a/scripts/automation/trex_control_plane/server/trex_launch_thread.py
+++ b/scripts/automation/trex_control_plane/server/trex_launch_thread.py
@@ -6,6 +6,7 @@ import signal
import socket
from common.trex_status_e import TRexStatus
import subprocess
+import shlex
import time
import threading
import logging
@@ -29,49 +30,51 @@ class AsynchronousTRexSession(threading.Thread):
self.trexObj.zmq_dump = {}
def run (self):
-
- with open(os.devnull, 'w') as DEVNULL:
- self.time_stamps['start'] = self.time_stamps['run_time'] = time.time()
- self.session = subprocess.Popen("exec "+self.cmd, cwd = self.launch_path, shell=True, stdin = DEVNULL, stderr = subprocess.PIPE, preexec_fn=os.setsid)
- logger.info("TRex session initialized successfully, Parent process pid is {pid}.".format( pid = self.session.pid ))
- while self.session.poll() is None: # subprocess is NOT finished
- time.sleep(0.5)
- if self.stoprequest.is_set():
- logger.debug("Abort request received by handling thread. Terminating TRex session." )
- os.killpg(self.session.pid, signal.SIGUSR1)
- self.trexObj.set_status(TRexStatus.Idle)
- self.trexObj.set_verbose_status("TRex is Idle")
- break
-
- self.time_stamps['run_time'] = time.time() - self.time_stamps['start']
-
- try:
- if self.time_stamps['run_time'] < 5:
- logger.error("TRex run failed due to wrong input parameters, or due to readability issues.")
- self.trexObj.set_verbose_status("TRex run failed due to wrong input parameters, or due to readability issues.\n\nTRex command: {cmd}\n\nRun output:\n{output}".format(
- cmd = self.cmd, output = self.load_trex_output(self.export_path)))
- self.trexObj.errcode = -11
- elif (self.session.returncode is not None and self.session.returncode != 0) or ( (self.time_stamps['run_time'] < self.duration) and (not self.stoprequest.is_set()) ):
- if (self.session.returncode is not None and self.session.returncode != 0):
- logger.debug("Failed TRex run due to session return code ({ret_code})".format( ret_code = self.session.returncode ) )
- elif ( (self.time_stamps['run_time'] < self.duration) and not self.stoprequest.is_set()):
- logger.debug("Failed TRex run due to running time ({runtime}) combined with no-stopping request.".format( runtime = self.time_stamps['run_time'] ) )
-
- logger.warning("TRex run was terminated unexpectedly by outer process or by the hosting OS")
- self.trexObj.set_verbose_status("TRex run was terminated unexpectedly by outer process or by the hosting OS.\n\nRun output:\n{output}".format(
- output = self.load_trex_output(self.export_path)))
- self.trexObj.errcode = -15
- else:
- logger.info("TRex run session finished.")
- self.trexObj.set_verbose_status('TRex finished.')
- self.trexObj.errcode = None
-
- finally:
- self.trexObj.set_status(TRexStatus.Idle)
- logger.info("TRex running state changed to 'Idle'.")
- self.trexObj.expect_trex.clear()
- logger.debug("Finished handling a single run of TRex.")
- self.trexObj.zmq_dump = None
+ try:
+ with open(self.export_path, 'w') as output_file:
+ self.time_stamps['start'] = self.time_stamps['run_time'] = time.time()
+ self.session = subprocess.Popen(shlex.split(self.cmd), cwd = self.launch_path, stdout = output_file, preexec_fn=os.setsid, close_fds = True)
+ logger.info("TRex session initialized successfully, Parent process pid is {pid}.".format( pid = self.session.pid ))
+ while self.session.poll() is None: # subprocess is NOT finished
+ time.sleep(0.5)
+ if self.stoprequest.is_set():
+ logger.debug("Abort request received by handling thread. Terminating TRex session." )
+ os.killpg(self.session.pid, signal.SIGUSR1)
+ self.trexObj.set_status(TRexStatus.Idle)
+ self.trexObj.set_verbose_status("TRex is Idle")
+ break
+ except Exception as e:
+ logger.error(e)
+
+ self.time_stamps['run_time'] = time.time() - self.time_stamps['start']
+
+ try:
+ if self.time_stamps['run_time'] < 5:
+ logger.error("TRex run failed due to wrong input parameters, or due to readability issues.")
+ self.trexObj.set_verbose_status("TRex run failed due to wrong input parameters, or due to readability issues.\n\nTRex command: {cmd}\n\nRun output:\n{output}".format(
+ cmd = self.cmd, output = self.load_trex_output(self.export_path)))
+ self.trexObj.errcode = -11
+ elif (self.session.returncode is not None and self.session.returncode != 0) or ( (self.time_stamps['run_time'] < self.duration) and (not self.stoprequest.is_set()) ):
+ if (self.session.returncode is not None and self.session.returncode != 0):
+ logger.debug("Failed TRex run due to session return code ({ret_code})".format( ret_code = self.session.returncode ) )
+ elif ( (self.time_stamps['run_time'] < self.duration) and not self.stoprequest.is_set()):
+ logger.debug("Failed TRex run due to running time ({runtime}) combined with no-stopping request.".format( runtime = self.time_stamps['run_time'] ) )
+
+ logger.warning("TRex run was terminated unexpectedly by outer process or by the hosting OS")
+ self.trexObj.set_verbose_status("TRex run was terminated unexpectedly by outer process or by the hosting OS.\n\nRun output:\n{output}".format(
+ output = self.load_trex_output(self.export_path)))
+ self.trexObj.errcode = -15
+ else:
+ logger.info("TRex run session finished.")
+ self.trexObj.set_verbose_status('TRex finished.')
+ self.trexObj.errcode = None
+
+ finally:
+ self.trexObj.set_status(TRexStatus.Idle)
+ logger.info("TRex running state changed to 'Idle'.")
+ self.trexObj.expect_trex.clear()
+ logger.debug("Finished handling a single run of TRex.")
+ self.trexObj.zmq_dump = None
def join (self, timeout = None):
self.stoprequest.set()
diff --git a/scripts/automation/trex_control_plane/server/trex_server.py b/scripts/automation/trex_control_plane/server/trex_server.py
index 45ef9ac1..9fe7d70b 100755
--- a/scripts/automation/trex_control_plane/server/trex_server.py
+++ b/scripts/automation/trex_control_plane/server/trex_server.py
@@ -30,9 +30,9 @@ import shlex
import tempfile
try:
- from .singleton_daemon import register_socket
+ from .singleton_daemon import register_socket, run_command
except:
- from singleton_daemon import register_socket
+ from singleton_daemon import register_socket, run_command
# setup the logger
@@ -134,6 +134,7 @@ class CTRexServer(object):
self.server.register_function(self.add)
self.server.register_function(self.cancel_reservation)
self.server.register_function(self.connectivity_check)
+ self.server.register_function(self.connectivity_check, 'check_connectivity') # alias
self.server.register_function(self.force_trex_kill)
self.server.register_function(self.get_file)
self.server.register_function(self.get_files_list)
@@ -164,16 +165,6 @@ class CTRexServer(object):
self.server.shutdown()
#self.server.server_close()
- def _run_command(self, command, timeout = 15, cwd = None):
- if timeout:
- command = 'timeout %s %s' % (timeout, command)
- # pipes might stuck, even with timeout
- with tempfile.TemporaryFile() as stdout_file, tempfile.TemporaryFile() as stderr_file:
- proc = subprocess.Popen(shlex.split(command), stdout=stdout_file, stderr=stderr_file, cwd = cwd)
- proc.wait()
- stdout_file.seek(0)
- stderr_file.seek(0)
- return (proc.returncode, stdout_file.read().decode(errors = 'replace'), stderr_file.read().decode(errors = 'replace'))
# get files from Trex server and return their content (mainly for logs)
@staticmethod
@@ -234,7 +225,7 @@ class CTRexServer(object):
try:
logger.info("Processing get_trex_version() command.")
if not self.trex_version:
- ret_code, stdout, stderr = self._run_command('./t-rex-64 --help', cwd = self.TREX_PATH, timeout = 0)
+ ret_code, stdout, stderr = run_command('./t-rex-64 --help', cwd = self.TREX_PATH)
search_result = re.search('\n\s*(Version\s*:.+)', stdout, re.DOTALL)
if not search_result:
raise Exception('Could not determine version from ./t-rex-64 --help')
@@ -319,7 +310,7 @@ class CTRexServer(object):
return False
- def start_trex(self, trex_cmd_options, user, block_to_success = True, timeout = 40, stateless = False):
+ def start_trex(self, trex_cmd_options, user, block_to_success = True, timeout = 40, stateless = False, debug_image = False, trex_args = ''):
with self.start_lock:
logger.info("Processing start_trex() command.")
if self.is_reserved():
@@ -332,7 +323,7 @@ class CTRexServer(object):
return Fault(-13, '') # raise at client TRexInUseError
try:
- server_cmd_data = self.generate_run_cmd(stateless = stateless, **trex_cmd_options)
+ server_cmd_data = self.generate_run_cmd(stateless = stateless, debug_image = debug_image, trex_args = trex_args, **trex_cmd_options)
self.zmq_monitor.first_dump = True
self.trex.start_trex(self.TREX_PATH, server_cmd_data)
logger.info("TRex session has been successfully initiated.")
@@ -383,7 +374,7 @@ class CTRexServer(object):
# returns list of tuples (pid, command line) of running TRex(es)
def get_trex_cmds(self):
logger.info('Processing get_trex_cmds() command.')
- ret_code, stdout, stderr = self._run_command('ps -u root --format pid,comm,cmd')
+ ret_code, stdout, stderr = run_command('ps -u root --format pid,comm,cmd')
if ret_code:
raise Exception('Failed to determine running processes, stderr: %s' % stderr)
trex_cmds_list = []
@@ -396,23 +387,14 @@ class CTRexServer(object):
return trex_cmds_list
- def kill_all_trexes(self):
+ # Silently tries to kill TRexes with given signal.
+ # Responsibility of client to verify with get_trex_cmds.
+ def kill_all_trexes(self, signal_name):
logger.info('Processing kill_all_trexes() command.')
trex_cmds_list = self.get_trex_cmds()
- if not trex_cmds_list:
- return False
for pid, cmd in trex_cmds_list:
- logger.info('Killing process %s %s' % (pid, cmd))
- self._run_command('kill %s' % pid)
- ret_code_ps, _, _ = self._run_command('ps -p %s' % pid)
- if not ret_code_ps:
- logger.info('Killing with -9.')
- self._run_command('kill -9 %s' % pid)
- ret_code_ps, _, _ = self._run_command('ps -p %s' % pid)
- if not ret_code_ps:
- logger.info('Could not kill process.')
- raise Exception('Could not kill process %s %s' % (pid, cmd))
- return True
+ logger.info('Killing with signal %s process %s %s' % (signal_name, pid, cmd))
+ os.kill(int(pid), signal_name)
def wait_until_kickoff_finish (self, timeout = 40):
@@ -431,7 +413,7 @@ class CTRexServer(object):
return self.trex.get_running_info()
- def generate_run_cmd (self, iom = 0, export_path="/tmp/trex.txt", stateless = False, **kwargs):
+ def generate_run_cmd (self, iom = 0, export_path="/tmp/trex.txt", stateless = False, debug_image = False, trex_args = '', **kwargs):
""" generate_run_cmd(self, iom, export_path, kwargs) -> str
Generates a custom running command for the kick-off of the TRex traffic generator.
@@ -468,6 +450,8 @@ class CTRexServer(object):
continue
else:
trex_cmd_options += (dash + '{k} {val}'.format( k = tmp_key, val = value ))
+ if trex_args:
+ trex_cmd_options += ' %s' % trex_args
if not stateless:
if 'f' not in kwargs:
@@ -475,12 +459,12 @@ class CTRexServer(object):
if 'd' not in kwargs:
raise Exception('Argument -d should be specified in stateful command')
- cmd = "{nice}{run_command} --iom {io} {cmd_options} --no-key > {export}".format( # -- iom 0 disables the periodic log to the screen (not needed)
+ cmd = "{nice}{run_command}{debug_image} --iom {io} {cmd_options} --no-key".format( # -- iom 0 disables the periodic log to the screen (not needed)
nice = '' if self.trex_nice == 0 else 'nice -n %s ' % self.trex_nice,
run_command = self.TREX_START_CMD,
+ debug_image = '-debug' if debug_image else '',
cmd_options = trex_cmd_options,
- io = iom,
- export = export_path )
+ io = iom)
logger.info("TREX FULL COMMAND: {command}".format(command = cmd) )