diff options
4 files changed, 87 insertions, 31 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 6793a4ca..e38c6ca7 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -159,39 +159,94 @@ class CTRexAsyncClient(): self.stats = TrexAsyncStatsManager() + self.connected = False + + # connects the async channel + def connect (self): + + if self.connected: + self.disconnect() self.tr = "tcp://{0}:{1}".format(self.server, self.port) - print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + print "\nConnecting To ZMQ Publisher On {0}".format(self.tr) + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + + # before running the thread - mark as active self.active = True - self.t = threading.Thread(target = self.run) + self.alive = False + self.t = threading.Thread(target = self._run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() + self.connected = True - def run (self): + # wait for data streaming from the server + timeout = time.time() + 5 + while not self.alive: + time.sleep(0.01) + if time.time() > timeout: + self.disconnect() + return False, "*** [subscriber] - no data flow from server at : " + self.tr - # Socket to talk to server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.SUB) + return True, "" + + + # disconnect + def disconnect (self): + if not self.connected: + return + + # signal that the context was destroyed (exit the thread loop) + self.context.term() + + # mark for join and join + self.active = False + self.t.join() + + # done + self.connected = False + + # thread function + def _run (self): + + # no data yet... + self.alive = False + # socket must be created on the same thread self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') - self.socket.setsockopt(zmq.RCVTIMEO, 3000) + self.socket.setsockopt(zmq.RCVTIMEO, 5000) while self.active: try: line = self.socket.recv_string(); - self.stateless_client.on_async_alive() + if not self.alive: + self.stateless_client.on_async_alive() + self.alive = True + + # got a timeout - mark as not alive and retry except zmq.Again: - self.stateless_client.on_async_dead() + + if self.alive: + self.stateless_client.on_async_dead() + self.alive = False + continue + except zmq.ContextTerminated: + # outside thread signaled us to exit + self.alive = False + break + msg = json.loads(line) name = msg['name'] @@ -201,6 +256,10 @@ class CTRexAsyncClient(): self.__dispatch(name, type, data) + + # closing of socket must be from the same thread + self.socket.close(linger = 0) + def get_stats (self): return self.stats @@ -220,8 +279,3 @@ class CTRexAsyncClient(): else: pass - - def stop (self): - self.active = False - self.t.join() - diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 6082863e..3a9d74a9 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -445,14 +445,13 @@ class CTRexStatelessClient(object): self._server_version = None self.__err_log = None - self._async_client = CTRexAsyncClient(server, async_port, self) + self.async_client = CTRexAsyncClient(server, async_port, self) self.streams_db = CStreamsDB() - self.connected = False - self.events = [] + self.connected = False ################# events handler ###################### def add_event_log (self, msg, ev_type, show = False): @@ -515,7 +514,8 @@ class CTRexStatelessClient(object): self.ports[port_id].async_event_port_stopped() def async_event_server_stopped (self): - self.disconnect() + self.connected = False + def get_events (self): return self.events @@ -582,13 +582,19 @@ class CTRexStatelessClient(object): # connection sequence def connect(self): + # clear this flag self.connected = False - # connect + # connect sync channel rc, data = self.comm_link.connect() if not rc: return RC_ERR(data) + # connect async channel + rc, data = self.async_client.connect() + if not rc: + return RC_ERR(data) + # version rc, data = self.transmit("get_version") if not rc: @@ -634,24 +640,19 @@ class CTRexStatelessClient(object): def disconnect(self): - self.connected = False self.comm_link.disconnect() + self.async_client.disconnect() return RC_OK() def on_async_dead (self): - if self.is_connected(): + if self.connected: msg = 'lost connection to server' self.add_event_log(msg, 'local', True) - - self.disconnect() + self.connected = False def on_async_alive (self): - if not self.is_connected(): - msg = 'server connection restored' - self.add_event_log(msg, 'local', True) - - self.cmd_connect() + pass ########### cached queries (no server traffic) ########### @@ -675,7 +676,7 @@ class CTRexStatelessClient(object): return port_ids def get_stats_async (self): - return self._async_client.get_stats() + return self.async_client.get_stats() def get_connection_port (self): return self.comm_link.port diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index a5789c46..90d7f8e8 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -133,7 +133,7 @@ class JsonRpcClient(object): tries += 1 if tries > 10: self.disconnect() - return CmdResponse(False, "Failed to send message to server") + return CmdResponse(False, "*** [RPC] - Failed to send message to server") tries = 0 @@ -145,7 +145,7 @@ class JsonRpcClient(object): tries += 1 if tries > 10: self.disconnect() - return CmdResponse(False, "Failed to get server response") + return CmdResponse(False, "*** [RPC] - Failed to get server response") self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 73f4f612..9d855f98 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -153,6 +153,7 @@ class TRexConsole(TRexGeneralCmd): self.__dict__[name] = getattr(self.trex_console, name) def postcmd(self, stop, line): + if self.stateless_client.is_connected(): self.prompt = "TRex > " else: |