From 552626cf9ccf43636b48675caf61a1169853943e Mon Sep 17 00:00:00 2001
From: imarom <imarom@cisco.com>
Date: Tue, 15 Nov 2016 15:45:03 +0200
Subject: https://trex-tgn.cisco.com/youtrack/issue/trex-271

Signed-off-by: imarom <imarom@cisco.com>
---
 .../trex_control_plane/stl/console/trex_tui.py     |  4 +-
 .../stl/trex_stl_lib/trex_stl_async_client.py      | 61 ++++++++++++++++------
 .../stl/trex_stl_lib/trex_stl_client.py            |  6 ++-
 3 files changed, 50 insertions(+), 21 deletions(-)

(limited to 'scripts/automation/trex_control_plane/stl')

diff --git a/scripts/automation/trex_control_plane/stl/console/trex_tui.py b/scripts/automation/trex_control_plane/stl/console/trex_tui.py
index d7db6d30..bf6ed164 100644
--- a/scripts/automation/trex_control_plane/stl/console/trex_tui.py
+++ b/scripts/automation/trex_control_plane/stl/console/trex_tui.py
@@ -645,14 +645,14 @@ class TrexTUI():
        # regular state
         if self.state == self.STATE_ACTIVE:
             # if no connectivity - move to lost connecitivty
-            if not self.stateless_client.async_client.is_alive():
+            if not self.stateless_client.async_client.is_active():
                 self.stateless_client._invalidate_stats(self.pm.ports)
                 self.state = self.STATE_LOST_CONT
 
 
         # lost connectivity
         elif self.state == self.STATE_LOST_CONT:
-            # got it back
+            # if the async is alive (might be zombie, but alive) try to reconnect
             if self.stateless_client.async_client.is_alive():
                 # move to state reconnect
                 self.state = self.STATE_RECONNECT
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
index 2c95844b..11e87592 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
@@ -137,6 +137,10 @@ class CTRexAsyncStatsManager():
 
 
 class CTRexAsyncClient():
+    THREAD_STATE_ACTIVE = 1
+    THREAD_STATE_ZOMBIE = 2
+    THREAD_STATE_DEAD   = 3
+    
     def __init__ (self, server, port, stateless_client):
 
         self.port = port
@@ -159,7 +163,10 @@ class CTRexAsyncClient():
         self.connected = False
 
         self.zipped = ZippedMsg()
-
+        
+        self.t_state = self.THREAD_STATE_DEAD
+        
+        
     # connects the async channel
     def connect (self):
 
@@ -173,8 +180,8 @@ class CTRexAsyncClient():
         self.socket = self.context.socket(zmq.SUB)
 
 
-        # before running the thread - mark as active
-        self.active = True
+        # before running the thread - mark as active    
+        self.t_state = self.THREAD_STATE_ACTIVE
         self.t = threading.Thread(target = self._run)
 
         # kill this thread on exit and don't add it to the join list
@@ -198,26 +205,26 @@ class CTRexAsyncClient():
         return RC_OK()
 
     
-
-
     # disconnect
     def disconnect (self):
         if not self.connected:
             return
 
         # mark for join
-        self.active = False
-
-        # signal that the context was destroyed (exit the thread loop)
+        self.t_state = self.THREAD_STATE_DEAD
         self.context.term()
-
-        # join
         self.t.join()
 
+
         # done
         self.connected = False
 
 
+    # set the thread as a zombie (in case of server death)
+    def set_as_zombie (self):
+        self.last_data_recv_ts = None
+        self.t_state = self.THREAD_STATE_ZOMBIE
+        
     # thread function
     def _run (self):
 
@@ -231,12 +238,19 @@ class CTRexAsyncClient():
         self.monitor.reset()
 
 
-        while self.active:
+        while self.t_state != self.THREAD_STATE_DEAD:
             try:
 
                 with self.monitor:
                     line = self.socket.recv()
                 
+                # last data recv.
+                self.last_data_recv_ts = time.time()
+                
+                # if thread was marked as zombie - it does nothing besides fetching messages
+                if self.t_state == self.THREAD_STATE_ZOMBIE:
+                    continue
+                    
                 self.monitor.on_recv_msg(line)
 
                 # try to decomrpess
@@ -246,7 +260,6 @@ class CTRexAsyncClient():
 
                 line = line.decode()
 
-                self.last_data_recv_ts = time.time()
 
                 # signal once
                 if not got_data:
@@ -259,13 +272,14 @@ class CTRexAsyncClient():
                 # signal once
                 if got_data:
                     self.event_handler.on_async_dead()
+                    self.set_as_zombie()
                     got_data = False
 
                 continue
 
             except zmq.ContextTerminated:
                 # outside thread signaled us to exit
-                assert(not self.active)
+                assert(self.t_state != self.THREAD_STATE_ACTIVE)
                 break
 
             msg = json.loads(line)
@@ -283,16 +297,29 @@ class CTRexAsyncClient():
         # closing of socket must be from the same thread
         self.socket.close(linger = 0)
 
-    def is_thread_alive (self):
-        return self.t.is_alive()
-
-    # did we get info for the last 3 seconds ?
+        
+    # return True if the subscriber got data in the last 3 seconds
+    # even if zombie - will return true if got data
     def is_alive (self):
+
+        # maybe the thread has exited with exception
+        if not self.t.is_alive():
+            return False
+
+        # simply no data
         if self.last_data_recv_ts == None:
             return False
 
+        # timeout of data
         return ( (time.time() - self.last_data_recv_ts) < 3 )
 
+
+    # more granular than is_alive - the thread state must be ACTIVE and data must be arriving
+    # zombies will return false
+    def is_active (self):
+        return self.is_alive() and self.t_state == self.THREAD_STATE_ACTIVE
+        
+        
     def get_stats (self):
         return self.stats
 
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index a390534b..7351f207 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -175,8 +175,8 @@ class EventsHandler(object):
     def on_async_dead (self):
         if self.client.connected:
             msg = 'Lost connection to server'
-            self.__add_event_log('local', 'info', msg, True)
             self.client.connected = False
+            self.__add_event_log('local', 'info', msg, True)
 
 
     def on_async_alive (self):
@@ -341,6 +341,8 @@ class EventsHandler(object):
         # server stopped
         elif (event_type == 100):
             ev = "Server has stopped"
+            # to avoid any new messages on async
+            self.client.async_client.set_as_zombie()
             self.__async_event_server_stopped()
             show_event = True
 
@@ -2613,7 +2615,7 @@ class STLClient(object):
         while set(self.get_active_ports()).intersection(ports):
 
             # make sure ASYNC thread is still alive - otherwise we will be stuck forever
-            if not self.async_client.is_thread_alive():
+            if not self.async_client.is_active():
                 raise STLError("subscriber thread is dead")
 
             time.sleep(0.01)
-- 
cgit