summaryrefslogtreecommitdiffstats
path: root/scripts
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-12-06 08:11:42 -0500
committerimarom <imarom@cisco.com>2015-12-06 08:11:42 -0500
commit026f949fbafbb00fd7a21f3d84a632f5745003ea (patch)
tree5b23a41769b81c4ec9b7e1d29b4db181eebfdebf /scripts
parent23e1f07edcd8289f09a1477c416ce260d1a0a804 (diff)
ZMQ bug - connect / disconnect fron another thread on pyhton (not safe !)
fixed and more hardening
Diffstat (limited to 'scripts')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py82
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py31
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py4
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py1
4 files changed, 87 insertions, 31 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 6793a4ca..e38c6ca7 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -159,39 +159,94 @@ class CTRexAsyncClient():
self.stats = TrexAsyncStatsManager()
+ self.connected = False
+
+ # connects the async channel
+ def connect (self):
+
+ if self.connected:
+ self.disconnect()
self.tr = "tcp://{0}:{1}".format(self.server, self.port)
- print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
+ print "\nConnecting To ZMQ Publisher On {0}".format(self.tr)
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+
+ # before running the thread - mark as active
self.active = True
- self.t = threading.Thread(target = self.run)
+ self.alive = False
+ self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
+ self.connected = True
- def run (self):
+ # wait for data streaming from the server
+ timeout = time.time() + 5
+ while not self.alive:
+ time.sleep(0.01)
+ if time.time() > timeout:
+ self.disconnect()
+ return False, "*** [subscriber] - no data flow from server at : " + self.tr
- # Socket to talk to server
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
+ return True, ""
+
+
+ # disconnect
+ def disconnect (self):
+ if not self.connected:
+ return
+
+ # signal that the context was destroyed (exit the thread loop)
+ self.context.term()
+
+ # mark for join and join
+ self.active = False
+ self.t.join()
+
+ # done
+ self.connected = False
+
+ # thread function
+ def _run (self):
+
+ # no data yet...
+ self.alive = False
+ # socket must be created on the same thread
self.socket.connect(self.tr)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
- self.socket.setsockopt(zmq.RCVTIMEO, 3000)
+ self.socket.setsockopt(zmq.RCVTIMEO, 5000)
while self.active:
try:
line = self.socket.recv_string();
- self.stateless_client.on_async_alive()
+ if not self.alive:
+ self.stateless_client.on_async_alive()
+ self.alive = True
+
+ # got a timeout - mark as not alive and retry
except zmq.Again:
- self.stateless_client.on_async_dead()
+
+ if self.alive:
+ self.stateless_client.on_async_dead()
+ self.alive = False
+
continue
+ except zmq.ContextTerminated:
+ # outside thread signaled us to exit
+ self.alive = False
+ break
+
msg = json.loads(line)
name = msg['name']
@@ -201,6 +256,10 @@ class CTRexAsyncClient():
self.__dispatch(name, type, data)
+
+ # closing of socket must be from the same thread
+ self.socket.close(linger = 0)
+
def get_stats (self):
return self.stats
@@ -220,8 +279,3 @@ class CTRexAsyncClient():
else:
pass
-
- def stop (self):
- self.active = False
- self.t.join()
-
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 6082863e..3a9d74a9 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -445,14 +445,13 @@ class CTRexStatelessClient(object):
self._server_version = None
self.__err_log = None
- self._async_client = CTRexAsyncClient(server, async_port, self)
+ self.async_client = CTRexAsyncClient(server, async_port, self)
self.streams_db = CStreamsDB()
- self.connected = False
-
self.events = []
+ self.connected = False
################# events handler ######################
def add_event_log (self, msg, ev_type, show = False):
@@ -515,7 +514,8 @@ class CTRexStatelessClient(object):
self.ports[port_id].async_event_port_stopped()
def async_event_server_stopped (self):
- self.disconnect()
+ self.connected = False
+
def get_events (self):
return self.events
@@ -582,13 +582,19 @@ class CTRexStatelessClient(object):
# connection sequence
def connect(self):
+ # clear this flag
self.connected = False
- # connect
+ # connect sync channel
rc, data = self.comm_link.connect()
if not rc:
return RC_ERR(data)
+ # connect async channel
+ rc, data = self.async_client.connect()
+ if not rc:
+ return RC_ERR(data)
+
# version
rc, data = self.transmit("get_version")
if not rc:
@@ -634,24 +640,19 @@ class CTRexStatelessClient(object):
def disconnect(self):
- self.connected = False
self.comm_link.disconnect()
+ self.async_client.disconnect()
return RC_OK()
def on_async_dead (self):
- if self.is_connected():
+ if self.connected:
msg = 'lost connection to server'
self.add_event_log(msg, 'local', True)
-
- self.disconnect()
+ self.connected = False
def on_async_alive (self):
- if not self.is_connected():
- msg = 'server connection restored'
- self.add_event_log(msg, 'local', True)
-
- self.cmd_connect()
+ pass
########### cached queries (no server traffic) ###########
@@ -675,7 +676,7 @@ class CTRexStatelessClient(object):
return port_ids
def get_stats_async (self):
- return self._async_client.get_stats()
+ return self.async_client.get_stats()
def get_connection_port (self):
return self.comm_link.port
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index a5789c46..90d7f8e8 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -133,7 +133,7 @@ class JsonRpcClient(object):
tries += 1
if tries > 10:
self.disconnect()
- return CmdResponse(False, "Failed to send message to server")
+ return CmdResponse(False, "*** [RPC] - Failed to send message to server")
tries = 0
@@ -145,7 +145,7 @@ class JsonRpcClient(object):
tries += 1
if tries > 10:
self.disconnect()
- return CmdResponse(False, "Failed to get server response")
+ return CmdResponse(False, "*** [RPC] - Failed to get server response")
self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n")
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index 73f4f612..9d855f98 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -153,6 +153,7 @@ class TRexConsole(TRexGeneralCmd):
self.__dict__[name] = getattr(self.trex_console, name)
def postcmd(self, stop, line):
+
if self.stateless_client.is_connected():
self.prompt = "TRex > "
else: