summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py40
-rw-r--r--scripts/automation/trex_control_plane/client/trex_port.py3
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py24
3 files changed, 50 insertions, 17 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 8fdf7c9b..00304886 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -152,16 +152,19 @@ class CTRexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, server, port, stateless_client):
+ def __init__ (self, server, port, stateless_client, prn_func = None):
self.port = port
self.server = server
self.stateless_client = stateless_client
+ self.prn_func = prn_func
self.raw_snapshot = {}
self.stats = CTRexAsyncStatsManager()
+ self.last_data_recv_ts = 0
+
self.connected = False
# connects the async channel
@@ -171,7 +174,13 @@ class CTRexAsyncClient():
self.disconnect()
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- print "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+
+ msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+
+ if self.prn_func:
+ self.prn_func(msg)
+ else:
+ print msg
# Socket to talk to server
self.context = zmq.Context()
@@ -180,7 +189,6 @@ class CTRexAsyncClient():
# before running the thread - mark as active
self.active = True
- self.alive = False
self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
@@ -192,7 +200,7 @@ class CTRexAsyncClient():
# wait for data streaming from the server
timeout = time.time() + 5
- while not self.alive:
+ while not self.is_alive():
time.sleep(0.01)
if time.time() > timeout:
self.disconnect()
@@ -219,35 +227,38 @@ class CTRexAsyncClient():
# thread function
def _run (self):
- # no data yet...
- self.alive = False
# socket must be created on the same thread
self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+ got_data = False
+
while self.active:
try:
line = self.socket.recv_string()
+ self.last_data_recv_ts = time.time()
- if not self.alive:
+ # signal once
+ if not got_data:
self.stateless_client.on_async_alive()
- self.alive = True
+ got_data = True
+
# got a timeout - mark as not alive and retry
except zmq.Again:
- if self.alive:
+ # signal once
+ if got_data:
self.stateless_client.on_async_dead()
- self.alive = False
+ got_data = False
continue
except zmq.ContextTerminated:
# outside thread signaled us to exit
- self.alive = False
break
msg = json.loads(line)
@@ -264,6 +275,13 @@ class CTRexAsyncClient():
self.socket.close(linger = 0)
+ # did we get info for the last 3 seconds ?
+ def is_alive (self):
+ if self.last_data_recv_ts == None:
+ return False
+
+ return ( (time.time() - self.last_data_recv_ts) < 3 )
+
def get_stats (self):
return self.stats
diff --git a/scripts/automation/trex_control_plane/client/trex_port.py b/scripts/automation/trex_control_plane/client/trex_port.py
index 5c5702dd..4f82e86a 100644
--- a/scripts/automation/trex_control_plane/client/trex_port.py
+++ b/scripts/automation/trex_control_plane/client/trex_port.py
@@ -387,6 +387,9 @@ class Port(object):
def clear_stats(self):
return self.port_stats.clear_stats()
+ def invalidate_stats(self):
+ return self.port_stats.invalidate()
+
################# events handler ######################
def async_event_port_stopped (self):
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index a2b1f6d9..899805cf 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -55,7 +55,7 @@ class CTRexStatelessClient(object):
self.user = username
- self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
+ self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual, self.prn_func)
# default verbose level
self.verbose = self.VERBOSE_REGULAR
@@ -68,7 +68,7 @@ class CTRexStatelessClient(object):
self.server_version = {}
self.__err_log = None
- self.async_client = CTRexAsyncClient(server, async_port, self)
+ self.async_client = CTRexAsyncClient(server, async_port, self, self.prn_func)
self.streams_db = CStreamsDB()
self.global_stats = trex_stats.CGlobalStats(self._connection_info,
@@ -105,8 +105,8 @@ class CTRexStatelessClient(object):
st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
self.events.append("{:<10} - {:^8} - {:}".format(st, prefix, format_text(msg, 'bold')))
- if show and self.check_verbose(self.VERBOSE_REGULAR):
- print format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold')))
+ if show:
+ self.prn_func(format_text("\n{:^8} - {:}".format(prefix, format_text(msg, 'bold'))))
def handle_async_stats_update(self, dump_data):
@@ -473,6 +473,10 @@ class CTRexStatelessClient(object):
def get_verbose (self):
return self.verbose
+ def prn_func (self, msg, level = VERBOSE_REGULAR):
+ if self.check_verbose(level):
+ print msg
+
############# server actions ################
# ping server
@@ -754,6 +758,14 @@ class CTRexStatelessClient(object):
return RC_OK()
+ def cmd_invalidate (self, port_id_list):
+ for port_id in port_id_list:
+ self.ports[port_id].invalidate_stats()
+
+ self.global_stats.invalidate()
+
+ return RC_OK()
+
# pause cmd
def cmd_pause (self, port_id_list):
@@ -1146,13 +1158,13 @@ class CTRexStatelessClient(object):
# ------ private classes ------ #
class CCommLink(object):
"""describes the connectivity of the stateless client method"""
- def __init__(self, server="localhost", port=5050, virtual=False):
+ def __init__(self, server="localhost", port=5050, virtual=False, prn_func = None):
super(CTRexStatelessClient.CCommLink, self).__init__()
self.virtual = virtual
self.server = server
self.port = port
self.verbose = False
- self.rpc_link = JsonRpcClient(self.server, self.port)
+ self.rpc_link = JsonRpcClient(self.server, self.port, prn_func)
@property
def is_connected(self):