From 27a7103d501e9a0bf005d657cb3f7c51a72eca6b Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 26 Nov 2015 09:00:58 -0500 Subject: when connection is lost - identify this on the console --- .../automation/trex_control_plane/client/trex_async_client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 7641a1e3..6793a4ca 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -180,9 +180,18 @@ class CTRexAsyncClient(): self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') + self.socket.setsockopt(zmq.RCVTIMEO, 3000) while self.active: - line = self.socket.recv_string(); + try: + + line = self.socket.recv_string(); + self.stateless_client.on_async_alive() + + except zmq.Again: + self.stateless_client.on_async_dead() + continue + msg = json.loads(line) name = msg['name'] -- cgit From 026f949fbafbb00fd7a21f3d84a632f5745003ea Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 6 Dec 2015 08:11:42 -0500 Subject: ZMQ bug - connect / disconnect fron another thread on pyhton (not safe !) fixed and more hardening --- .../trex_control_plane/client/trex_async_client.py | 82 ++++++++++++++++++---- 1 file changed, 68 insertions(+), 14 deletions(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 6793a4ca..e38c6ca7 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -159,39 +159,94 @@ class CTRexAsyncClient(): self.stats = TrexAsyncStatsManager() + self.connected = False + + # connects the async channel + def connect (self): + + if self.connected: + self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + print "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + + # before running the thread - mark as active self.active = True - self.t = threading.Thread(target = self.run) + self.alive = False + self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() + self.connected = True - def run (self): + # wait for data streaming from the server + timeout = time.time() + 5 + while not self.alive: + time.sleep(0.01) + if time.time() > timeout: + self.disconnect() + return False, "*** [subscriber] - no data flow from server at : " + self.tr - # Socket to talk to server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) + return True, "" + + + # disconnect + def disconnect (self): + if not self.connected: + return + + # signal that the context was destroyed (exit the thread loop) + self.context.term() + + # mark for join and join + self.active = False + self.t.join() + + # done + self.connected = False + + # thread function + def _run (self): + + # no data yet... + self.alive = False + # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') - self.socket.setsockopt(zmq.RCVTIMEO, 3000) + self.socket.setsockopt(zmq.RCVTIMEO, 5000) while self.active: try: line = self.socket.recv_string(); - self.stateless_client.on_async_alive() + if not self.alive: + self.stateless_client.on_async_alive() + self.alive = True + + # got a timeout - mark as not alive and retry except zmq.Again: - self.stateless_client.on_async_dead() + + if self.alive: + self.stateless_client.on_async_dead() + self.alive = False + continue + except zmq.ContextTerminated: + # outside thread signaled us to exit + self.alive = False + break + msg = json.loads(line) name = msg['name'] @@ -201,6 +256,10 @@ class CTRexAsyncClient(): self.__dispatch(name, type, data) + + # closing of socket must be from the same thread + self.socket.close(linger = 0) + def get_stats (self): return self.stats @@ -220,8 +279,3 @@ class CTRexAsyncClient(): else: pass - - def stop (self): - self.active = False - self.t.join() - -- cgit From bae48d6cf8dd59158ffcb488391af8a96fc2e037 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 14 Dec 2015 04:47:35 -0500 Subject: TUI v2.0 - now no flickering, state machine for lost of connectivity and TUI can be started in xterm using tui -x --- .../trex_control_plane/client/trex_async_client.py | 40 ++++++++++++++++------ 1 file changed, 29 insertions(+), 11 deletions(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 8fdf7c9b..00304886 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -152,16 +152,19 @@ class CTRexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port, stateless_client): + def __init__ (self, server, port, stateless_client, prn_func = None): self.port = port self.server = server self.stateless_client = stateless_client + self.prn_func = prn_func self.raw_snapshot = {} self.stats = CTRexAsyncStatsManager() + self.last_data_recv_ts = 0 + self.connected = False # connects the async channel @@ -171,7 +174,13 @@ class CTRexAsyncClient(): self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + msg = "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + + if self.prn_func: + self.prn_func(msg) + else: + print msg # Socket to talk to server self.context = zmq.Context() @@ -180,7 +189,6 @@ class CTRexAsyncClient(): # before running the thread - mark as active self.active = True - self.alive = False self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list @@ -192,7 +200,7 @@ class CTRexAsyncClient(): # wait for data streaming from the server timeout = time.time() + 5 - while not self.alive: + while not self.is_alive(): time.sleep(0.01) if time.time() > timeout: self.disconnect() @@ -219,35 +227,38 @@ class CTRexAsyncClient(): # thread function def _run (self): - # no data yet... - self.alive = False # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.setsockopt(zmq.RCVTIMEO, 5000) + got_data = False + while self.active: try: line = self.socket.recv_string() + self.last_data_recv_ts = time.time() - if not self.alive: + # signal once + if not got_data: self.stateless_client.on_async_alive() - self.alive = True + got_data = True + # got a timeout - mark as not alive and retry except zmq.Again: - if self.alive: + # signal once + if got_data: self.stateless_client.on_async_dead() - self.alive = False + got_data = False continue except zmq.ContextTerminated: # outside thread signaled us to exit - self.alive = False break msg = json.loads(line) @@ -264,6 +275,13 @@ class CTRexAsyncClient(): self.socket.close(linger = 0) + # did we get info for the last 3 seconds ? + def is_alive (self): + if self.last_data_recv_ts == None: + return False + + return ( (time.time() - self.last_data_recv_ts) < 3 ) + def get_stats (self): return self.stats -- cgit From bfb15053ea5d21bc0502f3102cd83407fafddf75 Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 15 Dec 2015 04:26:21 -0500 Subject: moved all return code values to RC types (better for batch rc) also few fixes --- scripts/automation/trex_control_plane/client/trex_async_client.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 00304886..459d6915 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -19,6 +19,7 @@ import re from common.trex_stats import * from common.trex_streams import * +from common.trex_types import * # basic async stats class class CTRexAsyncStats(object): @@ -204,9 +205,9 @@ class CTRexAsyncClient(): time.sleep(0.01) if time.time() > timeout: self.disconnect() - return False, "*** [subscriber] - no data flow from server at : " + self.tr + return RC_ERR("*** [subscriber] - no data flow from server at : " + self.tr) - return True, "" + return RC_OK() # disconnect -- cgit