summaryrefslogtreecommitdiffstats
path: root/scripts/automation
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-11-15 15:45:03 +0200
committerimarom <imarom@cisco.com>2016-11-15 15:52:32 +0200
commit552626cf9ccf43636b48675caf61a1169853943e (patch)
treeee9bbf8d22350c171935d6a7e7bdfa5ced94a202 /scripts/automation
parent453dc59d5257096a9b2273162ad2f80315097951 (diff)
https://trex-tgn.cisco.com/youtrack/issue/trex-271
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'scripts/automation')
-rw-r--r--scripts/automation/trex_control_plane/stl/console/trex_tui.py4
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py61
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py6
3 files changed, 50 insertions, 21 deletions
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_tui.py b/scripts/automation/trex_control_plane/stl/console/trex_tui.py
index d7db6d30..bf6ed164 100644
--- a/scripts/automation/trex_control_plane/stl/console/trex_tui.py
+++ b/scripts/automation/trex_control_plane/stl/console/trex_tui.py
@@ -645,14 +645,14 @@ class TrexTUI():
# regular state
if self.state == self.STATE_ACTIVE:
# if no connectivity - move to lost connecitivty
- if not self.stateless_client.async_client.is_alive():
+ if not self.stateless_client.async_client.is_active():
self.stateless_client._invalidate_stats(self.pm.ports)
self.state = self.STATE_LOST_CONT
# lost connectivity
elif self.state == self.STATE_LOST_CONT:
- # got it back
+ # if the async is alive (might be a zombie, but alive) try to reconnect
if self.stateless_client.async_client.is_alive():
# move to state reconnect
self.state = self.STATE_RECONNECT
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
index 2c95844b..11e87592 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
@@ -137,6 +137,10 @@ class CTRexAsyncStatsManager():
class CTRexAsyncClient():
+ THREAD_STATE_ACTIVE = 1
+ THREAD_STATE_ZOMBIE = 2
+ THREAD_STATE_DEAD = 3
+
def __init__ (self, server, port, stateless_client):
self.port = port
@@ -159,7 +163,10 @@ class CTRexAsyncClient():
self.connected = False
self.zipped = ZippedMsg()
-
+
+ self.t_state = self.THREAD_STATE_DEAD
+
+
# connects the async channel
def connect (self):
@@ -173,8 +180,8 @@ class CTRexAsyncClient():
self.socket = self.context.socket(zmq.SUB)
- # before running the thread - mark as active
- self.active = True
+ # before running the thread - mark as active
+ self.t_state = self.THREAD_STATE_ACTIVE
self.t = threading.Thread(target = self._run)
# kill this thread on exit and don't add it to the join list
@@ -198,26 +205,26 @@ class CTRexAsyncClient():
return RC_OK()
-
-
# disconnect
def disconnect (self):
if not self.connected:
return
# mark for join
- self.active = False
-
- # signal that the context was destroyed (exit the thread loop)
+ self.t_state = self.THREAD_STATE_DEAD
self.context.term()
-
- # join
self.t.join()
+
# done
self.connected = False
+ # set the thread as a zombie (in case of server death)
+ def set_as_zombie (self):
+ self.last_data_recv_ts = None
+ self.t_state = self.THREAD_STATE_ZOMBIE
+
# thread function
def _run (self):
@@ -231,12 +238,19 @@ class CTRexAsyncClient():
self.monitor.reset()
- while self.active:
+ while self.t_state != self.THREAD_STATE_DEAD:
try:
with self.monitor:
line = self.socket.recv()
+ # last data recv.
+ self.last_data_recv_ts = time.time()
+
+ # if thread was marked as zombie - it does nothing besides fetching messages
+ if self.t_state == self.THREAD_STATE_ZOMBIE:
+ continue
+
self.monitor.on_recv_msg(line)
# try to decomrpess
@@ -246,7 +260,6 @@ class CTRexAsyncClient():
line = line.decode()
- self.last_data_recv_ts = time.time()
# signal once
if not got_data:
@@ -259,13 +272,14 @@ class CTRexAsyncClient():
# signal once
if got_data:
self.event_handler.on_async_dead()
+ self.set_as_zombie()
got_data = False
continue
except zmq.ContextTerminated:
# outside thread signaled us to exit
- assert(not self.active)
+ assert(self.t_state != self.THREAD_STATE_ACTIVE)
break
msg = json.loads(line)
@@ -283,16 +297,29 @@ class CTRexAsyncClient():
# closing of socket must be from the same thread
self.socket.close(linger = 0)
- def is_thread_alive (self):
- return self.t.is_alive()
-
- # did we get info for the last 3 seconds ?
+
+ # return True if the subscriber got data in the last 3 seconds
+ # even if zombie - will return true if got data
def is_alive (self):
+
+ # maybe the thread has exited with exception
+ if not self.t.is_alive():
+ return False
+
+ # simply no data
if self.last_data_recv_ts == None:
return False
+ # timeout of data
return ( (time.time() - self.last_data_recv_ts) < 3 )
+
+ # more granular than is_alive - true only when the thread state is active and data is arriving
+ # zombies will return false
+ def is_active (self):
+ return self.is_alive() and self.t_state == self.THREAD_STATE_ACTIVE
+
+
def get_stats (self):
return self.stats
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index a390534b..7351f207 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -175,8 +175,8 @@ class EventsHandler(object):
def on_async_dead (self):
if self.client.connected:
msg = 'Lost connection to server'
- self.__add_event_log('local', 'info', msg, True)
self.client.connected = False
+ self.__add_event_log('local', 'info', msg, True)
def on_async_alive (self):
@@ -341,6 +341,8 @@ class EventsHandler(object):
# server stopped
elif (event_type == 100):
ev = "Server has stopped"
+ # to avoid any new messages on async
+ self.client.async_client.set_as_zombie()
self.__async_event_server_stopped()
show_event = True
@@ -2613,7 +2615,7 @@ class STLClient(object):
while set(self.get_active_ports()).intersection(ports):
# make sure ASYNC thread is still alive - otherwise we will be stuck forever
- if not self.async_client.is_thread_alive():
+ if not self.async_client.is_active():
raise STLError("subscriber thread is dead")
time.sleep(0.01)