summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/server/zmq_monitor_thread.py')
-rwxr-xr-xscripts/automation/trex_control_plane/server/zmq_monitor_thread.py44
1 files changed, 24 insertions, 20 deletions
diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
index db9bf7da..4fc263df 100755
--- a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
+++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
@@ -27,25 +27,29 @@ class ZmqMonitorSession(threading.Thread):
logger.info("ZMQ monitor initialization finished")
def run(self):
- self.context = zmq.Context()
- self.socket = self.context.socket(zmq.SUB)
- logger.info("ZMQ monitor started listening @ {pub}".format(pub=self.zmq_publisher))
- self.socket.connect(self.zmq_publisher)
- self.socket.setsockopt(zmq.SUBSCRIBE, '')
-
- while not self.stoprequest.is_set():
- try:
- zmq_dump = self.socket.recv() # This call is BLOCKING until data received!
- if self.expect_trex.is_set():
- self.parse_and_update_zmq_dump(zmq_dump)
- logger.debug("ZMQ dump received on socket, and saved to trexObject.")
- except Exception as e:
- if self.stoprequest.is_set():
- # allow this exception since it comes from ZMQ monitor termination
- pass
- else:
- logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex=e))
- raise
+ try:
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+ logger.info("ZMQ monitor started listening @ {pub}".format(pub=self.zmq_publisher))
+ self.socket.connect(self.zmq_publisher)
+ self.socket.setsockopt(zmq.SUBSCRIBE, b'')
+
+ while not self.stoprequest.is_set():
+ try:
+ zmq_dump = self.socket.recv() # This call is BLOCKING until data received!
+ if self.expect_trex.is_set():
+ self.parse_and_update_zmq_dump(zmq_dump)
+ logger.debug("ZMQ dump received on socket, and saved to trexObject.")
+ except Exception as e:
+ if self.stoprequest.is_set():
+ # allow this exception since it comes from ZMQ monitor termination
+ pass
+ else:
+ logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex=e))
+ raise
+ except Exception as e:
+ logger.error('ZMQ monitor error: %s' % e)
+ self.trexObj.zmq_error = e
def join(self, timeout=None):
self.stoprequest.set()
@@ -57,7 +61,7 @@ class ZmqMonitorSession(threading.Thread):
def parse_and_update_zmq_dump(self, zmq_dump):
try:
- dict_obj = self.decoder.decode(zmq_dump)
+ dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))
except ValueError:
logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump))
dict_obj = None