From 8b52a31ed2c299b759f330c4f976b9c70f5765f4 Mon Sep 17 00:00:00 2001 From: Hanoh Haim Date: Wed, 24 Jun 2015 14:03:29 +0300 Subject: first version --- .../server/zmq_monitor_thread.py | 80 ++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100755 scripts/automation/trex_control_plane/server/zmq_monitor_thread.py (limited to 'scripts/automation/trex_control_plane/server/zmq_monitor_thread.py') diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py new file mode 100755 index 00000000..28e154ee --- /dev/null +++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py @@ -0,0 +1,80 @@ +#!/router/bin/python + +import os +import outer_packages +import zmq +import threading +import logging +import CCustomLogger +from json import JSONDecoder +from common.trex_status_e import TRexStatus + +# setup the logger +CCustomLogger.setup_custom_logger('TRexServer') +logger = logging.getLogger('TRexServer') + +class ZmqMonitorSession(threading.Thread): + def __init__(self, trexObj , zmq_port): + super(ZmqMonitorSession, self).__init__() + self.stoprequest = threading.Event() +# self.terminateFlag = False + self.first_dump = True + self.zmq_port = zmq_port + self.zmq_publisher = "tcp://localhost:{port}".format( port = self.zmq_port ) +# self.context = zmq.Context() +# self.socket = self.context.socket(zmq.SUB) + self.trexObj = trexObj + self.expect_trex = self.trexObj.expect_trex # used to signal if T-Rex is expected to run and if data should be considered + self.decoder = JSONDecoder() + logger.info("ZMQ monitor initialization finished") + + def run (self): + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + logger.info("ZMQ monitor started listening @ {pub}".format( pub = self.zmq_publisher ) ) + self.socket.connect(self.zmq_publisher) + self.socket.setsockopt(zmq.SUBSCRIBE, '') + + while not self.stoprequest.is_set(): + try: + zmq_dump = self.socket.recv() # This call is BLOCKING until data received! + if self.expect_trex.is_set(): + self.parse_and_update_zmq_dump(zmq_dump) + logger.debug("ZMQ dump received on socket, and saved to trexObject.") + except Exception as e: + if self.stoprequest.is_set(): + # allow this exception since it comes from ZMQ monitor termination + pass + else: + logger.error("ZMQ monitor thrown an exception. Received exception: {ex}".format(ex = e)) + raise + + def join (self, timeout = None): + self.stoprequest.set() + logger.debug("Handling termination of ZMQ monitor thread") + self.socket.close() + self.context.term() + logger.info("ZMQ monitor resources has been freed.") + super(ZmqMonitorSession, self).join(timeout) + + def parse_and_update_zmq_dump (self, zmq_dump): + try: + dict_obj = self.decoder.decode(zmq_dump) + except ValueError: + logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump = zmq_dump)) + dict_obj = None + + # add to trex_obj zmq latest dump, based on its 'name' header + if dict_obj is not None and dict_obj!={}: + self.trexObj.zmq_dump[dict_obj['name']] = dict_obj + if self.first_dump: + # change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully + self.first_dump = False + self.trexObj.set_status(TRexStatus.Running) + self.trexObj.set_verbose_status("T-Rex is Running") + logger.info("First ZMQ dump received and successfully parsed. TRex running state changed to 'Running'.") + + +if __name__ == "__main__": + pass + -- cgit 1.2.3-korg