diff options
author | 2015-10-08 10:23:33 +0200 | |
---|---|---|
committer | 2015-10-08 10:23:56 +0200 | |
commit | 09c9d77dc2f5a89924bd27226727220801a5df13 (patch) | |
tree | 3db008dbcc1aba670e2ec691051082949d9c6492 /src | |
parent | 74b648a86c16933680b03a736afe3f0305b4f6d2 (diff) |
fixed some bugs in the async server
also added affinity to the stateless main object
Diffstat (limited to 'src')
-rw-r--r-- | src/gtest/rpc_test.cpp | 10 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.cpp | 6 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server.cpp | 13 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server_api.h | 4 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server_mock.cpp | 35 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 63 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.h | 44 |
7 files changed, 129 insertions, 46 deletions
diff --git a/src/gtest/rpc_test.cpp b/src/gtest/rpc_test.cpp index 4084b664..250d5342 100644 --- a/src/gtest/rpc_test.cpp +++ b/src/gtest/rpc_test.cpp @@ -42,12 +42,6 @@ protected: m_verbose = false; - TrexRpcServerConfig req_resp_cfg = TrexRpcServerConfig(TrexRpcServerConfig::RPC_PROT_TCP, 5050); - TrexRpcServerConfig async_cfg = TrexRpcServerConfig(TrexRpcServerConfig::RPC_PROT_TCP, 5051); - - m_rpc = new TrexRpcServer(req_resp_cfg, async_cfg); - m_rpc->start(); - m_context = zmq_ctx_new (); m_socket = zmq_socket (m_context, ZMQ_REQ); zmq_connect (m_socket, "tcp://localhost:5050"); @@ -55,9 +49,6 @@ protected: } virtual void TearDown() { - m_rpc->stop(); - - delete m_rpc; zmq_close(m_socket); zmq_term(m_context); } @@ -658,3 +649,4 @@ TEST_F(RpcTestOwned, start_stop_traffic) { send_request(request, response); EXPECT_EQ(response["result"], "ACK"); } + diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 01f03af3..3313e42e 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -87,13 +87,13 @@ TrexRpcServerAsync::_rpc_thread_cb() { /* relax for some time */ std::this_thread::sleep_for (std::chrono::milliseconds(1000)); - } + + /* must be closed from the same thread */ + zmq_close(m_socket); } void TrexRpcServerAsync::_stop_rpc_thread() { - m_is_running = false; - this->m_thread->join(); zmq_term(m_context); } diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index 18265a0e..8749c9b4 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -113,13 +113,18 @@ get_current_date_time() { const std::string TrexRpcServer::s_server_uptime = get_current_date_time(); -TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg, - const TrexRpcServerConfig &async_cfg) { +TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, + const TrexRpcServerConfig *async_cfg) { /* add the request response server */ - m_servers.push_back(new TrexRpcServerReqRes(req_resp_cfg)); + if (req_resp_cfg) { + m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg)); + } + /* add async publisher */ - m_servers.push_back(new TrexRpcServerAsync(async_cfg)); + if (async_cfg) { + m_servers.push_back(new TrexRpcServerAsync(*async_cfg)); + } } TrexRpcServer::~TrexRpcServer() { diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index 5a7cad48..327c6eaa 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -140,8 +140,8 @@ class TrexRpcServer { public: /* creates the collection of servers using configurations */ - TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg, - const TrexRpcServerConfig &async_cfg); + TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, + const TrexRpcServerConfig *async_cfg); ~TrexRpcServer(); diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp index 32635c75..8dae42d7 100644 --- a/src/rpc-server/trex_rpc_server_mock.cpp +++ b/src/rpc-server/trex_rpc_server_mock.cpp @@ -44,31 +44,39 @@ int gtest_main(int argc, char **argv); int main(int argc, char *argv[]) { - /* configure the stateless object with 4 ports */ - TrexStateless::configure(4); + bool is_gtest = false; - // gtest ? + // gtest ? if (argc > 1) { if (string(argv[1]) != "--ut") { cout << "\n[Usage] " << argv[0] << ": " << " [--ut]\n\n"; exit(-1); } - return gtest_main(argc, argv); + is_gtest = true; } - cout << "\n-= Starting RPC Server Mock =-\n\n"; - cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; + /* configure the stateless object with 4 ports */ + TrexStatelessCfg cfg; TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); - TrexRpcServer rpc(rpc_req_resp_cfg, rpc_async_cfg); + cfg.m_port_count = 4; + cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; + cfg.m_rpc_async_cfg = &rpc_async_cfg; + cfg.m_rpc_server_verbose = (is_gtest ? false : true); - /* init the RPC server */ - rpc.start(); + TrexStateless::create(cfg); - cout << "Setting Server To Full Verbose\n\n"; - rpc.set_verbose(true); + /* gtest handling */ + if (is_gtest) { + int rc = gtest_main(argc, argv); + TrexStateless::destroy(); + return rc; + } + + cout << "\n-= Starting RPC Server Mock =-\n\n"; + cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; cout << "Server Started\n\n"; @@ -76,7 +84,6 @@ int main(int argc, char *argv[]) { sleep(1); } - rpc.stop(); - - + TrexStateless::destroy(); } + diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 92f54bc4..1e7e2dfa 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -21,6 +21,8 @@ limitations under the License. #include <trex_stateless.h> #include <trex_stateless_port.h> +#include <sched.h> + using namespace std; /*********************************************************** @@ -31,36 +33,79 @@ TrexStateless::TrexStateless() { m_is_configured = false; } + /** - * one time configuration of the stateless object + * creates the singleton stateless object * */ -void TrexStateless::configure(uint8_t port_count) { +void TrexStateless::create(const TrexStatelessCfg &cfg) { TrexStateless& instance = get_instance_internal(); + /* check status */ if (instance.m_is_configured) { throw TrexException("re-configuration of stateless object is not allowed"); } - instance.m_port_count = port_count; - instance.m_ports = new TrexStatelessPort*[port_count]; + /* pin this process to the current running CPU + any new thread will be called on the same CPU + (control plane restriction) + */ + cpu_set_t mask; + CPU_ZERO(&mask); + CPU_SET(sched_getcpu(), &mask); + sched_setaffinity(0, sizeof(mask), &mask); + + /* start RPC servers */ + instance.m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg); + instance.m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); + instance.m_rpc_server->start(); + + /* configure ports */ + + instance.m_port_count = cfg.m_port_count; for (int i = 0; i < instance.m_port_count; i++) { - instance.m_ports[i] = new TrexStatelessPort(i); + instance.m_ports.push_back(new TrexStatelessPort(i)); } + /* done */ instance.m_is_configured = true; } -TrexStateless::~TrexStateless() { - for (int i = 0; i < m_port_count; i++) { - delete m_ports[i]; +/** + * destroy the singleton and release all memory + * + * @author imarom (08-Oct-15) + */ +void +TrexStateless::destroy() { + TrexStateless& instance = get_instance_internal(); + + if (!instance.m_is_configured) { + return; } - delete [] m_ports; + /* release memory for ports */ + for (auto port : instance.m_ports) { + delete port; + } + instance.m_ports.clear(); + + /* stops the RPC server */ + instance.m_rpc_server->stop(); + delete instance.m_rpc_server; + + instance.m_rpc_server = NULL; + + /* done */ + instance.m_is_configured = false; } +/** + * fetch a port by ID + * + */ TrexStatelessPort * TrexStateless::get_port_by_id(uint8_t port_id) { if (port_id >= m_port_count) { throw TrexException("index out of range"); diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 74e88846..02eda7e2 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -26,6 +26,7 @@ limitations under the License. #include <stdexcept> #include <trex_stream.h> +#include <trex_rpc_server_api.h> /** * generic exception for errors @@ -74,6 +75,27 @@ public: }; /** + * config object for stateless object + * + * @author imarom (08-Oct-15) + */ +class TrexStatelessCfg { +public: + /* default values */ + TrexStatelessCfg() { + m_port_count = 0; + m_rpc_req_resp_cfg = NULL; + m_rpc_async_cfg = NULL; + m_rpc_server_verbose = false; + } + + const TrexRpcServerConfig *m_rpc_req_resp_cfg; + const TrexRpcServerConfig *m_rpc_async_cfg; + bool m_rpc_server_verbose; + uint8_t m_port_count; +}; + +/** * defines the T-Rex stateless operation mode * */ @@ -85,7 +107,13 @@ public: * reconfiguration is not allowed * an exception will be thrown */ - static void configure(uint8_t port_count); + static void create(const TrexStatelessCfg &cfg); + + /** + * destroy the instance + * + */ + static void destroy(); /** * singleton public get instance @@ -115,12 +143,11 @@ public: * fetch all the stats * */ - void encode_stats(Json::Value &global); + void encode_stats(Json::Value &global); protected: TrexStateless(); - ~TrexStateless(); static TrexStateless& get_instance_internal () { static TrexStateless instance; @@ -131,10 +158,17 @@ protected: TrexStateless(TrexStateless const&) = delete; void operator=(TrexStateless const&) = delete; + /* status */ bool m_is_configured; - TrexStatelessPort **m_ports; - uint8_t m_port_count; + /* RPC server array */ + TrexRpcServer *m_rpc_server; + + /* ports */ + std::vector <TrexStatelessPort *> m_ports; + uint8_t m_port_count; + + /* stats */ TrexStatelessStats m_stats; }; |