summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py82
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py143
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py4
-rwxr-xr-xscripts/automation/trex_control_plane/common/trex_streams.py3
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py66
5 files changed, 202 insertions, 96 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 6793a4ca..e38c6ca7 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -159,39 +159,94 @@ class CTRexAsyncClient():
self.stats = TrexAsyncStatsManager()
+ self.connected = False
+
+ # connects the async channel
+ def connect (self):
+
+ if self.connected:
+ self.disconnect()
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
+ print "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+
+ # before running the thread - mark as active
self.active = True
- self.t = threading.Thread(target = self.run)
+ self.alive = False
+ self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
+ self.connected = True
- def run (self):
+ # wait for data streaming from the server
+ timeout = time.time() + 5
+ while not self.alive:
+ time.sleep(0.01)
+ if time.time() > timeout:
+ self.disconnect()
+ return False, "*** [subscriber] - no data flow from server at : " + self.tr
- # Socket to talk to server
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
+ return True, ""
+
+
+ # disconnect
+ def disconnect (self):
+ if not self.connected:
+ return
+
+ # signal that the context was destroyed (exit the thread loop)
+ self.context.term()
+
+ # mark for join and join
+ self.active = False
+ self.t.join()
+
+ # done
+ self.connected = False
+
+ # thread function
+ def _run (self):
+
+ # no data yet...
+ self.alive = False
+ # socket must be created on the same thread
self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
- self.socket.setsockopt(zmq.RCVTIMEO, 3000)
+ self.socket.setsockopt(zmq.RCVTIMEO, 5000)
while self.active:
try:
line = self.socket.recv_string();
- self.stateless_client.on_async_alive()
+ if not self.alive:
+ self.stateless_client.on_async_alive()
+ self.alive = True
+
+ # got a timeout - mark as not alive and retry
except zmq.Again:
- self.stateless_client.on_async_dead()
+
+ if self.alive:
+ self.stateless_client.on_async_dead()
+ self.alive = False
+
continue
+ except zmq.ContextTerminated:
+ # outside thread signaled us to exit
+ self.alive = False
+ break
+
msg = json.loads(line)
name = msg['name']
@@ -201,6 +256,10 @@ class CTRexAsyncClient():
self.__dispatch(name, type, data)
+
+ # closing of socket must be from the same thread
+ self.socket.close(linger = 0)
+
def get_stats (self):
return self.stats
@@ -220,8 +279,3 @@ class CTRexAsyncClient():
else:
pass
-
- def stop (self):
- self.active = False
- self.t.join()
-
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 4436be75..3a9d74a9 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -414,6 +414,17 @@ class Port(object):
return self.ok()
+
+ def validate (self):
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("validate", params)
+ if not rc:
+ return self.err(data)
+
+ return self.ok()
+
################# events handler ######################
def async_event_port_stopped (self):
self.state = self.STATE_STREAMS
@@ -434,14 +445,13 @@ class CTRexStatelessClient(object):
self._server_version = None
self.__err_log = None
- self._async_client = CTRexAsyncClient(server, async_port, self)
+ self.async_client = CTRexAsyncClient(server, async_port, self)
self.streams_db = CStreamsDB()
- self.connected = False
-
self.events = []
+ self.connected = False
################# events handler ######################
def add_event_log (self, msg, ev_type, show = False):
@@ -504,7 +514,8 @@ class CTRexStatelessClient(object):
self.ports[port_id].async_event_port_stopped()
def async_event_server_stopped (self):
- self.disconnect()
+ self.connected = False
+
def get_events (self):
return self.events
@@ -519,6 +530,11 @@ class CTRexStatelessClient(object):
def wrap(*args):
time1 = time.time()
ret = f(*args)
+
+ # don't want to print on error
+ if ret.bad():
+ return ret
+
delta = time.time() - time1
for unit in ['sec','ms','usec']:
@@ -566,13 +582,19 @@ class CTRexStatelessClient(object):
# connection sequence
def connect(self):
+ # clear this flag
self.connected = False
- # connect
+ # connect sync channel
rc, data = self.comm_link.connect()
if not rc:
return RC_ERR(data)
+ # connect async channel
+ rc, data = self.async_client.connect()
+ if not rc:
+ return RC_ERR(data)
+
# version
rc, data = self.transmit("get_version")
if not rc:
@@ -618,24 +640,19 @@ class CTRexStatelessClient(object):
def disconnect(self):
- self.connected = False
self.comm_link.disconnect()
+ self.async_client.disconnect()
return RC_OK()
def on_async_dead (self):
- if self.is_connected():
+ if self.connected:
msg = 'lost connection to server'
self.add_event_log(msg, 'local', True)
-
- self.disconnect()
+ self.connected = False
def on_async_alive (self):
- if not self.is_connected():
- msg = 'server connection restored'
- self.add_event_log(msg, 'local', True)
-
- self.cmd_connect()
+ pass
########### cached queries (no server traffic) ###########
@@ -659,7 +676,7 @@ class CTRexStatelessClient(object):
return port_ids
def get_stats_async (self):
- return self._async_client.get_stats()
+ return self.async_client.get_stats()
def get_connection_port (self):
return self.comm_link.port
@@ -842,6 +859,17 @@ class CTRexStatelessClient(object):
return rc
+ def validate (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc = RC()
+
+ for port_id in port_id_list:
+ rc.add(self.ports[port_id].validate())
+
+ return rc
+
+
def get_port_stats(self, port_id=None):
pass
@@ -962,19 +990,6 @@ class CTRexStatelessClient(object):
return RC_OK()
- def cmd_pause_line (self, line):
- '''Pause active traffic in specified ports on TRex\n'''
- parser = parsing_opts.gen_parser(self,
- "pause",
- self.cmd_stop_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return RC_ERR("bad command line paramters")
-
- return self.cmd_pause(opts.ports)
-
# resume cmd
def cmd_resume (self, port_id_list):
@@ -995,19 +1010,6 @@ class CTRexStatelessClient(object):
return RC_OK()
- def cmd_resume_line (self, line):
- '''Resume active traffic in specified ports on TRex\n'''
- parser = parsing_opts.gen_parser(self,
- "resume",
- self.cmd_stop_line.__doc__,
- parsing_opts.PORT_LIST_WITH_ALL)
-
- opts = parser.parse_args(line.split())
- if opts is None:
- return RC_ERR("bad command line paramters")
-
- return self.cmd_resume(opts.ports)
-
# start cmd
def cmd_start (self, port_id_list, stream_list, mult, force, duration):
@@ -1043,6 +1045,15 @@ class CTRexStatelessClient(object):
return RC_OK()
+
+ def cmd_validate (self, port_id_list):
+ rc = self.validate(port_id_list)
+ rc.annotate("Validating streams on port(s) {0}:".format(port_id_list))
+ if rc.bad():
+ return rc
+
+ return RC_OK()
+
############## High Level API With Parser ################
@timing
def cmd_start_line (self, line):
@@ -1126,7 +1137,53 @@ class CTRexStatelessClient(object):
def cmd_reset_line (self, line):
return self.cmd_reset()
-
+
+ @timing
+ def cmd_pause_line (self, line):
+ '''Pause active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "pause",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_pause(opts.ports)
+
+
+ @timing
+ def cmd_resume_line (self, line):
+ '''Resume active traffic in specified ports on TRex\n'''
+ parser = parsing_opts.gen_parser(self,
+ "resume",
+ self.cmd_stop_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_resume(opts.ports)
+
+
+ @timing
+ def cmd_validate_line (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ parser = parsing_opts.gen_parser(self,
+ "validate",
+ self.cmd_validate_line.__doc__,
+ parsing_opts.PORT_LIST_WITH_ALL)
+
+ opts = parser.parse_args(line.split())
+ if opts is None:
+ return RC_ERR("bad command line paramters")
+
+ return self.cmd_validate(opts.ports)
+
+
def cmd_exit_line (self, line):
print format_text("Exiting\n", 'bold')
# a way to exit
@@ -1262,5 +1319,3 @@ class CTRexStatelessClient(object):
if __name__ == "__main__":
pass
-
-
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index a5789c46..90d7f8e8 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -133,7 +133,7 @@ class JsonRpcClient(object):
tries += 1
if tries > 10:
self.disconnect()
- return CmdResponse(False, "Failed to send message to server")
+ return CmdResponse(False, "*** [RPC] - Failed to send message to server")
tries = 0
@@ -145,7 +145,7 @@ class JsonRpcClient(object):
tries += 1
if tries > 10:
self.disconnect()
- return CmdResponse(False, "Failed to get server response")
+ return CmdResponse(False, "*** [RPC] - Failed to get server response")
self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
diff --git a/scripts/automation/trex_control_plane/common/trex_streams.py b/scripts/automation/trex_control_plane/common/trex_streams.py
index c2823445..89de7286 100755
--- a/scripts/automation/trex_control_plane/common/trex_streams.py
+++ b/scripts/automation/trex_control_plane/common/trex_streams.py
@@ -14,7 +14,7 @@ StreamPack = namedtuple('StreamPack', ['stream_id', 'stream'])
class CStreamList(object):
def __init__(self):
- self.streams_list = {}
+ self.streams_list = OrderedDict()
self.yaml_loader = CTRexYAMLLoader(os.path.join(os.path.dirname(os.path.realpath(__file__)),
"rpc_defaults.yaml"))
@@ -82,6 +82,7 @@ class CStreamList(object):
stream_ids = {}
for idx, stream_name in enumerate(self.streams_list):
stream_ids[stream_name] = idx
+
# next, iterate over the streams and transform them from working with names to ids.
# with that build a new dict with old stream_name as the key, and StreamPack as the stored value
compiled_streams = {}
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index be8fb70e..9d855f98 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -35,7 +35,7 @@ from common.text_opts import *
from client_utils.general_utils import user_input, get_current_user
import trex_status
import parsing_opts
-
+from functools import wraps
__version__ = "1.1"
@@ -128,6 +128,18 @@ class TRexConsole(TRexGeneralCmd):
################### internal section ########################
+ def verify_connected(f):
+ @wraps(f)
+ def wrap(*args):
+ inst = args[0]
+ if not inst.stateless_client.is_connected():
+ print format_text("\nNot connected to server\n", 'bold')
+ return
+
+ ret = f(*args)
+ return ret
+
+ return wrap
def get_console_identifier(self):
return "{context}_{server}".format(context=self.__class__.__name__,
@@ -141,6 +153,7 @@ class TRexConsole(TRexGeneralCmd):
self.__dict__[name] = getattr(self.trex_console, name)
def postcmd(self, stop, line):
+
if self.stateless_client.is_connected():
self.prompt = "TRex > "
else:
@@ -207,13 +220,9 @@ class TRexConsole(TRexGeneralCmd):
####################### shell commands #######################
+ @verify_connected
def do_ping (self, line):
'''Ping the server\n'''
-
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
rc = self.stateless_client.cmd_ping()
if rc.bad():
return
@@ -303,13 +312,10 @@ class TRexConsole(TRexGeneralCmd):
if (l > 2) and (s[l - 2] in file_flags):
return TRexConsole.tree_autocomplete(s[l - 1])
+ @verify_connected
def do_start(self, line):
'''Start selected traffic in specified port(s) on TRex\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_start_line(line)
@@ -317,64 +323,57 @@ class TRexConsole(TRexGeneralCmd):
self.do_start("-h")
############# stop
+ @verify_connected
def do_stop(self, line):
'''stops port(s) transmitting traffic\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_stop_line(line)
def help_stop(self):
self.do_stop("-h")
############# update
+ @verify_connected
def do_update(self, line):
'''update speed of port(s)currently transmitting traffic\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_update_line(line)
def help_update (self):
self.do_update("-h")
############# pause
+ @verify_connected
def do_pause(self, line):
'''pause port(s) transmitting traffic\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_pause_line(line)
############# resume
+ @verify_connected
def do_resume(self, line):
'''resume port(s) transmitting traffic\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_resume_line(line)
########## reset
+ @verify_connected
def do_reset (self, line):
'''force stop all ports\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.stateless_client.cmd_reset_line(line)
-
+
+ ######### validate
+ @verify_connected
+ def do_validate (self, line):
+ '''validates port(s) stream configuration\n'''
+
+ self.stateless_client.cmd_validate_line(line)
+
+
def help_events (self):
self.do_events("-h")
@@ -404,13 +403,10 @@ class TRexConsole(TRexGeneralCmd):
print format_text("\n\nEvent log was cleared\n\n")
# tui
+ @verify_connected
def do_tui (self, line):
'''Shows a graphical console\n'''
- if not self.stateless_client.is_connected():
- print format_text("\nNot connected to server\n", 'bold')
- return
-
self.do_verbose('off')
trex_status.show_trex_status(self.stateless_client)