diff options
author | 2015-12-20 00:07:44 +0200 | |
---|---|---|
committer | 2015-12-20 00:07:44 +0200 | |
commit | 4ca24cf31919870a684fe78f17c856e0d220e6d5 (patch) | |
tree | f40ab95e52adca3ac713d61eb9fa3fd0d136e4ea /scripts/automation/trex_control_plane/client/trex_async_client.py | |
parent | 1895d21485621c3428d045fa0f5b9daf165c8260 (diff) | |
parent | 5cef472bcdc6c0b7e20e5cc42485ed5570c10f8c (diff) |
Merge branch 'master' into dan_stateless
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_async_client.py | 116 |
1 files changed, 99 insertions, 17 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 8b274134..459d6915 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -19,6 +19,7 @@ import re from common.trex_stats import * from common.trex_streams import * +from common.trex_types import * # basic async stats class class CTRexAsyncStats(object): @@ -152,38 +153,115 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client): + def __init__ (self, server, port, stateless_client, prn_func = None): self.port = port self.server = server self.stateless_client = stateless_client + self.prn_func = prn_func self.raw_snapshot = {} self.stats = CTRexAsyncStatsManager() + self.last_data_recv_ts = 0 + + self.connected = False + + # connects the async channel + def connect (self): + + if self.connected: + self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + if self.prn_func: + self.prn_func(msg) + else: + print msg + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + + # before running the thread - mark as active self.active = True - self.t = threading.Thread(target= self.run) + self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() - def run(self): + self.connected = True - # Socket to talk to server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) + # wait for data streaming from the server + timeout = time.time() + 5 + while not self.is_alive(): + time.sleep(0.01) + if time.time() > timeout: + self.disconnect() + return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) + + return RC_OK() + + + # disconnect + def disconnect (self): + if not self.connected: + return + + # signal that the context was destroyed (exit the thread loop) + self.context.term() + + # mark for join and join + self.active = False + self.t.join() + + # done + self.connected = False + + # thread function + def _run (self): + + + # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') + self.socket.setsockopt(zmq.RCVTIMEO, 5000) + + got_data = False while self.active: - line = self.socket.recv_string() + try: + + line = self.socket.recv_string() + self.last_data_recv_ts = time.time() + + # signal once + if not got_data: + self.stateless_client.on_async_alive() + got_data = True + + + # got a timeout - mark as not alive and retry + except zmq.Again: + + # signal once + if got_data: + self.stateless_client.on_async_dead() + got_data = False + + continue + + except zmq.ContextTerminated: + # outside thread signaled us to exit + break + msg = json.loads(line) name = msg['name'] @@ -193,7 +271,19 @@ class CTRexAsyncClient(): self.__dispatch(name, type, data) - def get_stats(self): + + # closing of socket must be from the same thread + self.socket.close(linger = 0) + + + # did we get info for the last 3 seconds ? + def is_alive (self): + if self.last_data_recv_ts == None: + return False + + return ( (time.time() - self.last_data_recv_ts) < 3 ) + + def get_stats (self): return self.stats def get_raw_snapshot (self): @@ -203,7 +293,6 @@ class CTRexAsyncClient(): def __dispatch (self, name, type, data): # stats if name == "trex-global": - # self.stats.update(data) self.stateless_client.handle_async_stats_update(data) # events elif name == "trex-event": @@ -212,10 +301,3 @@ class CTRexAsyncClient(): pass - def stop (self): - self.active = False - self.t.join() - - -if __name__ == "__main__": - pass
\ No newline at end of file |