diff options
9 files changed, 88 insertions, 40 deletions
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index 110457d6..5d23d8da 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -440,6 +440,7 @@ class TRexConsole(TRexGeneralCmd): if (l > 2) and (s[l - 2] in file_flags): return TRexConsole.tree_autocomplete(s[l - 1]) + complete_push = complete_start @verify_connected def do_start(self, line): diff --git a/scripts/automation/trex_control_plane/stl/console/trex_tui.py b/scripts/automation/trex_control_plane/stl/console/trex_tui.py index a69c4165..0ac2f6a2 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_tui.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_tui.py @@ -547,6 +547,7 @@ class TrexTUI(): try: self.stateless_client.connect() + self.stateless_client.acquire() self.state = self.STATE_ACTIVE except STLError: self.state = self.STATE_LOST_CONT @@ -720,8 +721,10 @@ class AsyncKeysEngineConsole: self.ac = {'start' : client.start_line, 'stop' : client.stop_line, 'pause' : client.pause_line, + 'clear' : client.clear_stats_line, 'push' : client.push_line, 'resume' : client.resume_line, + 'reset' : client.reset_line, 'update' : client.update_line, 'connect' : client.connect_line, 'disconnect' : client.disconnect_line, diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py index 0f73792a..2c95844b 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py @@ -13,6 +13,7 @@ from .trex_stl_jsonrpc_client import JsonRpcClient, BatchMessage from .utils.text_opts import * from .trex_stl_stats import * from .trex_stl_types import * +from .utils.zipmsg import ZippedMsg # basic async stats class class CTRexAsyncStats(object): @@ -156,7 +157,9 @@ class CTRexAsyncClient(): self.monitor = AsyncUtil() self.connected = False - + + self.zipped = ZippedMsg() + # connects the async channel def connect (self): @@ -214,7 +217,7 @@ class CTRexAsyncClient(): # done self.connected = False - + # thread function def _run (self): @@ -232,10 +235,17 @@ class CTRexAsyncClient(): try: with self.monitor: - line = self.socket.recv_string() + line = self.socket.recv() self.monitor.on_recv_msg(line) + # try to decomrpess + unzipped = self.zipped.decompress(line) + if unzipped: + line = unzipped + + line = line.decode() + self.last_data_recv_ts = time.time() # signal once diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 4e3d3092..70be3b75 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -511,7 +511,7 @@ class STLClient(object): self.connected = False # API classes - self.api_vers = [ {'type': 'core', 'major': 1, 'minor': 3 } ] + self.api_vers = [ {'type': 'core', 'major': 2, 'minor': 3 } ] self.api_h = {'core': None} # logger @@ -1018,7 +1018,7 @@ class STLClient(object): try: ret = f(*args, **kwargs) except KeyboardInterrupt as e: - raise STLError("Test was interrupted by a keyboard signal (probably ctrl + c)") + raise STLError("Interrupted by a keyboard signal (probably ctrl + c)") return ret return wrap2 @@ -2834,7 +2834,7 @@ class STLClient(object): self.clear_stats(opts.ports) - + return RC_OK() @__console diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py index 065a1442..609ea076 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py @@ -9,6 +9,7 @@ import struct from .trex_stl_types import * from .utils.common import random_id_gen +from .utils.zipmsg import ZippedMsg class bcolors: BLUE = '\033[94m' @@ -43,9 +44,6 @@ class BatchMessage(object): # JSON RPC v2.0 client class JsonRpcClient(object): - MSG_COMPRESS_THRESHOLD = 4096 - MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA - def __init__ (self, default_server, default_port, client): self.client_api = client.api_h self.logger = client.logger @@ -56,7 +54,7 @@ class JsonRpcClient(object): self.server = default_server self.id_gen = random_id_gen() - + self.zipper = ZippedMsg() def get_connection_details (self): rc = {} @@ -121,28 +119,7 @@ class JsonRpcClient(object): return self.send_msg(msg) - - def compress_msg (self, msg): - # compress - compressed = zlib.compress(msg) - new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed - return new_msg - - - def decompress_msg (self, msg): - if len(msg) < 8: - return None - - t = struct.unpack(">II", msg[:8]) - if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC): - return None - - x = zlib.decompress(msg[8:]) - if len(x) != t[1]: - return None - - return x - + def send_msg (self, msg): # print before if self.logger.check_verbose(self.logger.VERBOSE_HIGH): @@ -151,10 +128,10 @@ class JsonRpcClient(object): # encode string to buffer buffer = msg.encode() - if len(buffer) > self.MSG_COMPRESS_THRESHOLD: - response = self.send_raw_msg(self.compress_msg(buffer)) + if self.zipper.check_threshold(buffer): + response = self.send_raw_msg(self.zipper.compress(buffer)) if response: - response = self.decompress_msg(response) + response = self.zipper.decompress(response) else: response = self.send_raw_msg(buffer) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/zipmsg.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/zipmsg.py new file mode 100644 index 00000000..397ada16 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/zipmsg.py @@ -0,0 +1,32 @@ +import zlib +import struct + +class ZippedMsg: + + MSG_COMPRESS_THRESHOLD = 256 + MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA + + def check_threshold (self, msg): + return len(msg) >= self.MSG_COMPRESS_THRESHOLD + + def compress (self, msg): + # compress + compressed = zlib.compress(msg) + new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed + return new_msg + + + def decompress (self, msg): + if len(msg) < 8: + return None + + t = struct.unpack(">II", msg[:8]) + if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC): + return None + + x = zlib.decompress(msg[8:]) + if len(x) != t[1]: + return None + + return x + diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp index f56d56df..850b5955 100644 --- a/src/publisher/trex_publisher.cpp +++ b/src/publisher/trex_publisher.cpp @@ -20,6 +20,7 @@ limitations under the License. */ #include "trex_publisher.h" +#include "trex_rpc_zip.h" #include <zmq.h> #include <assert.h> #include <sstream> @@ -73,13 +74,32 @@ TrexPublisher::Delete(){ void -TrexPublisher::publish_json(const std::string &s){ +TrexPublisher::publish_json(const std::string &s, uint32_t zip_threshold){ + if (m_publisher) { - int size = zmq_send (m_publisher, s.c_str(), s.length(), 0); - assert(size == s.length()); + if ( (zip_threshold > 0) && (s.size() > zip_threshold) ) { + publish_zipped_json(s); + } else { + publish_raw_json(s); + } } } +void +TrexPublisher::publish_zipped_json(const std::string &s) { + std::string compressed_msg; + + TrexRpcZip::compress(s, compressed_msg); + int size = zmq_send (m_publisher, compressed_msg.c_str(), compressed_msg.length(), 0); + assert(size == compressed_msg.length()); +} + +void +TrexPublisher::publish_raw_json(const std::string &s) { + int size = zmq_send (m_publisher, s.c_str(), s.length(), 0); + assert(size == s.length()); +} + void TrexPublisher::publish_event(event_type_e type, const Json::Value &data) { Json::FastWriter writer; diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h index 1d283478..fb7226c4 100644 --- a/src/publisher/trex_publisher.h +++ b/src/publisher/trex_publisher.h @@ -38,7 +38,7 @@ public: virtual bool Create(uint16_t port, bool disable); virtual void Delete(); - virtual void publish_json(const std::string &s); + virtual void publish_json(const std::string &s, uint32_t zip_threshold = MSG_COMPRESS_THRESHOLD); enum event_type_e { EVENT_PORT_STARTED = 0, @@ -71,9 +71,14 @@ public: private: void show_zmq_last_error(const std::string &err); + void publish_zipped_json(const std::string &s); + void publish_raw_json(const std::string &s); + private: void * m_context; void * m_publisher; + + static const int MSG_COMPRESS_THRESHOLD = 256; }; #endif /* __TREX_PUBLISHER_H__ */ diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 6d80539c..22389d6a 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -54,7 +54,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { m_publisher = cfg.m_publisher; /* API core version */ - const int API_VER_MAJOR = 1; + const int API_VER_MAJOR = 2; const int API_VER_MINOR = 3; m_api_classes[APIClass::API_CLASS_TYPE_CORE].init(APIClass::API_CLASS_TYPE_CORE, API_VER_MAJOR, |