summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/server
diff options
context:
space:
mode:
authorYaroslav Brustinov <ybrustin@cisco.com>2016-08-03 17:24:59 +0300
committerYaroslav Brustinov <ybrustin@cisco.com>2016-08-03 17:24:59 +0300
commitd1b0fd96d72b7daaa88e21b619d04084c7617ad1 (patch)
tree092b65bb5ed34188731726c627ccdcd6896261ce /scripts/automation/trex_control_plane/server
parent775616cd5a8b06c115b5faaf499cbc74fcb8dcc2 (diff)
stf daemon: add support for zipped zmq
Diffstat (limited to 'scripts/automation/trex_control_plane/server')
-rw-r--r--scripts/automation/trex_control_plane/server/zipmsg.py32
-rwxr-xr-xscripts/automation/trex_control_plane/server/zmq_monitor_thread.py16
2 files changed, 42 insertions, 6 deletions
diff --git a/scripts/automation/trex_control_plane/server/zipmsg.py b/scripts/automation/trex_control_plane/server/zipmsg.py
new file mode 100644
index 00000000..397ada16
--- /dev/null
+++ b/scripts/automation/trex_control_plane/server/zipmsg.py
@@ -0,0 +1,32 @@
+import zlib
+import struct
+
+class ZippedMsg:
+
+ MSG_COMPRESS_THRESHOLD = 256
+ MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA
+
+ def check_threshold (self, msg):
+ return len(msg) >= self.MSG_COMPRESS_THRESHOLD
+
+ def compress (self, msg):
+ # compress
+ compressed = zlib.compress(msg)
+ new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed
+ return new_msg
+
+
+ def decompress (self, msg):
+ if len(msg) < 8:
+ return None
+
+ t = struct.unpack(">II", msg[:8])
+ if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC):
+ return None
+
+ x = zlib.decompress(msg[8:])
+ if len(x) != t[1]:
+ return None
+
+ return x
+
diff --git a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
index 4fc263df..f559ebc1 100755
--- a/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
+++ b/scripts/automation/trex_control_plane/server/zmq_monitor_thread.py
@@ -6,6 +6,7 @@ import zmq
import threading
import logging
import CCustomLogger
+import zipmsg
from json import JSONDecoder
from common.trex_status_e import TRexStatus
@@ -24,6 +25,7 @@ class ZmqMonitorSession(threading.Thread):
self.trexObj = trexObj
self.expect_trex = self.trexObj.expect_trex # used to signal if TRex is expected to run and if data should be considered
self.decoder = JSONDecoder()
+ self.zipped = zipmsg.ZippedMsg()
logger.info("ZMQ monitor initialization finished")
def run(self):
@@ -60,14 +62,16 @@ class ZmqMonitorSession(threading.Thread):
super(ZmqMonitorSession, self).join(timeout)
def parse_and_update_zmq_dump(self, zmq_dump):
- try:
- dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))
- except ValueError:
- logger.error("ZMQ dump failed JSON-RPC decode. Ignoring. Bad dump was: {dump}".format(dump=zmq_dump))
- dict_obj = None
+ unzipped = self.zipped.decompress(zmq_dump)
+ if unzipped:
+ zmq_dump = unzipped
+ dict_obj = self.decoder.decode(zmq_dump.decode(errors = 'replace'))
+
+ if type(dict_obj) is not dict:
+ raise Exception('Expected ZMQ dump of type dict, got: %s' % type(dict_obj))
# add to trex_obj zmq latest dump, based on its 'name' header
- if dict_obj is not None and dict_obj != {}:
+ if dict_obj != {}:
self.trexObj.zmq_dump[dict_obj['name']] = dict_obj
if self.first_dump:
# change TRexStatus from starting to Running once the first ZMQ dump is obtained and parsed successfully