From 09c9d77dc2f5a89924bd27226727220801a5df13 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 8 Oct 2015 10:23:33 +0200 Subject: fixed some bugs in the async server also added affinity to the stateless main object --- src/gtest/rpc_test.cpp | 10 +---- src/rpc-server/trex_rpc_async_server.cpp | 6 +-- src/rpc-server/trex_rpc_server.cpp | 13 +++++-- src/rpc-server/trex_rpc_server_api.h | 4 +- src/rpc-server/trex_rpc_server_mock.cpp | 35 +++++++++++------- src/stateless/cp/trex_stateless.cpp | 63 +++++++++++++++++++++++++++----- src/stateless/cp/trex_stateless.h | 44 +++++++++++++++++++--- 7 files changed, 129 insertions(+), 46 deletions(-) diff --git a/src/gtest/rpc_test.cpp b/src/gtest/rpc_test.cpp index 4084b664..250d5342 100644 --- a/src/gtest/rpc_test.cpp +++ b/src/gtest/rpc_test.cpp @@ -42,12 +42,6 @@ protected: m_verbose = false; - TrexRpcServerConfig req_resp_cfg = TrexRpcServerConfig(TrexRpcServerConfig::RPC_PROT_TCP, 5050); - TrexRpcServerConfig async_cfg = TrexRpcServerConfig(TrexRpcServerConfig::RPC_PROT_TCP, 5051); - - m_rpc = new TrexRpcServer(req_resp_cfg, async_cfg); - m_rpc->start(); - m_context = zmq_ctx_new (); m_socket = zmq_socket (m_context, ZMQ_REQ); zmq_connect (m_socket, "tcp://localhost:5050"); @@ -55,9 +49,6 @@ protected: } virtual void TearDown() { - m_rpc->stop(); - - delete m_rpc; zmq_close(m_socket); zmq_term(m_context); } @@ -658,3 +649,4 @@ TEST_F(RpcTestOwned, start_stop_traffic) { send_request(request, response); EXPECT_EQ(response["result"], "ACK"); } + diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 01f03af3..3313e42e 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -87,13 +87,13 @@ TrexRpcServerAsync::_rpc_thread_cb() { /* relax for some time */ std::this_thread::sleep_for (std::chrono::milliseconds(1000)); - } + + /* must be closed from the same thread */ + zmq_close(m_socket); } void TrexRpcServerAsync::_stop_rpc_thread() { - m_is_running = false; - this->m_thread->join(); zmq_term(m_context); } diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index 18265a0e..8749c9b4 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -113,13 +113,18 @@ get_current_date_time() { const std::string TrexRpcServer::s_server_uptime = get_current_date_time(); -TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg, - const TrexRpcServerConfig &async_cfg) { +TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, + const TrexRpcServerConfig *async_cfg) { /* add the request response server */ - m_servers.push_back(new TrexRpcServerReqRes(req_resp_cfg)); + if (req_resp_cfg) { + m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg)); + } + /* add async publisher */ - m_servers.push_back(new TrexRpcServerAsync(async_cfg)); + if (async_cfg) { + m_servers.push_back(new TrexRpcServerAsync(*async_cfg)); + } } TrexRpcServer::~TrexRpcServer() { diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index 5a7cad48..327c6eaa 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -140,8 +140,8 @@ class TrexRpcServer { public: /* creates the collection of servers using configurations */ - TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg, - const TrexRpcServerConfig &async_cfg); + TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, + const TrexRpcServerConfig *async_cfg); ~TrexRpcServer(); diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp index 32635c75..8dae42d7 100644 --- a/src/rpc-server/trex_rpc_server_mock.cpp +++ b/src/rpc-server/trex_rpc_server_mock.cpp @@ -44,31 +44,39 @@ int gtest_main(int argc, char **argv); int main(int argc, char *argv[]) { - /* configure the stateless object with 4 ports */ - TrexStateless::configure(4); + bool is_gtest = false; - // gtest ? + // gtest ? if (argc > 1) { if (string(argv[1]) != "--ut") { cout << "\n[Usage] " << argv[0] << ": " << " [--ut]\n\n"; exit(-1); } - return gtest_main(argc, argv); + is_gtest = true; } - cout << "\n-= Starting RPC Server Mock =-\n\n"; - cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; + /* configure the stateless object with 4 ports */ + TrexStatelessCfg cfg; TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); - TrexRpcServer rpc(rpc_req_resp_cfg, rpc_async_cfg); + cfg.m_port_count = 4; + cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; + cfg.m_rpc_async_cfg = &rpc_async_cfg; + cfg.m_rpc_server_verbose = (is_gtest ? false : true); - /* init the RPC server */ - rpc.start(); + TrexStateless::create(cfg); - cout << "Setting Server To Full Verbose\n\n"; - rpc.set_verbose(true); + /* gtest handling */ + if (is_gtest) { + int rc = gtest_main(argc, argv); + TrexStateless::destroy(); + return rc; + } + + cout << "\n-= Starting RPC Server Mock =-\n\n"; + cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; cout << "Server Started\n\n"; @@ -76,7 +84,6 @@ int main(int argc, char *argv[]) { sleep(1); } - rpc.stop(); - - + TrexStateless::destroy(); } + diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 92f54bc4..1e7e2dfa 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -21,6 +21,8 @@ limitations under the License. #include #include +#include + using namespace std; /*********************************************************** @@ -31,36 +33,79 @@ TrexStateless::TrexStateless() { m_is_configured = false; } + /** - * one time configuration of the stateless object + * creates the singleton stateless object * */ -void TrexStateless::configure(uint8_t port_count) { +void TrexStateless::create(const TrexStatelessCfg &cfg) { TrexStateless& instance = get_instance_internal(); + /* check status */ if (instance.m_is_configured) { throw TrexException("re-configuration of stateless object is not allowed"); } - instance.m_port_count = port_count; - instance.m_ports = new TrexStatelessPort*[port_count]; + /* pin this process to the current running CPU + any new thread will be called on the same CPU + (control plane restriction) + */ + cpu_set_t mask; + CPU_ZERO(&mask); + CPU_SET(sched_getcpu(), &mask); + sched_setaffinity(0, sizeof(mask), &mask); + + /* start RPC servers */ + instance.m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg); + instance.m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); + instance.m_rpc_server->start(); + + /* configure ports */ + + instance.m_port_count = cfg.m_port_count; for (int i = 0; i < instance.m_port_count; i++) { - instance.m_ports[i] = new TrexStatelessPort(i); + instance.m_ports.push_back(new TrexStatelessPort(i)); } + /* done */ instance.m_is_configured = true; } -TrexStateless::~TrexStateless() { - for (int i = 0; i < m_port_count; i++) { - delete m_ports[i]; +/** + * destroy the singleton and release all memory + * + * @author imarom (08-Oct-15) + */ +void +TrexStateless::destroy() { + TrexStateless& instance = get_instance_internal(); + + if (!instance.m_is_configured) { + return; } - delete [] m_ports; + /* release memory for ports */ + for (auto port : instance.m_ports) { + delete port; + } + instance.m_ports.clear(); + + /* stops the RPC server */ + instance.m_rpc_server->stop(); + delete instance.m_rpc_server; + + instance.m_rpc_server = NULL; + + /* done */ + instance.m_is_configured = false; } +/** + * fetch a port by ID + * + */ TrexStatelessPort * TrexStateless::get_port_by_id(uint8_t port_id) { if (port_id >= m_port_count) { throw TrexException("index out of range"); diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 74e88846..02eda7e2 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -26,6 +26,7 @@ limitations under the License. #include #include +#include /** * generic exception for errors @@ -73,6 +74,27 @@ public: } m_stats; }; +/** + * config object for stateless object + * + * @author imarom (08-Oct-15) + */ +class TrexStatelessCfg { +public: + /* default values */ + TrexStatelessCfg() { + m_port_count = 0; + m_rpc_req_resp_cfg = NULL; + m_rpc_async_cfg = NULL; + m_rpc_server_verbose = false; + } + + const TrexRpcServerConfig *m_rpc_req_resp_cfg; + const TrexRpcServerConfig *m_rpc_async_cfg; + bool m_rpc_server_verbose; + uint8_t m_port_count; +}; + /** * defines the T-Rex stateless operation mode * @@ -85,7 +107,13 @@ public: * reconfiguration is not allowed * an exception will be thrown */ - static void configure(uint8_t port_count); + static void create(const TrexStatelessCfg &cfg); + + /** + * destroy the instance + * + */ + static void destroy(); /** * singleton public get instance @@ -115,12 +143,11 @@ public: * fetch all the stats * */ - void encode_stats(Json::Value &global); + void encode_stats(Json::Value &global); protected: TrexStateless(); - ~TrexStateless(); static TrexStateless& get_instance_internal () { static TrexStateless instance; @@ -131,10 +158,17 @@ protected: TrexStateless(TrexStateless const&) = delete; void operator=(TrexStateless const&) = delete; + /* status */ bool m_is_configured; - TrexStatelessPort **m_ports; - uint8_t m_port_count; + /* RPC server array */ + TrexRpcServer *m_rpc_server; + + /* ports */ + std::vector m_ports; + uint8_t m_port_count; + + /* stats */ TrexStatelessStats m_stats; }; -- cgit 1.2.3-korg