diff options
author | Yaroslav Brustinov <ybrustin@cisco.com> | 2016-08-03 17:24:59 +0300 |
---|---|---|
committer | Yaroslav Brustinov <ybrustin@cisco.com> | 2016-08-03 17:24:59 +0300 |
commit | d1b0fd96d72b7daaa88e21b619d04084c7617ad1 (patch) | |
tree | 092b65bb5ed34188731726c627ccdcd6896261ce /scripts/automation/trex_control_plane/server | |
parent | 775616cd5a8b06c115b5faaf499cbc74fcb8dcc2 (diff) |
stf daemon: add support for zipped zmq
Diffstat (limited to 'scripts/automation/trex_control_plane/server')
-rw-r--r-- | scripts/automation/trex_control_plane/server/zipmsg.py | 32 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/server/zmq_monitor_thread.py | 16 |
2 files changed, 42 insertions, 6 deletions
diff --git a/scripts/automation/trex_control_plane/server/zipmsg.py b/scripts/automation/trex_control_plane/server/zipmsg.py new file mode 100644 index 00000000..397ada16 --- /dev/null +++ b/scripts/automation/trex_control_plane/server/zipmsg.py @@ -0,0 +1,32 @@ +import zlib +import struct + +class ZippedMsg: + + MSG_COMPRESS_THRESHOLD = 256 + MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA + + def check_threshold (self, msg): + return len(msg) >= self.MSG_COMPRESS_THRESHOLD + + def compress (self, msg): + # compress + compressed = zlib.compress(msg) + new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed + return new_msg + + + def decompress (self, msg): + if len(msg) < 8: + return None + + t = struct.unpack(">II", msg[:8]) + if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC): + return None + + x = zlib.decompress(msg[8:]) + if len(x) != t[1]: + return None + + return x + diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py index 4fc263df..f559ebc1 100755 --- a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py +++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py @@ -6,6 +6,7 @@ import zmq import threading
import logging
import CCustomLogger
+import zipmsg
from json import JSONDecoder
from common.trex_status_e import TRexStatus
@@ -24,6 +25,7 @@ class ZmqMonitorSession(threading.Thread): self.trexObj = trexObj
self.expect_trex = self.trexObj.expect_trex # used to signal if TRex is expected to run and if data should be considered
self.decoder = JSONDecoder()
+ self.zipped = zipmsg.ZippedMsg()
logger.info("ZMQ monitor initialization finished")
def run(self):
@@ -60,14 +62,16 @@ class ZmqMonitorSession(threading.Thread): super(ZmqMonitorSession, self).join(timeout)
def parse_and_update_zmq_dump(self, zmq_dump):
- try:
- dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))
- except ValueError:
- logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump))
- dict_obj = None
+ unzipped = self.zipped.decompress(zmq_dump)
+ if unzipped:
+ zmq_dump = unzipped
+ dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))
+
+ if type(dict_obj) is not dict:
+ raise Exception('Expected ZMQ dump of type dict, got: %s' % type(dict_obj))
# add to trex_obj zmq latest dump, based on its 'name' header
- if dict_obj is not None and dict_obj != {}:
+ if dict_obj != {}:
self.trexObj.zmq_dump[dict_obj['name']] = dict_obj
if self.first_dump:
# change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully
|