summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/server/zmq_monitor_thread.py')
-rwxr-xr-xscripts/automation/trex_control_plane/server/zmq_monitor_thread.py80
1 files changed, 80 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
new file mode 100755
index 00000000..28e154ee
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
@@ -0,0 +1,80 @@
+#!/router/bin/python
+
+import os
+import outer_packages
+import zmq
+import threading
+import logging
+import CCustomLogger
+from json import JSONDecoder
+from common.trex_status_e import TRexStatus
+
+# setup the logger
+CCustomLogger.setup_custom_logger('TRexServer')
+logger = logging.getLogger('TRexServer')
+
+class ZmqMonitorSession(threading.Thread):
+ def __init__(self, trexObj , zmq_port):
+ super(ZmqMonitorSession, self).__init__()
+ self.stoprequest = threading.Event()
+# self.terminateFlag = False
+ self.first_dump = True
+ self.zmq_port = zmq_port
+ self.zmq_publisher = "tcp://localhost:{port}".format( port = self.zmq_port )
+# self.context = zmq.Context()
+# self.socket = self.context.socket(zmq.SUB)
+ self.trexObj = trexObj
+ self.expect_trex = self.trexObj.expect_trex # used to signal if T-Rex is expected to run and if data should be considered
+ self.decoder = JSONDecoder()
+ logger.info("ZMQ monitor initialization finished")
+
+ def run (self):
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+ logger.info("ZMQ monitor started listening @ {pub}".format( pub = self.zmq_publisher ) )
+ self.socket.connect(self.zmq_publisher)
+ self.socket.setsockopt(zmq.SUBSCRIBE, '')
+
+ while not self.stoprequest.is_set():
+ try:
+ zmq_dump = self.socket.recv() # This call is BLOCKING until data received!
+ if self.expect_trex.is_set():
+ self.parse_and_update_zmq_dump(zmq_dump)
+ logger.debug("ZMQ dump received on socket, and saved to trexObject.")
+ except Exception as e:
+ if self.stoprequest.is_set():
+ # allow this exception since it comes from ZMQ monitor termination
+ pass
+ else:
+ logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex = e))
+ raise
+
+ def join (self, timeout = None):
+ self.stoprequest.set()
+ logger.debug("Handling termination of ZMQ monitor thread")
+ self.socket.close()
+ self.context.term()
+ logger.info("ZMQ monitor resources has been freed.")
+ super(ZmqMonitorSession, self).join(timeout)
+
+ def parse_and_update_zmq_dump (self, zmq_dump):
+ try:
+ dict_obj = self.decoder.decode(zmq_dump)
+ except ValueError:
+ logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump = zmq_dump))
+ dict_obj = None
+
+ # add to trex_obj zmq latest dump, based on its 'name' header
+ if dict_obj is not None and dict_obj!={}:
+ self.trexObj.zmq_dump[dict_obj['name']] = dict_obj
+ if self.first_dump:
+ # change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully
+ self.first_dump = False
+ self.trexObj.set_status(TRexStatus.Running)
+ self.trexObj.set_verbose_status("T-Rex is Running")
+ logger.info("First ZMQ dump received and successfully parsed. TRex running state changed to 'Running'.")
+
+
+if __name__ == "__main__":
+ pass
+