From d1b0fd96d72b7daaa88e21b619d04084c7617ad1 Mon Sep 17 00:00:00 2001 From: Yaroslav Brustinov Date: Wed, 3 Aug 2016 17:24:59 +0300 Subject: stf daemon: add support for zipped zmq --- .../automation/trex_control_plane/server/zipmsg.py | 32 ++++++++++++++++++++++ .../server/zmq_monitor_thread.py | 16 +++++++---- 2 files changed, 42 insertions(+), 6 deletions(-) create mode 100644 scripts/automation/trex_control_plane/server/zipmsg.py (limited to 'scripts/automation/trex_control_plane/server') diff --git a/scripts/automation/trex_control_plane/server/zipmsg.py b/scripts/automation/trex_control_plane/server/zipmsg.py new file mode 100644 index 00000000..397ada16 --- /dev/null +++ b/scripts/automation/trex_control_plane/server/zipmsg.py @@ -0,0 +1,32 @@ +import zlib +import struct + +class ZippedMsg: + + MSG_COMPRESS_THRESHOLD = 256 + MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA + + def check_threshold (self, msg): + return len(msg) >= self.MSG_COMPRESS_THRESHOLD + + def compress (self, msg): + # compress + compressed = zlib.compress(msg) + new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed + return new_msg + + + def decompress (self, msg): + if len(msg) < 8: + return None + + t = struct.unpack(">II", msg[:8]) + if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC): + return None + + x = zlib.decompress(msg[8:]) + if len(x) != t[1]: + return None + + return x + diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py index 4fc263df..f559ebc1 100755 --- a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py +++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py @@ -6,6 +6,7 @@ import zmq import threading import logging import CCustomLogger +import zipmsg from json import JSONDecoder from common.trex_status_e import TRexStatus @@ -24,6 +25,7 @@ class ZmqMonitorSession(threading.Thread): self.trexObj = trexObj self.expect_trex = self.trexObj.expect_trex # used to signal if TRex is expected to run and if data should be considered self.decoder = JSONDecoder() + self.zipped = zipmsg.ZippedMsg() logger.info("ZMQ monitor initialization finished") def run(self): @@ -60,14 +62,16 @@ class ZmqMonitorSession(threading.Thread): super(ZmqMonitorSession, self).join(timeout) def parse_and_update_zmq_dump(self, zmq_dump): - try: - dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace')) - except ValueError: - logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump)) - dict_obj = None + unzipped = self.zipped.decompress(zmq_dump) + if unzipped: + zmq_dump = unzipped + dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace')) + + if type(dict_obj) is not dict: + raise Exception('Expected ZMQ dump of type dict, got: %s' % type(dict_obj)) # add to trex_obj zmq latest dump, based on its 'name' header - if dict_obj is not None and dict_obj != {}: + if dict_obj != {}: self.trexObj.zmq_dump[dict_obj['name']] = dict_obj if self.first_dump: # change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully -- cgit 1.2.3-korg