summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_async_client.py
diff options
context:
space:
mode:
authorDan Klein <danklein10@gmail.com>2015-12-20 00:07:44 +0200
committerDan Klein <danklein10@gmail.com>2015-12-20 00:07:44 +0200
commit4ca24cf31919870a684fe78f17c856e0d220e6d5 (patch)
treef40ab95e52adca3ac713d61eb9fa3fd0d136e4ea /scripts/automation/trex_control_plane/client/trex_async_client.py
parent1895d21485621c3428d045fa0f5b9daf165c8260 (diff)
parent5cef472bcdc6c0b7e20e5cc42485ed5570c10f8c (diff)
Merge branch 'master' into dan_stateless
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py116
1 files changed, 99 insertions, 17 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 8b274134..459d6915 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -19,6 +19,7 @@ import re
from common.trex_stats import *
from common.trex_streams import *
+from common.trex_types import *
# basic async stats class
class CTRexAsyncStats(object):
@@ -152,38 +153,115 @@ class CTRexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, server, port, stateless_client):
+ def __init__ (self, server, port, stateless_client, prn_func = None):
self.port = port
self.server = server
self.stateless_client = stateless_client
+ self.prn_func = prn_func
self.raw_snapshot = {}
self.stats = CTRexAsyncStatsManager()
+ self.last_data_recv_ts = 0
+
+ self.connected = False
+
+ # connects the async channel
+ def connect (self):
+
+ if self.connected:
+ self.disconnect()
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
+ msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+
+ if self.prn_func:
+ self.prn_func(msg)
+ else:
+ print msg
+
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+
+ # before running the thread - mark as active
self.active = True
- self.t = threading.Thread(target= self.run)
+ self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
- def run(self):
+ self.connected = True
- # Socket to talk to server
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
+ # wait for data streaming from the server
+ timeout = time.time() + 5
+ while not self.is_alive():
+ time.sleep(0.01)
+ if time.time() > timeout:
+ self.disconnect()
+ return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr)
+
+ return RC_OK()
+
+
+ # disconnect
+ def disconnect (self):
+ if not self.connected:
+ return
+
+ # signal that the context was destroyed (exit the thread loop)
+ self.context.term()
+
+ # mark for join and join
+ self.active = False
+ self.t.join()
+
+ # done
+ self.connected = False
+
+ # thread function
+ def _run (self):
+
+
+ # socket must be created on the same thread
self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
+ self.socket.setsockopt(zmq.RCVTIMEO, 5000)
+
+ got_data = False
while self.active:
- line = self.socket.recv_string()
+ try:
+
+ line = self.socket.recv_string()
+ self.last_data_recv_ts = time.time()
+
+ # signal once
+ if not got_data:
+ self.stateless_client.on_async_alive()
+ got_data = True
+
+
+ # got a timeout - mark as not alive and retry
+ except zmq.Again:
+
+ # signal once
+ if got_data:
+ self.stateless_client.on_async_dead()
+ got_data = False
+
+ continue
+
+ except zmq.ContextTerminated:
+ # outside thread signaled us to exit
+ break
+
msg = json.loads(line)
name = msg['name']
@@ -193,7 +271,19 @@ class CTRexAsyncClient():
self.__dispatch(name, type, data)
- def get_stats(self):
+
+ # closing of socket must be from the same thread
+ self.socket.close(linger = 0)
+
+
+ # did we get info for the last 3 seconds ?
+ def is_alive (self):
+ if self.last_data_recv_ts == None:
+ return False
+
+ return ( (time.time() - self.last_data_recv_ts) < 3 )
+
+ def get_stats (self):
return self.stats
def get_raw_snapshot (self):
@@ -203,7 +293,6 @@ class CTRexAsyncClient():
def __dispatch (self, name, type, data):
# stats
if name == "trex-global":
- # self.stats.update(data)
self.stateless_client.handle_async_stats_update(data)
# events
elif name == "trex-event":
@@ -212,10 +301,3 @@ class CTRexAsyncClient():
pass
- def stop (self):
- self.active = False
- self.t.join()
-
-
-if __name__ == "__main__":
- pass \ No newline at end of file