summaryrefslogtreecommitdiffstats
path: root/src/rpc-server/trex_rpc_req_resp_server.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc-server/trex_rpc_req_resp_server.cpp')
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.cpp36
1 files changed, 30 insertions, 6 deletions
diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp
index 5c587e0f..d36753d4 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.cpp
+++ b/src/rpc-server/trex_rpc_req_resp_server.cpp
@@ -32,11 +32,13 @@ limitations under the License.
#include <zmq.h>
#include <json/json.h>
+#include "trex_watchdog.h"
+
/**
* ZMQ based request-response server
*
*/
-TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) {
+TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "ZMQ sync request-response") {
}
@@ -52,11 +54,19 @@ void TrexRpcServerReqRes::_prepare() {
*/
void TrexRpcServerReqRes::_rpc_thread_cb() {
std::stringstream ss;
+ int zmq_rc;
+
+ m_watchdog_handle = m_watchdog->register_monitor(m_name, 1);
/* create a socket based on the configuration */
m_socket = zmq_socket (m_context, ZMQ_REP);
+ /* to make sure the watchdog gets tickles form time to time we give a timeout of 500ms */
+ int timeout = 500;
+ zmq_rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int));
+ assert(zmq_rc == 0);
+
switch (m_cfg.get_protocol()) {
case TrexRpcServerConfig::RPC_PROT_TCP:
ss << "tcp://*:";
@@ -68,8 +78,8 @@ void TrexRpcServerReqRes::_rpc_thread_cb() {
ss << m_cfg.get_port();
/* bind the scoket */
- int rc = zmq_bind (m_socket, ss.str().c_str());
- if (rc != 0) {
+ zmq_rc = zmq_bind (m_socket, ss.str().c_str());
+ if (zmq_rc != 0) {
throw TrexRpcException("Unable to start ZMQ server at: " + ss.str());
}
@@ -101,10 +111,22 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
rc = zmq_msg_init(&zmq_msg);
assert(rc == 0);
- rc = zmq_msg_recv (&zmq_msg, m_socket, 0);
+ while (true) {
+ m_watchdog->tickle(m_watchdog_handle);
+
+ rc = zmq_msg_recv (&zmq_msg, m_socket, 0);
+ if (rc != -1) {
+ break;
+ }
+
+ /* timeout ? */
+ if (errno == EAGAIN) {
+ continue;
+ }
- if (rc == -1) {
+ /* error ! */
zmq_msg_close(&zmq_msg);
+
/* normal shutdown and zmq_term was called */
if (errno == ETERM) {
return false;
@@ -113,7 +135,9 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
}
}
- const char *data = (const char *)zmq_msg_data(&zmq_msg);
+
+
+ const char *data = (const char *)zmq_msg_data(&zmq_msg);
size_t len = zmq_msg_size(&zmq_msg);
msg.append(data, len);