summaryrefslogtreecommitdiffstats
path: root/src/rpc-server
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc-server')
-rw-r--r--src/rpc-server/trex_rpc_async_server.cpp2
-rw-r--r--src/rpc-server/trex_rpc_async_server.h2
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.cpp39
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.h2
-rw-r--r--src/rpc-server/trex_rpc_server.cpp17
-rw-r--r--src/rpc-server/trex_rpc_server_api.h19
6 files changed, 63 insertions, 18 deletions
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp
index aee92539..82c42458 100644
--- a/src/rpc-server/trex_rpc_async_server.cpp
+++ b/src/rpc-server/trex_rpc_async_server.cpp
@@ -36,7 +36,7 @@ limitations under the License.
* ZMQ based publisher server
*
*/
-TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "publisher", lock) {
+TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "publisher") {
/* ZMQ is not thread safe - this should be outside */
m_context = zmq_ctx_new();
}
diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h
index 80d92c2f..daefa174 100644
--- a/src/rpc-server/trex_rpc_async_server.h
+++ b/src/rpc-server/trex_rpc_async_server.h
@@ -33,7 +33,7 @@ limitations under the License.
class TrexRpcServerAsync : public TrexRpcServerInterface {
public:
- TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL);
+ TrexRpcServerAsync(const TrexRpcServerConfig &cfg);
protected:
void _prepare();
diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp
index 5c587e0f..033f265c 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.cpp
+++ b/src/rpc-server/trex_rpc_req_resp_server.cpp
@@ -32,11 +32,13 @@ limitations under the License.
#include <zmq.h>
#include <json/json.h>
+#include "trex_watchdog.h"
+
/**
* ZMQ based request-response server
*
*/
-TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) {
+TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "ZMQ sync request-response") {
}
@@ -52,11 +54,19 @@ void TrexRpcServerReqRes::_prepare() {
*/
void TrexRpcServerReqRes::_rpc_thread_cb() {
std::stringstream ss;
+ int zmq_rc;
+
+ m_watchdog_handle = m_watchdog->register_monitor(m_name, 1);
/* create a socket based on the configuration */
m_socket = zmq_socket (m_context, ZMQ_REP);
+ /* to make sure the watchdog gets tickles form time to time we give a timeout of 500ms */
+ int timeout = 500;
+ zmq_rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int));
+ assert(zmq_rc == 0);
+
switch (m_cfg.get_protocol()) {
case TrexRpcServerConfig::RPC_PROT_TCP:
ss << "tcp://*:";
@@ -68,8 +78,8 @@ void TrexRpcServerReqRes::_rpc_thread_cb() {
ss << m_cfg.get_port();
/* bind the scoket */
- int rc = zmq_bind (m_socket, ss.str().c_str());
- if (rc != 0) {
+ zmq_rc = zmq_bind (m_socket, ss.str().c_str());
+ if (zmq_rc != 0) {
throw TrexRpcException("Unable to start ZMQ server at: " + ss.str());
}
@@ -90,6 +100,9 @@ void TrexRpcServerReqRes::_rpc_thread_cb() {
/* must be done from the same thread */
zmq_close(m_socket);
+
+ /* done */
+ m_watchdog->disable_monitor(m_watchdog_handle);
}
bool
@@ -101,10 +114,22 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
rc = zmq_msg_init(&zmq_msg);
assert(rc == 0);
- rc = zmq_msg_recv (&zmq_msg, m_socket, 0);
+ while (true) {
+ m_watchdog->tickle(m_watchdog_handle);
- if (rc == -1) {
+ rc = zmq_msg_recv (&zmq_msg, m_socket, 0);
+ if (rc != -1) {
+ break;
+ }
+
+ /* timeout ? */
+ if (errno == EAGAIN) {
+ continue;
+ }
+
+ /* error ! */
zmq_msg_close(&zmq_msg);
+
/* normal shutdown and zmq_term was called */
if (errno == ETERM) {
return false;
@@ -113,7 +138,9 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
}
}
- const char *data = (const char *)zmq_msg_data(&zmq_msg);
+
+
+ const char *data = (const char *)zmq_msg_data(&zmq_msg);
size_t len = zmq_msg_size(&zmq_msg);
msg.append(data, len);
diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h
index 26b3248f..92d51a2a 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.h
+++ b/src/rpc-server/trex_rpc_req_resp_server.h
@@ -32,7 +32,7 @@ limitations under the License.
class TrexRpcServerReqRes : public TrexRpcServerInterface {
public:
- TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL);
+ TrexRpcServerReqRes(const TrexRpcServerConfig &cfg);
/* for test purposes - bypass the ZMQ and inject a message */
std::string test_inject_request(const std::string &req);
diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp
index 7d2e31a5..e4ca95c3 100644
--- a/src/rpc-server/trex_rpc_server.cpp
+++ b/src/rpc-server/trex_rpc_server.cpp
@@ -28,11 +28,20 @@ limitations under the License.
#include <sstream>
#include <iostream>
+#include "trex_watchdog.h"
+
/************** RPC server interface ***************/
-TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *lock) : m_cfg(cfg), m_name(name), m_lock(lock) {
+TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name) : m_cfg(cfg) {
+ m_name = name;
+
+ m_lock = cfg.m_lock;
+ m_watchdog = cfg.m_watchdog;
+ m_watchdog_handle = -1;
+
m_is_running = false;
m_is_verbose = false;
+
if (m_lock == NULL) {
m_lock = &m_dummy_lock;
}
@@ -69,6 +78,7 @@ void TrexRpcServerInterface::start() {
/* prepare for run */
_prepare();
+ m_watchdog->mark_pending_monitor();
m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this);
if (!m_thread) {
throw TrexRpcException("unable to create RPC thread");
@@ -119,8 +129,7 @@ get_current_date_time() {
const std::string TrexRpcServer::s_server_uptime = get_current_date_time();
-TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
- std::mutex *lock) {
+TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg) {
m_req_resp = NULL;
@@ -130,7 +139,7 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
if (req_resp_cfg->get_protocol() == TrexRpcServerConfig::RPC_PROT_MOCK) {
m_req_resp = new TrexRpcServerReqResMock(*req_resp_cfg);
} else {
- m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg, lock);
+ m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg);
}
m_servers.push_back(m_req_resp);
diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h
index a02b2cc0..3d9837ef 100644
--- a/src/rpc-server/trex_rpc_server_api.h
+++ b/src/rpc-server/trex_rpc_server_api.h
@@ -33,6 +33,7 @@ limitations under the License.
class TrexRpcServerInterface;
class TrexRpcServerReqRes;
+class TrexWatchDog;
/**
* defines a configuration of generic RPC server
@@ -47,8 +48,11 @@ public:
RPC_PROT_MOCK
};
- TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) {
-
+ TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port, std::mutex *lock, TrexWatchDog *watchdog) {
+ m_protocol = protocol;
+ m_port = port;
+ m_lock = lock;
+ m_watchdog = watchdog;
}
uint16_t get_port() const {
@@ -62,6 +66,10 @@ public:
private:
rpc_prot_e m_protocol;
uint16_t m_port;
+
+public:
+ std::mutex *m_lock;
+ TrexWatchDog *m_watchdog;
};
/**
@@ -72,7 +80,7 @@ private:
class TrexRpcServerInterface {
public:
- TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *m_lock = NULL);
+ TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name);
virtual ~TrexRpcServerInterface();
/**
@@ -134,6 +142,8 @@ protected:
std::string m_name;
std::mutex *m_lock;
std::mutex m_dummy_lock;
+ TrexWatchDog *m_watchdog;
+ int m_watchdog_handle;
};
/**
@@ -147,8 +157,7 @@ class TrexRpcServer {
public:
/* creates the collection of servers using configurations */
- TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
- std::mutex *m_lock = NULL);
+ TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg);
~TrexRpcServer();