From 3c4a29e15f3663f6413fbee2562d7d0aa4e2f80d Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 1 Jun 2016 15:52:00 +0300 Subject: watchdog phase 2 --- src/rpc-server/trex_rpc_req_resp_server.cpp | 36 ++++++++++++++++++++++++----- 1 file changed, 30 insertions(+), 6 deletions(-) (limited to 'src/rpc-server/trex_rpc_req_resp_server.cpp') diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index 5c587e0f..d36753d4 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -32,11 +32,13 @@ limitations under the License. #include #include +#include "trex_watchdog.h" + /** * ZMQ based request-response server * */ -TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) { +TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "ZMQ sync request-response") { } @@ -52,11 +54,19 @@ void TrexRpcServerReqRes::_prepare() { */ void TrexRpcServerReqRes::_rpc_thread_cb() { std::stringstream ss; + int zmq_rc; + + m_watchdog_handle = m_watchdog->register_monitor(m_name, 1); /* create a socket based on the configuration */ m_socket = zmq_socket (m_context, ZMQ_REP); + /* to make sure the watchdog gets tickles form time to time we give a timeout of 500ms */ + int timeout = 500; + zmq_rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert(zmq_rc == 0); + switch (m_cfg.get_protocol()) { case TrexRpcServerConfig::RPC_PROT_TCP: ss << "tcp://*:"; @@ -68,8 +78,8 @@ void TrexRpcServerReqRes::_rpc_thread_cb() { ss << m_cfg.get_port(); /* bind the scoket */ - int rc = zmq_bind (m_socket, ss.str().c_str()); - if (rc != 0) { + zmq_rc = zmq_bind (m_socket, ss.str().c_str()); + if (zmq_rc != 0) { throw TrexRpcException("Unable to start ZMQ server at: " + ss.str()); } @@ -101,10 +111,22 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { rc = zmq_msg_init(&zmq_msg); assert(rc == 0); - rc = zmq_msg_recv (&zmq_msg, m_socket, 0); + while (true) { + m_watchdog->tickle(m_watchdog_handle); + + rc = zmq_msg_recv (&zmq_msg, m_socket, 0); + if (rc != -1) { + break; + } + + /* timeout ? */ + if (errno == EAGAIN) { + continue; + } - if (rc == -1) { + /* error ! */ zmq_msg_close(&zmq_msg); + /* normal shutdown and zmq_term was called */ if (errno == ETERM) { return false; @@ -113,7 +135,9 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { } } - const char *data = (const char *)zmq_msg_data(&zmq_msg); + + + const char *data = (const char *)zmq_msg_data(&zmq_msg); size_t len = zmq_msg_size(&zmq_msg); msg.append(data, len); -- cgit From 8feef53b5a272ec7a72f05d1a633d2a18071b775 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 2 Jun 2016 13:40:02 +0300 Subject: WATCHDOG - disable monitors when done to avoid crash when joining on other threads --- src/rpc-server/trex_rpc_req_resp_server.cpp | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/rpc-server/trex_rpc_req_resp_server.cpp') diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index d36753d4..033f265c 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -100,6 +100,9 @@ void TrexRpcServerReqRes::_rpc_thread_cb() { /* must be done from the same thread */ zmq_close(m_socket); + + /* done */ + m_watchdog->disable_monitor(m_watchdog_handle); } bool -- cgit