diff options
Diffstat (limited to 'src/rpc-server')
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.cpp | 2 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.h | 2 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.cpp | 39 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.h | 2 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server.cpp | 17 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server_api.h | 19 |
6 files changed, 63 insertions, 18 deletions
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index aee92539..82c42458 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -36,7 +36,7 @@ limitations under the License. * ZMQ based publisher server * */ -TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "publisher", lock) { +TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "publisher") { /* ZMQ is not thread safe - this should be outside */ m_context = zmq_ctx_new(); } diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h index 80d92c2f..daefa174 100644 --- a/src/rpc-server/trex_rpc_async_server.h +++ b/src/rpc-server/trex_rpc_async_server.h @@ -33,7 +33,7 @@ limitations under the License. class TrexRpcServerAsync : public TrexRpcServerInterface { public: - TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); + TrexRpcServerAsync(const TrexRpcServerConfig &cfg); protected: void _prepare(); diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index 5c587e0f..033f265c 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -32,11 +32,13 @@ limitations under the License. #include <zmq.h> #include <json/json.h> +#include "trex_watchdog.h" + /** * ZMQ based request-response server * */ -TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) { +TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "ZMQ sync request-response") { } @@ -52,11 +54,19 @@ void TrexRpcServerReqRes::_prepare() { */ void TrexRpcServerReqRes::_rpc_thread_cb() { std::stringstream ss; + int zmq_rc; + + m_watchdog_handle = m_watchdog->register_monitor(m_name, 1); /* create a socket based on the configuration */ m_socket = zmq_socket (m_context, ZMQ_REP); + /* to make sure the watchdog gets tickles form time to time we give a timeout of 500ms */ + int timeout = 500; + zmq_rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert(zmq_rc == 0); + switch (m_cfg.get_protocol()) { case TrexRpcServerConfig::RPC_PROT_TCP: ss << "tcp://*:"; @@ -68,8 +78,8 @@ void TrexRpcServerReqRes::_rpc_thread_cb() { ss << m_cfg.get_port(); /* bind the scoket */ - int rc = zmq_bind (m_socket, ss.str().c_str()); - if (rc != 0) { + zmq_rc = zmq_bind (m_socket, ss.str().c_str()); + if (zmq_rc != 0) { throw TrexRpcException("Unable to start ZMQ server at: " + ss.str()); } @@ -90,6 +100,9 @@ void TrexRpcServerReqRes::_rpc_thread_cb() { /* must be done from the same thread */ zmq_close(m_socket); + + /* done */ + m_watchdog->disable_monitor(m_watchdog_handle); } bool @@ -101,10 +114,22 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { rc = zmq_msg_init(&zmq_msg); assert(rc == 0); - rc = zmq_msg_recv (&zmq_msg, m_socket, 0); + while (true) { + m_watchdog->tickle(m_watchdog_handle); - if (rc == -1) { + rc = zmq_msg_recv (&zmq_msg, m_socket, 0); + if (rc != -1) { + break; + } + + /* timeout ? */ + if (errno == EAGAIN) { + continue; + } + + /* error ! */ zmq_msg_close(&zmq_msg); + /* normal shutdown and zmq_term was called */ if (errno == ETERM) { return false; @@ -113,7 +138,9 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { } } - const char *data = (const char *)zmq_msg_data(&zmq_msg); + + + const char *data = (const char *)zmq_msg_data(&zmq_msg); size_t len = zmq_msg_size(&zmq_msg); msg.append(data, len); diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 26b3248f..92d51a2a 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -32,7 +32,7 @@ limitations under the License. class TrexRpcServerReqRes : public TrexRpcServerInterface { public: - TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); + TrexRpcServerReqRes(const TrexRpcServerConfig &cfg); /* for test purposes - bypass the ZMQ and inject a message */ std::string test_inject_request(const std::string &req); diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index 7d2e31a5..e4ca95c3 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -28,11 +28,20 @@ limitations under the License. #include <sstream> #include <iostream> +#include "trex_watchdog.h" + /************** RPC server interface ***************/ -TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *lock) : m_cfg(cfg), m_name(name), m_lock(lock) { +TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name) : m_cfg(cfg) { + m_name = name; + + m_lock = cfg.m_lock; + m_watchdog = cfg.m_watchdog; + m_watchdog_handle = -1; + m_is_running = false; m_is_verbose = false; + if (m_lock == NULL) { m_lock = &m_dummy_lock; } @@ -69,6 +78,7 @@ void TrexRpcServerInterface::start() { /* prepare for run */ _prepare(); + m_watchdog->mark_pending_monitor(); m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this); if (!m_thread) { throw TrexRpcException("unable to create RPC thread"); @@ -119,8 +129,7 @@ get_current_date_time() { const std::string TrexRpcServer::s_server_uptime = get_current_date_time(); -TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, - std::mutex *lock) { +TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg) { m_req_resp = NULL; @@ -130,7 +139,7 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, if (req_resp_cfg->get_protocol() == TrexRpcServerConfig::RPC_PROT_MOCK) { m_req_resp = new TrexRpcServerReqResMock(*req_resp_cfg); } else { - m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg, lock); + m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg); } m_servers.push_back(m_req_resp); diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index a02b2cc0..3d9837ef 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -33,6 +33,7 @@ limitations under the License. class TrexRpcServerInterface; class TrexRpcServerReqRes; +class TrexWatchDog; /** * defines a configuration of generic RPC server @@ -47,8 +48,11 @@ public: RPC_PROT_MOCK }; - TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) { - + TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port, std::mutex *lock, TrexWatchDog *watchdog) { + m_protocol = protocol; + m_port = port; + m_lock = lock; + m_watchdog = watchdog; } uint16_t get_port() const { @@ -62,6 +66,10 @@ public: private: rpc_prot_e m_protocol; uint16_t m_port; + +public: + std::mutex *m_lock; + TrexWatchDog *m_watchdog; }; /** @@ -72,7 +80,7 @@ private: class TrexRpcServerInterface { public: - TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *m_lock = NULL); + TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name); virtual ~TrexRpcServerInterface(); /** @@ -134,6 +142,8 @@ protected: std::string m_name; std::mutex *m_lock; std::mutex m_dummy_lock; + TrexWatchDog *m_watchdog; + int m_watchdog_handle; }; /** @@ -147,8 +157,7 @@ class TrexRpcServer { public: /* creates the collection of servers using configurations */ - TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, - std::mutex *m_lock = NULL); + TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg); ~TrexRpcServer(); |