diff options
author | imarom <imarom@cisco.com> | 2016-06-01 15:52:00 +0300 |
---|---|---|
committer | imarom <imarom@cisco.com> | 2016-06-02 13:45:12 +0300 |
commit | 3c4a29e15f3663f6413fbee2562d7d0aa4e2f80d (patch) | |
tree | c9742549ad7a8013f43077dceb5fa9eacf0aaadf /src/rpc-server/trex_rpc_req_resp_server.cpp | |
parent | b639fb458fb2388164adaf45c4e947a2af2ca0e1 (diff) |
watchdog phase 2
Diffstat (limited to 'src/rpc-server/trex_rpc_req_resp_server.cpp')
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.cpp | 36 |
1 files changed, 30 insertions, 6 deletions
diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index 5c587e0f..d36753d4 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -32,11 +32,13 @@ limitations under the License. #include <zmq.h> #include <json/json.h> +#include "trex_watchdog.h" + /** * ZMQ based request-response server * */ -TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) { +TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "ZMQ sync request-response") { } @@ -52,11 +54,19 @@ void TrexRpcServerReqRes::_prepare() { */ void TrexRpcServerReqRes::_rpc_thread_cb() { std::stringstream ss; + int zmq_rc; + + m_watchdog_handle = m_watchdog->register_monitor(m_name, 1); /* create a socket based on the configuration */ m_socket = zmq_socket (m_context, ZMQ_REP); + /* to make sure the watchdog gets tickles form time to time we give a timeout of 500ms */ + int timeout = 500; + zmq_rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert(zmq_rc == 0); + switch (m_cfg.get_protocol()) { case TrexRpcServerConfig::RPC_PROT_TCP: ss << "tcp://*:"; @@ -68,8 +78,8 @@ void TrexRpcServerReqRes::_rpc_thread_cb() { ss << m_cfg.get_port(); /* bind the scoket */ - int rc = zmq_bind (m_socket, ss.str().c_str()); - if (rc != 0) { + zmq_rc = zmq_bind (m_socket, ss.str().c_str()); + if (zmq_rc != 0) { throw TrexRpcException("Unable to start ZMQ server at: " + ss.str()); } @@ -101,10 +111,22 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { rc = zmq_msg_init(&zmq_msg); assert(rc == 0); - rc = zmq_msg_recv (&zmq_msg, m_socket, 0); + while (true) { + m_watchdog->tickle(m_watchdog_handle); + + rc = zmq_msg_recv (&zmq_msg, m_socket, 0); + if (rc != -1) { + break; + } + + /* timeout ? */ + if (errno == EAGAIN) { + continue; + } - if (rc == -1) { + /* error ! */ zmq_msg_close(&zmq_msg); + /* normal shutdown and zmq_term was called */ if (errno == ETERM) { return false; @@ -113,7 +135,9 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { } } - const char *data = (const char *)zmq_msg_data(&zmq_msg); + + + const char *data = (const char *)zmq_msg_data(&zmq_msg); size_t len = zmq_msg_size(&zmq_msg); msg.append(data, len); |