diff options
12 files changed, 335 insertions, 45 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py index 5636965b..ce7140b9 100755 --- a/linux/ws_main.py +++ b/linux/ws_main.py @@ -170,6 +170,7 @@ rpc_server_src = SrcGroup(dir='src/rpc-server/', 'trex_rpc_jsonrpc_v2_parser.cpp', 'trex_rpc_cmds_table.cpp', 'trex_rpc_cmd.cpp', + 'trex_rpc_zip.cpp', 'commands/trex_rpc_cmd_test.cpp', 'commands/trex_rpc_cmd_general.cpp', @@ -428,6 +429,7 @@ def build_prog (bld, build_obj): linkflags = build_obj.get_link_flags(), source = build_obj.get_src(), use = build_obj.get_use_libs(), + lib = ['z'], rpath = bld.env.RPATH + build_obj.get_rpath(), target = build_obj.get_target()) diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py index 81bc6b28..aec325d0 100755 --- a/linux_dpdk/ws_main.py +++ b/linux_dpdk/ws_main.py @@ -153,6 +153,7 @@ rpc_server_src = SrcGroup(dir='src/rpc-server/', 'trex_rpc_jsonrpc_v2_parser.cpp', 'trex_rpc_cmds_table.cpp', 'trex_rpc_cmd.cpp', + 'trex_rpc_zip.cpp', 'commands/trex_rpc_cmd_test.cpp', 'commands/trex_rpc_cmd_general.cpp', @@ -696,7 +697,7 @@ def build_prog (bld, build_obj): includes =includes_path, cxxflags =(build_obj.get_cxx_flags()+['-std=gnu++11',]), linkflags = build_obj.get_link_flags() , - lib=['pthread','dl'], + lib=['pthread','dl', 'z'], use =[build_obj.get_dpdk_target(),'zmq'], source = bp.file_list(top), target = build_obj.get_target()) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 5096e33d..46d33e3e 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -2006,6 +2006,7 @@ class STLClient(object): try: # pcap injection removes all previous streams from the ports self.remove_all_streams(ports = opts.ports) + profile = STLProfile.load_pcap(opts.file[0], opts.ipg_usec, opts.speedup, @@ -2018,4 +2019,5 @@ class STLClient(object): print e.brief() return + return True diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py index ab3c7282..96d1854d 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py @@ -3,10 +3,12 @@ import zmq import json import re -from time import sleep from collections import namedtuple from trex_stl_types import * from utils.common import random_id_gen +import zlib +import struct + class bcolors: BLUE = '\033[94m' @@ -35,12 +37,15 @@ class BatchMessage(object): msg = json.dumps(self.batch_list) - return self.rpc_client.send_raw_msg(msg) + return self.rpc_client.send_msg(msg) # JSON RPC v2.0 client class JsonRpcClient(object): + MSG_COMPRESS_THRESHOLD = 4096 + MSG_COMPRESS_HEADER_MAGIC = 0xABE85CEA + def __init__ (self, default_server, default_port, logger): self.logger = logger self.connected = False @@ -109,14 +114,64 @@ class JsonRpcClient(object): id, msg = self.create_jsonrpc_v2(method_name, params) - return self.send_raw_msg(msg) + return self.send_msg(msg) + + + def compress_msg (self, msg): + # compress + compressed = zlib.compress(msg) + new_msg = struct.pack(">II", self.MSG_COMPRESS_HEADER_MAGIC, len(msg)) + compressed + return new_msg + + + def decompress_msg (self, msg): + if len(msg) < 8: + return None + + t = struct.unpack(">II", msg[:8]) + if (t[0] != self.MSG_COMPRESS_HEADER_MAGIC): + return None + + x = zlib.decompress(msg[8:]) + if len(x) != t[1]: + return None + + return x + + def send_msg (self, msg): + # print before + if self.logger.check_verbose(self.logger.VERBOSE_HIGH): + self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n") + + if len(msg) > self.MSG_COMPRESS_THRESHOLD: + response = self.send_raw_msg(self.compress_msg(msg)) + if response: + response = self.decompress_msg(response) + else: + response = self.send_raw_msg(msg) + + if response == None: + return RC_ERR("*** [RPC] - Failed to decode response from server") + + + # print after + if self.logger.check_verbose(self.logger.VERBOSE_HIGH): + self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") + + # process response (batch and regular) + + response_json = json.loads(response) + + if isinstance(response_json, list): + return self.process_batch_response(response_json) + else: + return self.process_single_response(response_json) + # low level send of string message def send_raw_msg (self, msg): - self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n") - tries = 0 while True: try: @@ -141,26 +196,11 @@ class JsonRpcClient(object): return RC_ERR("*** [RPC] - Failed to get server response at {0}".format(self.transport)) - self.verbose_msg("Server Response:\n\n" + self.pretty_json(response) + "\n") - - # decode - - # batch ? - response_json = json.loads(response) - - if isinstance(response_json, list): - rc_batch = RC() - - for single_response in response_json: - rc = self.process_single_response(single_response) - rc_batch.add(rc) - - return rc_batch - - else: - return self.process_single_response(response_json) - + return response + + + # processs a single response from server def process_single_response (self, response_json): if (response_json.get("jsonrpc") != "2.0"): @@ -182,6 +222,17 @@ class JsonRpcClient(object): + # process a batch response + def process_batch_response (self, response_json): + rc_batch = RC() + + for single_response in response_json: + rc = self.process_single_response(single_response) + rc_batch.add(rc) + + return rc_batch + + def disconnect (self): if self.connected: self.socket.close(linger = 0) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index 56657e22..ce7a630c 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -3,6 +3,7 @@ from collections import namedtuple, OrderedDict import trex_stl_stats from trex_stl_types import * +import time StreamOnPort = namedtuple('StreamOnPort', ['compiled_stream', 'metadata']) @@ -199,14 +200,13 @@ class Port(object): batch.append(cmd) # meta data for show streams - self.streams[stream.get_id()] = StreamOnPort(stream.to_json(), - Port._generate_stream_metadata(stream)) + #self.streams[stream.get_id()] = StreamOnPort(stream.to_json(), + # Port._generate_stream_metadata(stream)) rc = self.transmit_batch(batch) if not rc: return self.err(rc.err()) - # the only valid state now self.state = self.STATE_STREAMS diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py index 4d720aac..086e46af 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_sim.py @@ -27,7 +27,8 @@ from trex_stl_client import STLClient import re import json - +import zlib +import struct import argparse @@ -221,7 +222,12 @@ class STLSim(object): # write to temp file f = tempfile.NamedTemporaryFile(delete = False) - f.write(json.dumps(cmds_json)) + + msg = json.dumps(cmds_json) + compressed = zlib.compress(msg) + new_msg = struct.pack(">II", 0xABE85CEA, len(msg)) + compressed + + f.write(new_msg) f.close() # launch bp-sim diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py index 1a46aae7..efeb5c8a 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_streams.py @@ -157,7 +157,10 @@ class STLStream(object): # packet and VM self.fields['packet'] = packet.dump_pkt() self.fields['vm'] = packet.get_vm_data() - self.packet_desc = packet.pkt_layers_desc() + + + # this is heavy, calculate lazy + self.packet_desc = None if not rx_stats: self.fields['rx_stats'] = {} @@ -194,6 +197,9 @@ class STLStream(object): return self.next def get_pkt_type (self): + if self.packet_desc == None: + self.packet_desc = CScapyTRexPktBuilder.pkt_layers_desc_from_buffer(base64.b64decode(self.fields['packet']['binary'])) + return self.packet_desc def get_pkt_len (self, count_crc = True): @@ -432,14 +438,17 @@ class STLProfile(object): else: next = i + 1 + streams.append(STLStream(name = i, packet = CScapyTRexPktBuilder(pkt_buffer = cap), mode = STLTXSingleBurst(total_pkts = 1), self_start = True if (i == 1) else False, isg = (ts_usec - last_ts_usec), # seconds to usec next = next)) + last_ts_usec = ts_usec + return STLProfile(streams) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py index d387ac9c..0a6e64fb 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py @@ -4,6 +4,7 @@ from utils.text_opts import * from trex_stl_exceptions import * RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) +TupleRC = namedtuple('RC', ['rc', 'data', 'is_warn']) class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): __slots__ = () @@ -19,8 +20,7 @@ class RC(): self.rc_list = [] if (rc != None): - tuple_rc = namedtuple('RC', ['rc', 'data', 'is_warn']) - self.rc_list.append(tuple_rc(rc, data, is_warn)) + self.rc_list.append(TupleRC(rc, data, is_warn)) def __nonzero__ (self): return self.good() diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index 1e8e177d..da7e8c55 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -22,6 +22,7 @@ limitations under the License. #include <trex_rpc_server_api.h> #include <trex_rpc_req_resp_server.h> #include <trex_rpc_jsonrpc_v2_parser.h> +#include <trex_rpc_zip.h> #include <unistd.h> #include <sstream> @@ -138,20 +139,33 @@ void TrexRpcServerReqRes::_stop_rpc_thread() { * respondes to the request */ void TrexRpcServerReqRes::handle_request(const std::string &request) { - std::string response_str = process_request(request); - zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); + std::string response; + + process_request(request, response); + + zmq_send(m_socket, response.c_str(), response.size(), 0); +} + +void TrexRpcServerReqRes::process_request(const std::string &request, std::string &response) { + + if (TrexRpcZip::is_compressed(request)) { + process_zipped_request(request, response); + } else { + process_request_raw(request, response); + } + } /** * main processing of the request * */ -std::string TrexRpcServerReqRes::process_request(const std::string &request) { +void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::string &response) { std::vector<TrexJsonRpcV2ParsedObject *> commands; Json::FastWriter writer; - Json::Value response; + Json::Value response_json; /* first parse the request using JSON RPC V2 parser */ TrexJsonRpcV2Parser rpc_request(request); @@ -171,7 +185,7 @@ std::string TrexRpcServerReqRes::process_request(const std::string &request) { command->execute(single_response); delete command; - response[index++] = single_response; + response_json[index++] = single_response; } @@ -181,17 +195,32 @@ std::string TrexRpcServerReqRes::process_request(const std::string &request) { } /* write the JSON to string and sever on ZMQ */ - std::string response_str; if (response.size() == 1) { - response_str = writer.write(response[0]); + response = writer.write(response_json[0]); } else { - response_str = writer.write(response); + response = writer.write(response_json); } - verbose_json("Server Replied: ", response_str); + verbose_json("Server Replied: ", response); + +} + +void TrexRpcServerReqRes::process_zipped_request(const std::string &request, std::string &response) { + std::string unzipped; + + /* try to uncomrpess - if fails, last shot is the JSON RPC */ + bool rc = TrexRpcZip::uncompress(request, unzipped); + if (!rc) { + return process_request_raw(request, response); + } + + /* process the request */ + std::string raw_response; + process_request_raw(unzipped, raw_response); + + TrexRpcZip::compress(raw_response, response); - return response_str; } /** @@ -218,7 +247,11 @@ TrexRpcServerReqRes::handle_server_error(const std::string &specific_err) { std::string TrexRpcServerReqRes::test_inject_request(const std::string &req) { - return process_request(req); + std::string response; + + process_request(req, response); + + return response; } diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 97efbe08..979bf9af 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -45,7 +45,9 @@ protected: bool fetch_one_request(std::string &msg); void handle_request(const std::string &request); - std::string process_request(const std::string &request); + void process_request(const std::string &request, std::string &response); + void process_request_raw(const std::string &request, std::string &response); + void process_zipped_request(const std::string &request, std::string &response); void handle_server_error(const std::string &specific_err); diff --git a/src/rpc-server/trex_rpc_zip.cpp b/src/rpc-server/trex_rpc_zip.cpp new file mode 100644 index 00000000..ef5c4834 --- /dev/null +++ b/src/rpc-server/trex_rpc_zip.cpp @@ -0,0 +1,124 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "trex_rpc_zip.h" +#include <zlib.h> +#include <arpa/inet.h> +#include <iostream> + +bool +TrexRpcZip::is_compressed(const std::string &input) { + /* check for minimum size */ + if (input.size() < sizeof(header_st)) { + return false; + } + + /* cast */ + const header_st *header = (header_st *)input.c_str(); + + /* check magic */ + uint32_t magic = ntohl(header->magic); + if (magic != G_HEADER_MAGIC) { + return false; + } + + return true; +} + +bool +TrexRpcZip::uncompress(const std::string &input, std::string &output) { + + /* sanity check first */ + if (!is_compressed(input)) { + return false; + } + + /* cast */ + const header_st *header = (header_st *)input.c_str(); + + /* original size */ + uint32_t uncmp_size = ntohl(header->uncmp_size); + + /* alocate dynamic space for the uncomrpessed buffer */ + Bytef *u_buffer = new Bytef[uncmp_size]; + if (!u_buffer) { + return false; + } + + /* set the target buffer size */ + uLongf dest_len = uncmp_size; + + /* try to uncompress */ + int z_err = ::uncompress(u_buffer, + &dest_len, + (const Bytef *)header->data, + (uLong)input.size() - sizeof(header_st)); + + if (z_err != Z_OK) { + delete [] u_buffer; + return false; + } + + output.append((const char *)u_buffer, dest_len); + + delete [] u_buffer; + return true; +} + + +bool +TrexRpcZip::compress(const std::string &input, std::string &output) { + + /* get a bound */ + int bound_size = compressBound((uLong)input.size()) + sizeof(header_st); + + /* alocate dynamic space for the uncomrpessed buffer */ + char *buffer = new char[bound_size]; + if (!buffer) { + return false; + } + + header_st *header = (header_st *)buffer; + uLongf destLen = bound_size; + + int z_err = ::compress((Bytef *)header->data, + &destLen, + (const Bytef *)input.c_str(), + (uLong)input.size()); + + if (z_err != Z_OK) { + delete [] buffer; + return false; + } + + /* terminate string */ + header->data[destLen] = 0; + + /* add the header */ + header->magic = htonl(G_HEADER_MAGIC); + header->uncmp_size = htonl(input.size()); + + output.append((const char *)header, bound_size); + + delete [] buffer; + + return true; +} diff --git a/src/rpc-server/trex_rpc_zip.h b/src/rpc-server/trex_rpc_zip.h new file mode 100644 index 00000000..4b9930b4 --- /dev/null +++ b/src/rpc-server/trex_rpc_zip.h @@ -0,0 +1,60 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include <string> + +class TrexRpcZip { +public: + + /** + * return true if message is compressed + * + */ + static bool is_compressed(const std::string &input); + + /** + * uncompress an 'input' to 'output' + * on success return true else returns false + */ + static bool uncompress(const std::string &input, std::string &output); + + /** + * compress 'input' to 'output' + * on success return true else returns false + */ + static bool compress(const std::string &input, std::string &output); + +private: + + /** + * packed header for reading binary compressed messages + * + * @author imarom (15-Feb-16) + */ + struct header_st { + uint32_t magic; + uint32_t uncmp_size; + char data[0]; + } __attribute__((packed)); + + + static const uint32_t G_HEADER_MAGIC = 0xABE85CEA; +}; + |