summaryrefslogtreecommitdiffstats
path: root/src/rpc-server
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc-server')
-rw-r--r--src/rpc-server/trex_rpc_async_server.cpp4
-rw-r--r--src/rpc-server/trex_rpc_async_server.h1
-rw-r--r--src/rpc-server/trex_rpc_cmd.cpp6
-rw-r--r--src/rpc-server/trex_rpc_cmd_api.h12
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.cpp56
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.h26
-rw-r--r--src/rpc-server/trex_rpc_server.cpp23
-rw-r--r--src/rpc-server/trex_rpc_server_api.h25
8 files changed, 141 insertions, 12 deletions
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp
index 46fe499b..6e5fbfc6 100644
--- a/src/rpc-server/trex_rpc_async_server.cpp
+++ b/src/rpc-server/trex_rpc_async_server.cpp
@@ -41,6 +41,10 @@ TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mute
m_context = zmq_ctx_new();
}
+void
+TrexRpcServerAsync::_prepare() {
+}
+
/**
* publisher thread
*
diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h
index 02d1490e..80d92c2f 100644
--- a/src/rpc-server/trex_rpc_async_server.h
+++ b/src/rpc-server/trex_rpc_async_server.h
@@ -36,6 +36,7 @@ public:
TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL);
protected:
+ void _prepare();
void _rpc_thread_cb();
void _stop_rpc_thread();
diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp
index b5dd121c..aea7980b 100644
--- a/src/rpc-server/trex_rpc_cmd.cpp
+++ b/src/rpc-server/trex_rpc_cmd.cpp
@@ -32,7 +32,7 @@ TrexRpcCommand::run(const Json::Value &params, Json::Value &result) {
check_param_count(params, m_param_count, result);
- if (m_needs_ownership) {
+ if (m_needs_ownership && !g_test_override_ownership) {
verify_ownership(params, result);
}
@@ -372,3 +372,7 @@ TrexRpcCommand::generate_execute_err(Json::Value &result, const std::string &msg
throw (TrexRpcCommandException(TREX_RPC_CMD_EXECUTE_ERR));
}
+/**
+ * by default this is off
+ */
+bool TrexRpcCommand::g_test_override_ownership = false;
diff --git a/src/rpc-server/trex_rpc_cmd_api.h b/src/rpc-server/trex_rpc_cmd_api.h
index 7cbdf4ff..675d2900 100644
--- a/src/rpc-server/trex_rpc_cmd_api.h
+++ b/src/rpc-server/trex_rpc_cmd_api.h
@@ -89,6 +89,16 @@ public:
return m_name;
}
+ /**
+ * on test we enable this override
+ *
+ *
+ * @param enable
+ */
+ static void test_set_override_ownership(bool enable) {
+ g_test_override_ownership = enable;
+ }
+
virtual ~TrexRpcCommand() {}
protected:
@@ -241,6 +251,8 @@ protected:
std::string m_name;
int m_param_count;
bool m_needs_ownership;
+
+ static bool g_test_override_ownership;
};
#endif /* __TREX_RPC_CMD_API_H__ */
diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp
index eb7825ac..1e8e177d 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.cpp
+++ b/src/rpc-server/trex_rpc_req_resp_server.cpp
@@ -36,7 +36,10 @@ limitations under the License.
*
*/
TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) {
- /* ZMQ is not thread safe - this should be outside */
+
+}
+
+void TrexRpcServerReqRes::_prepare() {
m_context = zmq_ctx_new();
}
@@ -123,15 +126,28 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
*/
void TrexRpcServerReqRes::_stop_rpc_thread() {
/* by calling zmq_term we signal the blocked thread to exit */
- zmq_term(m_context);
+ if (m_context) {
+ zmq_term(m_context);
+ }
}
+
/**
* handles a request given to the server
* respondes to the request
*/
void TrexRpcServerReqRes::handle_request(const std::string &request) {
+ std::string response_str = process_request(request);
+ zmq_send(m_socket, response_str.c_str(), response_str.size(), 0);
+}
+
+/**
+ * main processing of the request
+ *
+ */
+std::string TrexRpcServerReqRes::process_request(const std::string &request) {
+
std::vector<TrexJsonRpcV2ParsedObject *> commands;
Json::FastWriter writer;
@@ -175,8 +191,7 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) {
verbose_json("Server Replied: ", response_str);
- zmq_send(m_socket, response_str.c_str(), response_str.size(), 0);
-
+ return response_str;
}
/**
@@ -198,3 +213,36 @@ TrexRpcServerReqRes::handle_server_error(const std::string &specific_err) {
zmq_send(m_socket, response_str.c_str(), response_str.size(), 0);
}
+
+
+
+std::string
+TrexRpcServerReqRes::test_inject_request(const std::string &req) {
+ return process_request(req);
+}
+
+
+/**
+ * MOCK req resp server
+ */
+TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) {
+}
+
+/**
+ * override start
+ *
+ */
+void
+TrexRpcServerReqResMock::start() {
+
+}
+
+
+/**
+ * override stop
+ */
+void
+TrexRpcServerReqResMock::stop() {
+
+}
+
diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h
index 2876206c..97efbe08 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.h
+++ b/src/rpc-server/trex_rpc_req_resp_server.h
@@ -34,13 +34,19 @@ public:
TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL);
+ /* for test purposes - bypass the ZMQ and inject a message */
+ std::string test_inject_request(const std::string &req);
+
protected:
+
+ void _prepare();
void _rpc_thread_cb();
void _stop_rpc_thread();
-private:
bool fetch_one_request(std::string &msg);
void handle_request(const std::string &request);
+ std::string process_request(const std::string &request);
+
void handle_server_error(const std::string &specific_err);
void *m_context;
@@ -48,4 +54,22 @@ private:
};
+/**
+ * a mock req resp server (for tests)
+ *
+ * @author imarom (03-Jan-16)
+ */
+class TrexRpcServerReqResMock : public TrexRpcServerReqRes {
+
+public:
+ TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg);
+
+ /* override the interface functions */
+ virtual void start();
+ virtual void stop();
+
+
+};
+
#endif /* __TREX_RPC_REQ_RESP_API_H__ */
+
diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp
index a14e6f97..1dfc4494 100644
--- a/src/rpc-server/trex_rpc_server.cpp
+++ b/src/rpc-server/trex_rpc_server.cpp
@@ -63,6 +63,9 @@ void TrexRpcServerInterface::start() {
verbose_msg("Starting RPC Server");
+ /* prepare for run */
+ _prepare();
+
m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this);
if (!m_thread) {
throw TrexRpcException("unable to create RPC thread");
@@ -117,9 +120,18 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
const TrexRpcServerConfig *async_cfg,
std::mutex *lock) {
+ m_req_resp = NULL;
+
/* add the request response server */
if (req_resp_cfg) {
- m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg, lock));
+
+ if (req_resp_cfg->get_protocol() == TrexRpcServerConfig::RPC_PROT_MOCK) {
+ m_req_resp = new TrexRpcServerReqResMock(*req_resp_cfg);
+ } else {
+ m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg, lock);
+ }
+
+ m_servers.push_back(m_req_resp);
}
/* add async publisher */
@@ -166,3 +178,12 @@ void TrexRpcServer::set_verbose(bool verbose) {
}
}
+
+std::string
+TrexRpcServer::test_inject_request(const std::string &req_str) {
+ if (m_req_resp) {
+ return m_req_resp->test_inject_request(req_str);
+ } else {
+ return "";
+ }
+}
diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h
index ff876ac4..1ab5dce9 100644
--- a/src/rpc-server/trex_rpc_server_api.h
+++ b/src/rpc-server/trex_rpc_server_api.h
@@ -29,8 +29,10 @@ limitations under the License.
#include <string>
#include <stdexcept>
#include <trex_rpc_exception_api.h>
+#include <json/json.h>
class TrexRpcServerInterface;
+class TrexRpcServerReqRes;
/**
* defines a configuration of generic RPC server
@@ -41,18 +43,19 @@ class TrexRpcServerConfig {
public:
enum rpc_prot_e {
- RPC_PROT_TCP
+ RPC_PROT_TCP,
+ RPC_PROT_MOCK
};
TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) {
}
- uint16_t get_port() {
+ uint16_t get_port() const {
return m_port;
}
- rpc_prot_e get_protocol() {
+ rpc_prot_e get_protocol() const {
return m_protocol;
}
@@ -76,13 +79,13 @@ public:
* starts the server
*
*/
- void start();
+ virtual void start();
/**
* stops the server
*
*/
- void stop();
+ virtual void stop();
/**
* set verbose on or off
@@ -107,6 +110,7 @@ protected:
* instances implement this
*
*/
+ virtual void _prepare() = 0;
virtual void _rpc_thread_cb() = 0;
virtual void _stop_rpc_thread() = 0;
@@ -169,12 +173,23 @@ public:
}
+ /**
+ * allow injecting of a JSON and get a response
+ *
+ * @author imarom (27-Dec-15)
+ *
+ * @return std::string
+ */
+ std::string test_inject_request(const std::string &request_str);
private:
static std::string generate_handler();
std::vector<TrexRpcServerInterface *> m_servers;
+ // an alias to the req resp server
+ TrexRpcServerReqRes *m_req_resp;
+
bool m_verbose;
static const std::string s_server_uptime;