summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-10-08 10:23:33 +0200
committerimarom <imarom@cisco.com>2015-10-08 10:23:56 +0200
commit09c9d77dc2f5a89924bd27226727220801a5df13 (patch)
tree3db008dbcc1aba670e2ec691051082949d9c6492
parent74b648a86c16933680b03a736afe3f0305b4f6d2 (diff)
fixed some bugs in the async server
also added affinity to the stateless main object
-rw-r--r--src/gtest/rpc_test.cpp10
-rw-r--r--src/rpc-server/trex_rpc_async_server.cpp6
-rw-r--r--src/rpc-server/trex_rpc_server.cpp13
-rw-r--r--src/rpc-server/trex_rpc_server_api.h4
-rw-r--r--src/rpc-server/trex_rpc_server_mock.cpp35
-rw-r--r--src/stateless/cp/trex_stateless.cpp63
-rw-r--r--src/stateless/cp/trex_stateless.h44
7 files changed, 129 insertions, 46 deletions
diff --git a/src/gtest/rpc_test.cpp b/src/gtest/rpc_test.cpp
index 4084b664..250d5342 100644
--- a/src/gtest/rpc_test.cpp
+++ b/src/gtest/rpc_test.cpp
@@ -42,12 +42,6 @@ protected:
m_verbose = false;
- TrexRpcServerConfig req_resp_cfg = TrexRpcServerConfig(TrexRpcServerConfig::RPC_PROT_TCP, 5050);
- TrexRpcServerConfig async_cfg = TrexRpcServerConfig(TrexRpcServerConfig::RPC_PROT_TCP, 5051);
-
- m_rpc = new TrexRpcServer(req_resp_cfg, async_cfg);
- m_rpc->start();
-
m_context = zmq_ctx_new ();
m_socket = zmq_socket (m_context, ZMQ_REQ);
zmq_connect (m_socket, "tcp://localhost:5050");
@@ -55,9 +49,6 @@ protected:
}
virtual void TearDown() {
- m_rpc->stop();
-
- delete m_rpc;
zmq_close(m_socket);
zmq_term(m_context);
}
@@ -658,3 +649,4 @@ TEST_F(RpcTestOwned, start_stop_traffic) {
send_request(request, response);
EXPECT_EQ(response["result"], "ACK");
}
+
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp
index 01f03af3..3313e42e 100644
--- a/src/rpc-server/trex_rpc_async_server.cpp
+++ b/src/rpc-server/trex_rpc_async_server.cpp
@@ -87,13 +87,13 @@ TrexRpcServerAsync::_rpc_thread_cb() {
/* relax for some time */
std::this_thread::sleep_for (std::chrono::milliseconds(1000));
-
}
+
+ /* must be closed from the same thread */
+ zmq_close(m_socket);
}
void
TrexRpcServerAsync::_stop_rpc_thread() {
- m_is_running = false;
- this->m_thread->join();
zmq_term(m_context);
}
diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp
index 18265a0e..8749c9b4 100644
--- a/src/rpc-server/trex_rpc_server.cpp
+++ b/src/rpc-server/trex_rpc_server.cpp
@@ -113,13 +113,18 @@ get_current_date_time() {
const std::string TrexRpcServer::s_server_uptime = get_current_date_time();
-TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg,
- const TrexRpcServerConfig &async_cfg) {
+TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
+ const TrexRpcServerConfig *async_cfg) {
/* add the request response server */
- m_servers.push_back(new TrexRpcServerReqRes(req_resp_cfg));
+ if (req_resp_cfg) {
+ m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg));
+ }
+
/* add async publisher */
- m_servers.push_back(new TrexRpcServerAsync(async_cfg));
+ if (async_cfg) {
+ m_servers.push_back(new TrexRpcServerAsync(*async_cfg));
+ }
}
TrexRpcServer::~TrexRpcServer() {
diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h
index 5a7cad48..327c6eaa 100644
--- a/src/rpc-server/trex_rpc_server_api.h
+++ b/src/rpc-server/trex_rpc_server_api.h
@@ -140,8 +140,8 @@ class TrexRpcServer {
public:
/* creates the collection of servers using configurations */
- TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg,
- const TrexRpcServerConfig &async_cfg);
+ TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
+ const TrexRpcServerConfig *async_cfg);
~TrexRpcServer();
diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp
index 32635c75..8dae42d7 100644
--- a/src/rpc-server/trex_rpc_server_mock.cpp
+++ b/src/rpc-server/trex_rpc_server_mock.cpp
@@ -44,31 +44,39 @@ int gtest_main(int argc, char **argv);
int main(int argc, char *argv[]) {
- /* configure the stateless object with 4 ports */
- TrexStateless::configure(4);
+ bool is_gtest = false;
- // gtest ?
+ // gtest ?
if (argc > 1) {
if (string(argv[1]) != "--ut") {
cout << "\n[Usage] " << argv[0] << ": " << " [--ut]\n\n";
exit(-1);
}
- return gtest_main(argc, argv);
+ is_gtest = true;
}
- cout << "\n-= Starting RPC Server Mock =-\n\n";
- cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n";
+ /* configure the stateless object with 4 ports */
+ TrexStatelessCfg cfg;
TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050);
TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051);
- TrexRpcServer rpc(rpc_req_resp_cfg, rpc_async_cfg);
+ cfg.m_port_count = 4;
+ cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
+ cfg.m_rpc_async_cfg = &rpc_async_cfg;
+ cfg.m_rpc_server_verbose = (is_gtest ? false : true);
- /* init the RPC server */
- rpc.start();
+ TrexStateless::create(cfg);
- cout << "Setting Server To Full Verbose\n\n";
- rpc.set_verbose(true);
+ /* gtest handling */
+ if (is_gtest) {
+ int rc = gtest_main(argc, argv);
+ TrexStateless::destroy();
+ return rc;
+ }
+
+ cout << "\n-= Starting RPC Server Mock =-\n\n";
+ cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n";
cout << "Server Started\n\n";
@@ -76,7 +84,6 @@ int main(int argc, char *argv[]) {
sleep(1);
}
- rpc.stop();
-
-
+ TrexStateless::destroy();
}
+
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 92f54bc4..1e7e2dfa 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -21,6 +21,8 @@ limitations under the License.
#include <trex_stateless.h>
#include <trex_stateless_port.h>
+#include <sched.h>
+
using namespace std;
/***********************************************************
@@ -31,36 +33,79 @@ TrexStateless::TrexStateless() {
m_is_configured = false;
}
+
/**
- * one time configuration of the stateless object
+ * creates the singleton stateless object
*
*/
-void TrexStateless::configure(uint8_t port_count) {
+void TrexStateless::create(const TrexStatelessCfg &cfg) {
TrexStateless& instance = get_instance_internal();
+ /* check status */
if (instance.m_is_configured) {
throw TrexException("re-configuration of stateless object is not allowed");
}
- instance.m_port_count = port_count;
- instance.m_ports = new TrexStatelessPort*[port_count];
+ /* pin this process to the current running CPU
+ any new thread will be called on the same CPU
+ (control plane restriction)
+ */
+ cpu_set_t mask;
+ CPU_ZERO(&mask);
+ CPU_SET(sched_getcpu(), &mask);
+ sched_setaffinity(0, sizeof(mask), &mask);
+
+ /* start RPC servers */
+ instance.m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg);
+ instance.m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
+ instance.m_rpc_server->start();
+
+ /* configure ports */
+
+ instance.m_port_count = cfg.m_port_count;
for (int i = 0; i < instance.m_port_count; i++) {
- instance.m_ports[i] = new TrexStatelessPort(i);
+ instance.m_ports.push_back(new TrexStatelessPort(i));
}
+ /* done */
instance.m_is_configured = true;
}
-TrexStateless::~TrexStateless() {
- for (int i = 0; i < m_port_count; i++) {
- delete m_ports[i];
+/**
+ * destroy the singleton and release all memory
+ *
+ * @author imarom (08-Oct-15)
+ */
+void
+TrexStateless::destroy() {
+ TrexStateless& instance = get_instance_internal();
+
+ if (!instance.m_is_configured) {
+ return;
}
- delete [] m_ports;
+ /* release memory for ports */
+ for (auto port : instance.m_ports) {
+ delete port;
+ }
+ instance.m_ports.clear();
+
+ /* stops the RPC server */
+ instance.m_rpc_server->stop();
+ delete instance.m_rpc_server;
+
+ instance.m_rpc_server = NULL;
+
+ /* done */
+ instance.m_is_configured = false;
}
+/**
+ * fetch a port by ID
+ *
+ */
TrexStatelessPort * TrexStateless::get_port_by_id(uint8_t port_id) {
if (port_id >= m_port_count) {
throw TrexException("index out of range");
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 74e88846..02eda7e2 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -26,6 +26,7 @@ limitations under the License.
#include <stdexcept>
#include <trex_stream.h>
+#include <trex_rpc_server_api.h>
/**
* generic exception for errors
@@ -74,6 +75,27 @@ public:
};
/**
+ * config object for stateless object
+ *
+ * @author imarom (08-Oct-15)
+ */
+class TrexStatelessCfg {
+public:
+ /* default values */
+ TrexStatelessCfg() {
+ m_port_count = 0;
+ m_rpc_req_resp_cfg = NULL;
+ m_rpc_async_cfg = NULL;
+ m_rpc_server_verbose = false;
+ }
+
+ const TrexRpcServerConfig *m_rpc_req_resp_cfg;
+ const TrexRpcServerConfig *m_rpc_async_cfg;
+ bool m_rpc_server_verbose;
+ uint8_t m_port_count;
+};
+
+/**
* defines the T-Rex stateless operation mode
*
*/
@@ -85,7 +107,13 @@ public:
* reconfiguration is not allowed
* an exception will be thrown
*/
- static void configure(uint8_t port_count);
+ static void create(const TrexStatelessCfg &cfg);
+
+ /**
+ * destroy the instance
+ *
+ */
+ static void destroy();
/**
* singleton public get instance
@@ -115,12 +143,11 @@ public:
* fetch all the stats
*
*/
- void encode_stats(Json::Value &global);
+ void encode_stats(Json::Value &global);
protected:
TrexStateless();
- ~TrexStateless();
static TrexStateless& get_instance_internal () {
static TrexStateless instance;
@@ -131,10 +158,17 @@ protected:
TrexStateless(TrexStateless const&) = delete;
void operator=(TrexStateless const&) = delete;
+ /* status */
bool m_is_configured;
- TrexStatelessPort **m_ports;
- uint8_t m_port_count;
+ /* RPC server array */
+ TrexRpcServer *m_rpc_server;
+
+ /* ports */
+ std::vector <TrexStatelessPort *> m_ports;
+ uint8_t m_port_count;
+
+ /* stats */
TrexStatelessStats m_stats;
};