diff options
author | 2016-01-04 23:31:31 +0200 | |
---|---|---|
committer | 2016-01-04 23:31:31 +0200 | |
commit | 629b54c4c9df9c718d818a004ecf15c2cf6c770a (patch) | |
tree | 7dfc3c64c7561032d690ce6188130e80d344054e /src/rpc-server | |
parent | 3757099103ed1bf56f85ccf5bb861a331287cbbb (diff) | |
parent | 857bdcf05a920b99e1cf180c700176b04801da00 (diff) |
Merge branch 'master' into dan_stateless
Diffstat (limited to 'src/rpc-server')
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmd_stream.cpp | 23 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.cpp | 4 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.h | 1 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_cmd.cpp | 6 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_cmd_api.h | 12 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.cpp | 56 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.h | 26 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server.cpp | 23 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server_api.h | 25 |
9 files changed, 159 insertions, 17 deletions
diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index d8f7e772..51db0b20 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -82,7 +82,7 @@ TrexRpcCmdAddStream::_run(const Json::Value ¶ms, Json::Value &result) { stream->m_pkt.meta = parse_string(pkt, "meta", result); /* parse VM */ - const Json::Value &vm = parse_array(section ,"vm", result); + const Json::Value &vm = parse_object(section ,"vm", result); parse_vm(vm, stream, result); /* parse RX info */ @@ -230,8 +230,7 @@ TrexRpcCmdAddStream::parse_vm_instr_flow_var(const Json::Value &inst, TrexStream op_type, init_value, min_value, - max_value - )); + max_value)); } void @@ -249,9 +248,12 @@ TrexRpcCmdAddStream::parse_vm_instr_write_flow_var(const Json::Value &inst, Trex void TrexRpcCmdAddStream::parse_vm(const Json::Value &vm, TrexStream *stream, Json::Value &result) { + + const Json::Value &instructions = parse_array(vm ,"instructions", result); + /* array of VM instructions on vm */ - for (int i = 0; i < vm.size(); i++) { - const Json::Value & inst = parse_object(vm, i, result); + for (int i = 0; i < instructions.size(); i++) { + const Json::Value & inst = parse_object(instructions, i, result); auto vm_types = {"fix_checksum_ipv4", "flow_var", "write_flow_var","tuple_flow_var"}; std::string vm_type = parse_choice(inst, "type", vm_types, result); @@ -273,6 +275,17 @@ TrexRpcCmdAddStream::parse_vm(const Json::Value &vm, TrexStream *stream, Json::V throw TrexRpcException("internal error"); } } + + const std::string &var_name = parse_string(vm, "split_by_var", result); + if (var_name != "") { + StreamVmInstructionVar *instr = stream->m_vm.lookup_var_by_name(var_name); + if (!instr) { + std::stringstream ss; + ss << "VM: request to split by variable '" << var_name << "' but does not exists"; + generate_parse_err(result, ss.str()); + } + stream->m_vm.set_split_instruction(instr); + } } void diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 46fe499b..6e5fbfc6 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -41,6 +41,10 @@ TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mute m_context = zmq_ctx_new(); } +void +TrexRpcServerAsync::_prepare() { +} + /** * publisher thread * diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h index 02d1490e..80d92c2f 100644 --- a/src/rpc-server/trex_rpc_async_server.h +++ b/src/rpc-server/trex_rpc_async_server.h @@ -36,6 +36,7 @@ public: TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); protected: + void _prepare(); void _rpc_thread_cb(); void _stop_rpc_thread(); diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp index b5dd121c..aea7980b 100644 --- a/src/rpc-server/trex_rpc_cmd.cpp +++ b/src/rpc-server/trex_rpc_cmd.cpp @@ -32,7 +32,7 @@ TrexRpcCommand::run(const Json::Value ¶ms, Json::Value &result) { check_param_count(params, m_param_count, result); - if (m_needs_ownership) { + if (m_needs_ownership && !g_test_override_ownership) { verify_ownership(params, result); } @@ -372,3 +372,7 @@ TrexRpcCommand::generate_execute_err(Json::Value &result, const std::string &msg throw (TrexRpcCommandException(TREX_RPC_CMD_EXECUTE_ERR)); } +/** + * by default this is off + */ +bool TrexRpcCommand::g_test_override_ownership = false; diff --git a/src/rpc-server/trex_rpc_cmd_api.h b/src/rpc-server/trex_rpc_cmd_api.h index 7cbdf4ff..675d2900 100644 --- a/src/rpc-server/trex_rpc_cmd_api.h +++ b/src/rpc-server/trex_rpc_cmd_api.h @@ -89,6 +89,16 @@ public: return m_name; } + /** + * on test we enable this override + * + * + * @param enable + */ + static void test_set_override_ownership(bool enable) { + g_test_override_ownership = enable; + } + virtual ~TrexRpcCommand() {} protected: @@ -241,6 +251,8 @@ protected: std::string m_name; int m_param_count; bool m_needs_ownership; + + static bool g_test_override_ownership; }; #endif /* __TREX_RPC_CMD_API_H__ */ diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index eb7825ac..1e8e177d 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -36,7 +36,10 @@ limitations under the License. * */ TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) { - /* ZMQ is not thread safe - this should be outside */ + +} + +void TrexRpcServerReqRes::_prepare() { m_context = zmq_ctx_new(); } @@ -123,15 +126,28 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { */ void TrexRpcServerReqRes::_stop_rpc_thread() { /* by calling zmq_term we signal the blocked thread to exit */ - zmq_term(m_context); + if (m_context) { + zmq_term(m_context); + } } + /** * handles a request given to the server * respondes to the request */ void TrexRpcServerReqRes::handle_request(const std::string &request) { + std::string response_str = process_request(request); + zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); +} + +/** + * main processing of the request + * + */ +std::string TrexRpcServerReqRes::process_request(const std::string &request) { + std::vector<TrexJsonRpcV2ParsedObject *> commands; Json::FastWriter writer; @@ -175,8 +191,7 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) { verbose_json("Server Replied: ", response_str); - zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); - + return response_str; } /** @@ -198,3 +213,36 @@ TrexRpcServerReqRes::handle_server_error(const std::string &specific_err) { zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); } + + + +std::string +TrexRpcServerReqRes::test_inject_request(const std::string &req) { + return process_request(req); +} + + +/** + * MOCK req resp server + */ +TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) { +} + +/** + * override start + * + */ +void +TrexRpcServerReqResMock::start() { + +} + + +/** + * override stop + */ +void +TrexRpcServerReqResMock::stop() { + +} + diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 2876206c..97efbe08 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -34,13 +34,19 @@ public: TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); + /* for test purposes - bypass the ZMQ and inject a message */ + std::string test_inject_request(const std::string &req); + protected: + + void _prepare(); void _rpc_thread_cb(); void _stop_rpc_thread(); -private: bool fetch_one_request(std::string &msg); void handle_request(const std::string &request); + std::string process_request(const std::string &request); + void handle_server_error(const std::string &specific_err); void *m_context; @@ -48,4 +54,22 @@ private: }; +/** + * a mock req resp server (for tests) + * + * @author imarom (03-Jan-16) + */ +class TrexRpcServerReqResMock : public TrexRpcServerReqRes { + +public: + TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg); + + /* override the interface functions */ + virtual void start(); + virtual void stop(); + + +}; + #endif /* __TREX_RPC_REQ_RESP_API_H__ */ + diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index a14e6f97..1dfc4494 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -63,6 +63,9 @@ void TrexRpcServerInterface::start() { verbose_msg("Starting RPC Server"); + /* prepare for run */ + _prepare(); + m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this); if (!m_thread) { throw TrexRpcException("unable to create RPC thread"); @@ -117,9 +120,18 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, const TrexRpcServerConfig *async_cfg, std::mutex *lock) { + m_req_resp = NULL; + /* add the request response server */ if (req_resp_cfg) { - m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg, lock)); + + if (req_resp_cfg->get_protocol() == TrexRpcServerConfig::RPC_PROT_MOCK) { + m_req_resp = new TrexRpcServerReqResMock(*req_resp_cfg); + } else { + m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg, lock); + } + + m_servers.push_back(m_req_resp); } /* add async publisher */ @@ -166,3 +178,12 @@ void TrexRpcServer::set_verbose(bool verbose) { } } + +std::string +TrexRpcServer::test_inject_request(const std::string &req_str) { + if (m_req_resp) { + return m_req_resp->test_inject_request(req_str); + } else { + return ""; + } +} diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index ff876ac4..1ab5dce9 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -29,8 +29,10 @@ limitations under the License. #include <string> #include <stdexcept> #include <trex_rpc_exception_api.h> +#include <json/json.h> class TrexRpcServerInterface; +class TrexRpcServerReqRes; /** * defines a configuration of generic RPC server @@ -41,18 +43,19 @@ class TrexRpcServerConfig { public: enum rpc_prot_e { - RPC_PROT_TCP + RPC_PROT_TCP, + RPC_PROT_MOCK }; TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) { } - uint16_t get_port() { + uint16_t get_port() const { return m_port; } - rpc_prot_e get_protocol() { + rpc_prot_e get_protocol() const { return m_protocol; } @@ -76,13 +79,13 @@ public: * starts the server * */ - void start(); + virtual void start(); /** * stops the server * */ - void stop(); + virtual void stop(); /** * set verbose on or off @@ -107,6 +110,7 @@ protected: * instances implement this * */ + virtual void _prepare() = 0; virtual void _rpc_thread_cb() = 0; virtual void _stop_rpc_thread() = 0; @@ -169,12 +173,23 @@ public: } + /** + * allow injecting of a JSON and get a response + * + * @author imarom (27-Dec-15) + * + * @return std::string + */ + std::string test_inject_request(const std::string &request_str); private: static std::string generate_handler(); std::vector<TrexRpcServerInterface *> m_servers; + // an alias to the req resp server + TrexRpcServerReqRes *m_req_resp; + bool m_verbose; static const std::string s_server_uptime; |