summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_async_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py40
1 files changed, 29 insertions, 11 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 8fdf7c9b..00304886 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -152,16 +152,19 @@ class CTRexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, server, port, stateless_client):
+ def __init__ (self, server, port, stateless_client, prn_func = None):
self.port = port
self.server = server
self.stateless_client = stateless_client
+ self.prn_func = prn_func
self.raw_snapshot = {}
self.stats = CTRexAsyncStatsManager()
+ self.last_data_recv_ts = 0
+
self.connected = False
# connects the async channel
@@ -171,7 +174,13 @@ class CTRexAsyncClient():
self.disconnect()
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- print "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+
+ msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+
+ if self.prn_func:
+ self.prn_func(msg)
+ else:
+ print msg
# Socket to talk to server
self.context = zmq.Context()
@@ -180,7 +189,6 @@ class CTRexAsyncClient():
# before running the thread - mark as active
self.active = True
- self.alive = False
self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
@@ -192,7 +200,7 @@ class CTRexAsyncClient():
# wait for data streaming from the server
timeout = time.time() + 5
- while not self.alive:
+ while not self.is_alive():
time.sleep(0.01)
if time.time() > timeout:
self.disconnect()
@@ -219,35 +227,38 @@ class CTRexAsyncClient():
# thread function
def _run (self):
- # no data yet...
- self.alive = False
# socket must be created on the same thread
self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+ got_data = False
+
while self.active:
try:
line = self.socket.recv_string()
+ self.last_data_recv_ts = time.time()
- if not self.alive:
+ # signal once
+ if not got_data:
self.stateless_client.on_async_alive()
- self.alive = True
+ got_data = True
+
# got a timeout - mark as not alive and retry
except zmq.Again:
- if self.alive:
+ # signal once
+ if got_data:
self.stateless_client.on_async_dead()
- self.alive = False
+ got_data = False
continue
except zmq.ContextTerminated:
# outside thread signaled us to exit
- self.alive = False
break
msg = json.loads(line)
@@ -264,6 +275,13 @@ class CTRexAsyncClient():
self.socket.close(linger = 0)
+ # did we get info for the last 3 seconds ?
+ def is_alive (self):
+ if self.last_data_recv_ts == None:
+ return False
+
+ return ( (time.time() - self.last_data_recv_ts) < 3 )
+
def get_stats (self):
return self.stats