diff options
Diffstat (limited to 'src/rpc-server')
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.cpp | 4 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.h | 1 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_cmd.cpp | 6 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_cmd_api.h | 12 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.cpp | 56 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.h | 26 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server.cpp | 23 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server_api.h | 25 |
8 files changed, 141 insertions, 12 deletions
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 46fe499b..6e5fbfc6 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -41,6 +41,10 @@ TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mute m_context = zmq_ctx_new(); } +void +TrexRpcServerAsync::_prepare() { +} + /** * publisher thread * diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h index 02d1490e..80d92c2f 100644 --- a/src/rpc-server/trex_rpc_async_server.h +++ b/src/rpc-server/trex_rpc_async_server.h @@ -36,6 +36,7 @@ public: TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); protected: + void _prepare(); void _rpc_thread_cb(); void _stop_rpc_thread(); diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp index b5dd121c..aea7980b 100644 --- a/src/rpc-server/trex_rpc_cmd.cpp +++ b/src/rpc-server/trex_rpc_cmd.cpp @@ -32,7 +32,7 @@ TrexRpcCommand::run(const Json::Value ¶ms, Json::Value &result) { check_param_count(params, m_param_count, result); - if (m_needs_ownership) { + if (m_needs_ownership && !g_test_override_ownership) { verify_ownership(params, result); } @@ -372,3 +372,7 @@ TrexRpcCommand::generate_execute_err(Json::Value &result, const std::string &msg throw (TrexRpcCommandException(TREX_RPC_CMD_EXECUTE_ERR)); } +/** + * by default this is off + */ +bool TrexRpcCommand::g_test_override_ownership = false; diff --git a/src/rpc-server/trex_rpc_cmd_api.h b/src/rpc-server/trex_rpc_cmd_api.h index 7cbdf4ff..675d2900 100644 --- a/src/rpc-server/trex_rpc_cmd_api.h +++ b/src/rpc-server/trex_rpc_cmd_api.h @@ -89,6 +89,16 @@ public: return m_name; } + /** + * on test we enable this override + * + * + * @param enable + */ + static void test_set_override_ownership(bool enable) { + g_test_override_ownership = enable; + } + virtual ~TrexRpcCommand() {} protected: @@ -241,6 +251,8 @@ protected: std::string m_name; int m_param_count; bool m_needs_ownership; + + static bool g_test_override_ownership; }; #endif /* __TREX_RPC_CMD_API_H__ */ diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index eb7825ac..1e8e177d 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -36,7 +36,10 @@ limitations under the License. * */ TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) { - /* ZMQ is not thread safe - this should be outside */ + +} + +void TrexRpcServerReqRes::_prepare() { m_context = zmq_ctx_new(); } @@ -123,15 +126,28 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { */ void TrexRpcServerReqRes::_stop_rpc_thread() { /* by calling zmq_term we signal the blocked thread to exit */ - zmq_term(m_context); + if (m_context) { + zmq_term(m_context); + } } + /** * handles a request given to the server * respondes to the request */ void TrexRpcServerReqRes::handle_request(const std::string &request) { + std::string response_str = process_request(request); + zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); +} + +/** + * main processing of the request + * + */ +std::string TrexRpcServerReqRes::process_request(const std::string &request) { + std::vector<TrexJsonRpcV2ParsedObject *> commands; Json::FastWriter writer; @@ -175,8 +191,7 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) { verbose_json("Server Replied: ", response_str); - zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); - + return response_str; } /** @@ -198,3 +213,36 @@ TrexRpcServerReqRes::handle_server_error(const std::string &specific_err) { zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); } + + + +std::string +TrexRpcServerReqRes::test_inject_request(const std::string &req) { + return process_request(req); +} + + +/** + * MOCK req resp server + */ +TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) { +} + +/** + * override start + * + */ +void +TrexRpcServerReqResMock::start() { + +} + + +/** + * override stop + */ +void +TrexRpcServerReqResMock::stop() { + +} + diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 2876206c..97efbe08 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -34,13 +34,19 @@ public: TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); + /* for test purposes - bypass the ZMQ and inject a message */ + std::string test_inject_request(const std::string &req); + protected: + + void _prepare(); void _rpc_thread_cb(); void _stop_rpc_thread(); -private: bool fetch_one_request(std::string &msg); void handle_request(const std::string &request); + std::string process_request(const std::string &request); + void handle_server_error(const std::string &specific_err); void *m_context; @@ -48,4 +54,22 @@ private: }; +/** + * a mock req resp server (for tests) + * + * @author imarom (03-Jan-16) + */ +class TrexRpcServerReqResMock : public TrexRpcServerReqRes { + +public: + TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg); + + /* override the interface functions */ + virtual void start(); + virtual void stop(); + + +}; + #endif /* __TREX_RPC_REQ_RESP_API_H__ */ + diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index a14e6f97..1dfc4494 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -63,6 +63,9 @@ void TrexRpcServerInterface::start() { verbose_msg("Starting RPC Server"); + /* prepare for run */ + _prepare(); + m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this); if (!m_thread) { throw TrexRpcException("unable to create RPC thread"); @@ -117,9 +120,18 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, const TrexRpcServerConfig *async_cfg, std::mutex *lock) { + m_req_resp = NULL; + /* add the request response server */ if (req_resp_cfg) { - m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg, lock)); + + if (req_resp_cfg->get_protocol() == TrexRpcServerConfig::RPC_PROT_MOCK) { + m_req_resp = new TrexRpcServerReqResMock(*req_resp_cfg); + } else { + m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg, lock); + } + + m_servers.push_back(m_req_resp); } /* add async publisher */ @@ -166,3 +178,12 @@ void TrexRpcServer::set_verbose(bool verbose) { } } + +std::string +TrexRpcServer::test_inject_request(const std::string &req_str) { + if (m_req_resp) { + return m_req_resp->test_inject_request(req_str); + } else { + return ""; + } +} diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index ff876ac4..1ab5dce9 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -29,8 +29,10 @@ limitations under the License. #include <string> #include <stdexcept> #include <trex_rpc_exception_api.h> +#include <json/json.h> class TrexRpcServerInterface; +class TrexRpcServerReqRes; /** * defines a configuration of generic RPC server @@ -41,18 +43,19 @@ class TrexRpcServerConfig { public: enum rpc_prot_e { - RPC_PROT_TCP + RPC_PROT_TCP, + RPC_PROT_MOCK }; TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) { } - uint16_t get_port() { + uint16_t get_port() const { return m_port; } - rpc_prot_e get_protocol() { + rpc_prot_e get_protocol() const { return m_protocol; } @@ -76,13 +79,13 @@ public: * starts the server * */ - void start(); + virtual void start(); /** * stops the server * */ - void stop(); + virtual void stop(); /** * set verbose on or off @@ -107,6 +110,7 @@ protected: * instances implement this * */ + virtual void _prepare() = 0; virtual void _rpc_thread_cb() = 0; virtual void _stop_rpc_thread() = 0; @@ -169,12 +173,23 @@ public: } + /** + * allow injecting of a JSON and get a response + * + * @author imarom (27-Dec-15) + * + * @return std::string + */ + std::string test_inject_request(const std::string &request_str); private: static std::string generate_handler(); std::vector<TrexRpcServerInterface *> m_servers; + // an alias to the req resp server + TrexRpcServerReqRes *m_req_resp; + bool m_verbose; static const std::string s_server_uptime; |