summaryrefslogtreecommitdiffstats
path: root/src/rpc-server/trex_rpc_req_resp_server.cpp
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2016-06-07 13:02:08 +0300
committerHanoh Haim <hhaim@cisco.com>2016-06-07 13:02:08 +0300
commit38bcd9c376add0f94e3f660bbcf0558c55f31135 (patch)
treef5d6fd309f7995b94d0558ea4ec29ba00faa1c73 /src/rpc-server/trex_rpc_req_resp_server.cpp
parent0814f72d66600967c9bf5f1c743ce0ee64e5c1f2 (diff)
parent918ae3cda75a01c8a4769df79bf6bfd0b270a41f (diff)
Merge branch 'master' of csi-sceasr-b94:/auto/proj-pcube-b/apps/PL-b/tools/repo//trex-core
Diffstat (limited to 'src/rpc-server/trex_rpc_req_resp_server.cpp')
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.cpp39
1 files changed, 33 insertions, 6 deletions
diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp
index 5c587e0f..033f265c 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.cpp
+++ b/src/rpc-server/trex_rpc_req_resp_server.cpp
@@ -32,11 +32,13 @@ limitations under the License.
#include <zmq.h>
#include <json/json.h>
+#include "trex_watchdog.h"
+
/**
* ZMQ based request-response server
*
*/
-TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) {
+TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "ZMQ sync request-response") {
}
@@ -52,11 +54,19 @@ void TrexRpcServerReqRes::_prepare() {
*/
void TrexRpcServerReqRes::_rpc_thread_cb() {
std::stringstream ss;
+ int zmq_rc;
+
+ m_watchdog_handle = m_watchdog->register_monitor(m_name, 1);
/* create a socket based on the configuration */
m_socket = zmq_socket (m_context, ZMQ_REP);
+ /* to make sure the watchdog gets tickles form time to time we give a timeout of 500ms */
+ int timeout = 500;
+ zmq_rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int));
+ assert(zmq_rc == 0);
+
switch (m_cfg.get_protocol()) {
case TrexRpcServerConfig::RPC_PROT_TCP:
ss << "tcp://*:";
@@ -68,8 +78,8 @@ void TrexRpcServerReqRes::_rpc_thread_cb() {
ss << m_cfg.get_port();
/* bind the scoket */
- int rc = zmq_bind (m_socket, ss.str().c_str());
- if (rc != 0) {
+ zmq_rc = zmq_bind (m_socket, ss.str().c_str());
+ if (zmq_rc != 0) {
throw TrexRpcException("Unable to start ZMQ server at: " + ss.str());
}
@@ -90,6 +100,9 @@ void TrexRpcServerReqRes::_rpc_thread_cb() {
/* must be done from the same thread */
zmq_close(m_socket);
+
+ /* done */
+ m_watchdog->disable_monitor(m_watchdog_handle);
}
bool
@@ -101,10 +114,22 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
rc = zmq_msg_init(&zmq_msg);
assert(rc == 0);
- rc = zmq_msg_recv (&zmq_msg, m_socket, 0);
+ while (true) {
+ m_watchdog->tickle(m_watchdog_handle);
- if (rc == -1) {
+ rc = zmq_msg_recv (&zmq_msg, m_socket, 0);
+ if (rc != -1) {
+ break;
+ }
+
+ /* timeout ? */
+ if (errno == EAGAIN) {
+ continue;
+ }
+
+ /* error ! */
zmq_msg_close(&zmq_msg);
+
/* normal shutdown and zmq_term was called */
if (errno == ETERM) {
return false;
@@ -113,7 +138,9 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
}
}
- const char *data = (const char *)zmq_msg_data(&zmq_msg);
+
+
+ const char *data = (const char *)zmq_msg_data(&zmq_msg);
size_t len = zmq_msg_size(&zmq_msg);
msg.append(data, len);