diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_async_client.py | 40 |
1 files changed, 29 insertions, 11 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 8fdf7c9b..00304886 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -152,16 +152,19 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client): + def __init__ (self, server, port, stateless_client, prn_func = None): self.port = port self.server = server self.stateless_client = stateless_client + self.prn_func = prn_func self.raw_snapshot = {} self.stats = CTRexAsyncStatsManager() + self.last_data_recv_ts = 0 + self.connected = False # connects the async channel @@ -171,7 +174,13 @@ class CTRexAsyncClient(): self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + if self.prn_func: + self.prn_func(msg) + else: + print msg # Socket to talk to server self.context = zmq.Context() @@ -180,7 +189,6 @@ class CTRexAsyncClient(): # before running the thread - mark as active self.active = True - self.alive = False self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list @@ -192,7 +200,7 @@ class CTRexAsyncClient(): # wait for data streaming from the server timeout = time.time() + 5 - while not self.alive: + while not self.is_alive(): time.sleep(0.01) if time.time() > timeout: self.disconnect() @@ -219,35 +227,38 @@ class CTRexAsyncClient(): # thread function def _run (self): - # no data yet... - self.alive = False # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.RCVTIMEO, 5000) + got_data = False + while self.active: try: line = self.socket.recv_string() + self.last_data_recv_ts = time.time() - if not self.alive: + # signal once + if not got_data: self.stateless_client.on_async_alive() - self.alive = True + got_data = True + # got a timeout - mark as not alive and retry except zmq.Again: - if self.alive: + # signal once + if got_data: self.stateless_client.on_async_dead() - self.alive = False + got_data = False continue except zmq.ContextTerminated: # outside thread signaled us to exit - self.alive = False break msg = json.loads(line) @@ -264,6 +275,13 @@ class CTRexAsyncClient(): self.socket.close(linger = 0) + # did we get info for the last 3 seconds ? + def is_alive (self): + if self.last_data_recv_ts == None: + return False + + return ( (time.time() - self.last_data_recv_ts) < 3 ) + def get_stats (self): return self.stats |