summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/server/zmq_monitor_thread.py')
-rwxr-xr-xscripts/automation/trex_control_plane/server/zmq_monitor_thread.py20
1 files changed, 9 insertions, 11 deletions
diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
index 28e154ee..7a278af8 100755
--- a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
+++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
@@ -13,25 +13,23 @@ from common.trex_status_e import TRexStatus
CCustomLogger.setup_custom_logger('TRexServer')
logger = logging.getLogger('TRexServer')
+
class ZmqMonitorSession(threading.Thread):
def __init__(self, trexObj , zmq_port):
super(ZmqMonitorSession, self).__init__()
self.stoprequest = threading.Event()
-# self.terminateFlag = False
self.first_dump = True
self.zmq_port = zmq_port
- self.zmq_publisher = "tcp://localhost:{port}".format( port = self.zmq_port )
-# self.context = zmq.Context()
-# self.socket = self.context.socket(zmq.SUB)
+ self.zmq_publisher = "tcp://localhost:{port}".format(port=self.zmq_port)
self.trexObj = trexObj
self.expect_trex = self.trexObj.expect_trex # used to signal if T-Rex is expected to run and if data should be considered
self.decoder = JSONDecoder()
logger.info("ZMQ monitor initialization finished")
- def run (self):
+ def run(self):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
- logger.info("ZMQ monitor started listening @ {pub}".format( pub = self.zmq_publisher ) )
+ logger.info("ZMQ monitor started listening @ {pub}".format(pub=self.zmq_publisher))
self.socket.connect(self.zmq_publisher)
self.socket.setsockopt(zmq.SUBSCRIBE, '')
@@ -46,10 +44,10 @@ class ZmqMonitorSession(threading.Thread):
# allow this exception since it comes from ZMQ monitor termination
pass
else:
- logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex = e))
+ logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex=e))
raise
- def join (self, timeout = None):
+ def join(self, timeout=None):
self.stoprequest.set()
logger.debug("Handling termination of ZMQ monitor thread")
self.socket.close()
@@ -57,15 +55,15 @@ class ZmqMonitorSession(threading.Thread):
logger.info("ZMQ monitor resources has been freed.")
super(ZmqMonitorSession, self).join(timeout)
- def parse_and_update_zmq_dump (self, zmq_dump):
+ def parse_and_update_zmq_dump(self, zmq_dump):
try:
dict_obj = self.decoder.decode(zmq_dump)
except ValueError:
- logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump = zmq_dump))
+ logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump))
dict_obj = None
# add to trex_obj zmq latest dump, based on its 'name' header
- if dict_obj is not None and dict_obj!={}:
+ if dict_obj is not None and dict_obj != {}:
self.trexObj.zmq_dump[dict_obj['name']] = dict_obj
if self.first_dump:
# change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully