From 1f90e4ed08414c8aa423693c4ee8fa84e7675230 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 24 Sep 2015 17:51:04 +0300 Subject: some improvements to the console --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'src/rpc-server') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 0c9f2c49..940baf3d 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -271,17 +271,17 @@ TrexRpcCmdGetPortStats::_run(const Json::Value ¶ms, Json::Value &result) { result["result"]["status"] = port->get_state_as_string(); - result["result"]["tx_bps"] = Json::Value::UInt64(port->get_port_stats().tx_bps); - result["result"]["tx_pps"] = Json::Value::UInt64(port->get_port_stats().tx_pps); - result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_port_stats().total_tx_pkts); - result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_port_stats().total_tx_bytes); + result["result"]["tx_bps"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_bps); + result["result"]["tx_pps"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_pps); + result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_tx_pkts); + result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_tx_bytes); - result["result"]["rx_bps"] = Json::Value::UInt64(port->get_port_stats().rx_bps); - result["result"]["rx_pps"] = Json::Value::UInt64(port->get_port_stats().rx_pps); - result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_port_stats().total_rx_pkts); - result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_port_stats().total_rx_bytes); + result["result"]["rx_bps"] = Json::Value::UInt64(port->get_port_stats().m_stats.rx_bps); + result["result"]["rx_pps"] = Json::Value::UInt64(port->get_port_stats().m_stats.rx_pps); + result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_rx_pkts); + result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_rx_bytes); - result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_port_stats().tx_rx_errors); + result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_rx_errors); return (TREX_RPC_CMD_OK); } -- cgit From 73574943ae438985f37aae3ea52e9713c55ef62e Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 29 Sep 2015 19:47:45 +0300 Subject: just a checkpoint on async server --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 18 +++--- src/rpc-server/trex_rpc_async_server.cpp | 77 ++++++++++++++++++++++++ src/rpc-server/trex_rpc_async_server.h | 53 ++++++++++++++++ 3 files changed, 139 insertions(+), 9 deletions(-) create mode 100644 src/rpc-server/trex_rpc_async_server.cpp create mode 100644 src/rpc-server/trex_rpc_async_server.h (limited to 'src/rpc-server') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 940baf3d..7721526c 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -271,17 +271,17 @@ TrexRpcCmdGetPortStats::_run(const Json::Value ¶ms, Json::Value &result) { result["result"]["status"] = port->get_state_as_string(); - result["result"]["tx_bps"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_bps); - result["result"]["tx_pps"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_pps); - result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_tx_pkts); - result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_tx_bytes); + result["result"]["tx_bps"] = Json::Value::UInt64(port->get_stats().m_stats.tx_bps); + result["result"]["tx_pps"] = Json::Value::UInt64(port->get_stats().m_stats.tx_pps); + result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_stats().m_stats.total_tx_pkts); + result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_stats().m_stats.total_tx_bytes); - result["result"]["rx_bps"] = Json::Value::UInt64(port->get_port_stats().m_stats.rx_bps); - result["result"]["rx_pps"] = Json::Value::UInt64(port->get_port_stats().m_stats.rx_pps); - result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_rx_pkts); - result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_rx_bytes); + result["result"]["rx_bps"] = Json::Value::UInt64(port->get_stats().m_stats.rx_bps); + result["result"]["rx_pps"] = Json::Value::UInt64(port->get_stats().m_stats.rx_pps); + result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_stats().m_stats.total_rx_pkts); + result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_stats().m_stats.total_rx_bytes); - result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_rx_errors); + result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_stats().m_stats.tx_rx_errors); return (TREX_RPC_CMD_OK); } diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp new file mode 100644 index 00000000..76549cbd --- /dev/null +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -0,0 +1,77 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include +#include +#include +#include + +/** + * ZMQ based publisher server + * + */ +TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "publisher") { + /* ZMQ is not thread safe - this should be outside */ + m_context = zmq_ctx_new(); +} + +void +TrexRpcServerAsync::_rpc_thread_cb() { + std::stringstream ss; + + /* create a socket based on the configuration */ + m_socket = zmq_socket (m_context, ZMQ_PUB); + + switch (m_cfg.get_protocol()) { + case TrexRpcServerConfig::RPC_PROT_TCP: + ss << "tcp://*:"; + break; + default: + throw TrexRpcException("unknown protocol for RPC"); + } + + ss << m_cfg.get_port(); + + /* bind the scoket */ + int rc = zmq_bind (m_socket, ss.str().c_str()); + if (rc != 0) { + throw TrexRpcException("Unable to start ZMQ server at: " + ss.str()); + } + + /* while the server is running - publish results */ + while (m_is_running) { + /* update all ports for their stats */ + uint8_t port_count = TrexStateless::get_instance().get_port_count(); + for (uint8_t i = 0; i < port_count; i++) { + TrexStateless::get_instance().get_port_by_id(i).update_stats(); + const TrexPortStats &stats = TrexStateless::get_instance().get_port_by_id(i).get_stats(); + + + + } + } +} + +void +TrexRpcServerAsync::_stop_rpc_thread() { + m_is_running = false; + this->m_thread.join(); + zmq_term(m_context); +} diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h new file mode 100644 index 00000000..d0a1ee90 --- /dev/null +++ b/src/rpc-server/trex_rpc_async_server.h @@ -0,0 +1,53 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef __TREX_RPC_ASYNC_SERVER_H__ +#define __TREX_RPC_ASYNC_SERVER_H__ + +#include + +/** + * async RPC server + * + * @author imarom (11-Aug-15) + */ +class TrexRpcServerAsync : public TrexRpcServerInterface { +public: + + TrexRpcServerAsync(const TrexRpcServerConfig &cfg); + +protected: + void _rpc_thread_cb(); + void _stop_rpc_thread(); + +private: + + void handle_server_error(const std::string &specific_err); + + static const int RPC_MAX_MSG_SIZE = (20 * 1024); + void *m_context; + void *m_socket; + uint8_t m_msg_buffer[RPC_MAX_MSG_SIZE]; +}; + + +#endif /* __TREX_RPC_ASYNC_SERVER_H__ */ + -- cgit From 4d53d6e2633caed782067965b1b4422b45dab4a2 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 7 Oct 2015 14:57:48 +0200 Subject: added async publisher to the RPC server --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 13 ++------- src/rpc-server/commands/trex_rpc_cmd_stream.cpp | 1 + src/rpc-server/trex_rpc_async_server.cpp | 36 +++++++++++++++++++----- src/rpc-server/trex_rpc_async_server.h | 1 + src/rpc-server/trex_rpc_cmd.cpp | 1 + src/rpc-server/trex_rpc_server.cpp | 6 +++- src/rpc-server/trex_rpc_server_api.h | 6 ++-- src/rpc-server/trex_rpc_server_mock.cpp | 6 ++-- 8 files changed, 47 insertions(+), 23 deletions(-) (limited to 'src/rpc-server') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 7721526c..3a0a12f8 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -22,6 +22,7 @@ limitations under the License. #include "trex_rpc_cmds.h" #include #include +#include #include #include @@ -271,17 +272,7 @@ TrexRpcCmdGetPortStats::_run(const Json::Value ¶ms, Json::Value &result) { result["result"]["status"] = port->get_state_as_string(); - result["result"]["tx_bps"] = Json::Value::UInt64(port->get_stats().m_stats.tx_bps); - result["result"]["tx_pps"] = Json::Value::UInt64(port->get_stats().m_stats.tx_pps); - result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_stats().m_stats.total_tx_pkts); - result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_stats().m_stats.total_tx_bytes); - - result["result"]["rx_bps"] = Json::Value::UInt64(port->get_stats().m_stats.rx_bps); - result["result"]["rx_pps"] = Json::Value::UInt64(port->get_stats().m_stats.rx_pps); - result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_stats().m_stats.total_rx_pkts); - result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_stats().m_stats.total_rx_bytes); - - result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_stats().m_stats.tx_rx_errors); + port->encode_stats(result["result"]); return (TREX_RPC_CMD_OK); } diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index 1450e1a9..97c2b791 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -22,6 +22,7 @@ limitations under the License. #include #include #include +#include #include diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 76549cbd..40d16dfe 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -18,10 +18,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + +/* required for sleep_for c++ 2011 + https://bugs.launchpad.net/ubuntu/+source/gcc-4.4/+bug/608145 +*/ +#define _GLIBCXX_USE_NANOSLEEP + #include +#include #include #include #include +#include +#include /** * ZMQ based publisher server @@ -32,6 +41,10 @@ TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg) : TrexRpc m_context = zmq_ctx_new(); } +/** + * publisher thread + * + */ void TrexRpcServerAsync::_rpc_thread_cb() { std::stringstream ss; @@ -57,21 +70,30 @@ TrexRpcServerAsync::_rpc_thread_cb() { /* while the server is running - publish results */ while (m_is_running) { - /* update all ports for their stats */ - uint8_t port_count = TrexStateless::get_instance().get_port_count(); - for (uint8_t i = 0; i < port_count; i++) { - TrexStateless::get_instance().get_port_by_id(i).update_stats(); - const TrexPortStats &stats = TrexStateless::get_instance().get_port_by_id(i).get_stats(); + Json::Value snapshot; + Json::FastWriter writer; + + /* trigger a full update for stats */ + TrexStateless::get_instance().update_stats(); + + /* encode them to JSON */ + TrexStateless::get_instance().encode_stats(snapshot); + + /* write to string and publish */ + std::string snapshot_str = writer.write(snapshot); + zmq_send(m_socket, snapshot_str.c_str(), snapshot_str.size(), 0); + //std::cout << "sending " << snapshot_str << "\n"; + /* relax for some time */ + std::this_thread::sleep_for (std::chrono::milliseconds(1000)); - } } } void TrexRpcServerAsync::_stop_rpc_thread() { m_is_running = false; - this->m_thread.join(); + this->m_thread->join(); zmq_term(m_context); } diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h index d0a1ee90..13525c01 100644 --- a/src/rpc-server/trex_rpc_async_server.h +++ b/src/rpc-server/trex_rpc_async_server.h @@ -23,6 +23,7 @@ limitations under the License. #define __TREX_RPC_ASYNC_SERVER_H__ #include +#include /** * async RPC server diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp index 6c355e70..f7cb5259 100644 --- a/src/rpc-server/trex_rpc_cmd.cpp +++ b/src/rpc-server/trex_rpc_cmd.cpp @@ -21,6 +21,7 @@ limitations under the License. #include #include #include +#include trex_rpc_cmd_rc_e TrexRpcCommand::run(const Json::Value ¶ms, Json::Value &result) { diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index 6b8c200d..18265a0e 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -21,6 +21,7 @@ limitations under the License. #include #include +#include #include #include #include @@ -112,10 +113,13 @@ get_current_date_time() { const std::string TrexRpcServer::s_server_uptime = get_current_date_time(); -TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg) { +TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg, + const TrexRpcServerConfig &async_cfg) { /* add the request response server */ m_servers.push_back(new TrexRpcServerReqRes(req_resp_cfg)); + /* add async publisher */ + m_servers.push_back(new TrexRpcServerAsync(async_cfg)); } TrexRpcServer::~TrexRpcServer() { diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index 06bbe10c..5a7cad48 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -139,8 +139,10 @@ protected: class TrexRpcServer { public: - /* currently only request response server config is required */ - TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg); + /* creates the collection of servers using configurations */ + TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg, + const TrexRpcServerConfig &async_cfg); + ~TrexRpcServer(); /** diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp index 835e28b8..16aa6774 100644 --- a/src/rpc-server/trex_rpc_server_mock.cpp +++ b/src/rpc-server/trex_rpc_server_mock.cpp @@ -59,8 +59,10 @@ int main(int argc, char *argv[]) { cout << "\n-= Starting RPC Server Mock =-\n\n"; cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; - TrexRpcServerConfig rpc_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); - TrexRpcServer rpc(rpc_cfg); + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); + TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); + + TrexRpcServer rpc(rpc_req_resp_cfg, rpc_async_cfg); /* init the RPC server */ rpc.start(); -- cgit From 74b648a86c16933680b03a736afe3f0305b4f6d2 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 8 Oct 2015 08:34:43 +0200 Subject: some file renaming --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 2 +- src/rpc-server/commands/trex_rpc_cmd_stream.cpp | 4 ++-- src/rpc-server/trex_rpc_async_server.cpp | 2 +- src/rpc-server/trex_rpc_cmd.cpp | 2 +- src/rpc-server/trex_rpc_server_mock.cpp | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) (limited to 'src/rpc-server') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 3a0a12f8..b440fa81 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -21,7 +21,7 @@ limitations under the License. #include "trex_rpc_cmds.h" #include -#include +#include #include #include diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index 97c2b791..812d819e 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -20,8 +20,8 @@ limitations under the License. */ #include "trex_rpc_cmds.h" #include -#include -#include +#include +#include #include #include diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 40d16dfe..01f03af3 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -24,7 +24,7 @@ limitations under the License. */ #define _GLIBCXX_USE_NANOSLEEP -#include +#include #include #include #include diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp index f7cb5259..920a8d30 100644 --- a/src/rpc-server/trex_rpc_cmd.cpp +++ b/src/rpc-server/trex_rpc_cmd.cpp @@ -20,7 +20,7 @@ limitations under the License. */ #include #include -#include +#include #include trex_rpc_cmd_rc_e diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp index 16aa6774..32635c75 100644 --- a/src/rpc-server/trex_rpc_server_mock.cpp +++ b/src/rpc-server/trex_rpc_server_mock.cpp @@ -20,7 +20,7 @@ limitations under the License. */ #include -#include +#include #include #include -- cgit From 09c9d77dc2f5a89924bd27226727220801a5df13 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 8 Oct 2015 10:23:33 +0200 Subject: fixed some bugs in the async server also added affinity to the stateless main object --- src/rpc-server/trex_rpc_async_server.cpp | 6 +++--- src/rpc-server/trex_rpc_server.cpp | 13 ++++++++---- src/rpc-server/trex_rpc_server_api.h | 4 ++-- src/rpc-server/trex_rpc_server_mock.cpp | 35 +++++++++++++++++++------------- 4 files changed, 35 insertions(+), 23 deletions(-) (limited to 'src/rpc-server') diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 01f03af3..3313e42e 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -87,13 +87,13 @@ TrexRpcServerAsync::_rpc_thread_cb() { /* relax for some time */ std::this_thread::sleep_for (std::chrono::milliseconds(1000)); - } + + /* must be closed from the same thread */ + zmq_close(m_socket); } void TrexRpcServerAsync::_stop_rpc_thread() { - m_is_running = false; - this->m_thread->join(); zmq_term(m_context); } diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index 18265a0e..8749c9b4 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -113,13 +113,18 @@ get_current_date_time() { const std::string TrexRpcServer::s_server_uptime = get_current_date_time(); -TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg, - const TrexRpcServerConfig &async_cfg) { +TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, + const TrexRpcServerConfig *async_cfg) { /* add the request response server */ - m_servers.push_back(new TrexRpcServerReqRes(req_resp_cfg)); + if (req_resp_cfg) { + m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg)); + } + /* add async publisher */ - m_servers.push_back(new TrexRpcServerAsync(async_cfg)); + if (async_cfg) { + m_servers.push_back(new TrexRpcServerAsync(*async_cfg)); + } } TrexRpcServer::~TrexRpcServer() { diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index 5a7cad48..327c6eaa 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -140,8 +140,8 @@ class TrexRpcServer { public: /* creates the collection of servers using configurations */ - TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg, - const TrexRpcServerConfig &async_cfg); + TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, + const TrexRpcServerConfig *async_cfg); ~TrexRpcServer(); diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp index 32635c75..8dae42d7 100644 --- a/src/rpc-server/trex_rpc_server_mock.cpp +++ b/src/rpc-server/trex_rpc_server_mock.cpp @@ -44,31 +44,39 @@ int gtest_main(int argc, char **argv); int main(int argc, char *argv[]) { - /* configure the stateless object with 4 ports */ - TrexStateless::configure(4); + bool is_gtest = false; - // gtest ? + // gtest ? if (argc > 1) { if (string(argv[1]) != "--ut") { cout << "\n[Usage] " << argv[0] << ": " << " [--ut]\n\n"; exit(-1); } - return gtest_main(argc, argv); + is_gtest = true; } - cout << "\n-= Starting RPC Server Mock =-\n\n"; - cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; + /* configure the stateless object with 4 ports */ + TrexStatelessCfg cfg; TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); - TrexRpcServer rpc(rpc_req_resp_cfg, rpc_async_cfg); + cfg.m_port_count = 4; + cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; + cfg.m_rpc_async_cfg = &rpc_async_cfg; + cfg.m_rpc_server_verbose = (is_gtest ? false : true); - /* init the RPC server */ - rpc.start(); + TrexStateless::create(cfg); - cout << "Setting Server To Full Verbose\n\n"; - rpc.set_verbose(true); + /* gtest handling */ + if (is_gtest) { + int rc = gtest_main(argc, argv); + TrexStateless::destroy(); + return rc; + } + + cout << "\n-= Starting RPC Server Mock =-\n\n"; + cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; cout << "Server Started\n\n"; @@ -76,7 +84,6 @@ int main(int argc, char *argv[]) { sleep(1); } - rpc.stop(); - - + TrexStateless::destroy(); } + -- cgit From d7af282dc1cd629c251a937c9aa88a9a5a47030b Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 8 Oct 2015 17:39:51 +0200 Subject: first integration with DPDK layer --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 2 +- src/rpc-server/trex_rpc_server_mock.cpp | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) (limited to 'src/rpc-server') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index b440fa81..ae87d749 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -155,7 +155,7 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { section["uptime"] = TrexRpcServer::get_server_uptime(); /* FIXME: core count */ - section["dp_core_count"] = 1; + section["dp_core_count"] = instance.get_dp_core_count(); section["core_type"] = get_cpu_model(); /* ports */ diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp index 8dae42d7..de43f92f 100644 --- a/src/rpc-server/trex_rpc_server_mock.cpp +++ b/src/rpc-server/trex_rpc_server_mock.cpp @@ -62,11 +62,14 @@ int main(int argc, char *argv[]) { TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); cfg.m_port_count = 4; + cfg.m_dp_core_count = 2; cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; cfg.m_rpc_async_cfg = &rpc_async_cfg; cfg.m_rpc_server_verbose = (is_gtest ? false : true); - TrexStateless::create(cfg); + TrexStateless::configure(cfg); + + TrexStateless::get_instance().launch_control_plane(); /* gtest handling */ if (is_gtest) { -- cgit From 6c7880b9881ed6690954f0c29259dd0b584b3970 Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 11 Oct 2015 14:42:24 +0200 Subject: DP cores now inject a single packet as a dummy to see stats --- src/rpc-server/trex_rpc_async_server.cpp | 12 +++- src/rpc-server/trex_rpc_async_server.h | 2 +- src/rpc-server/trex_rpc_req_resp_server.cpp | 12 +++- src/rpc-server/trex_rpc_req_resp_server.h | 2 +- src/rpc-server/trex_rpc_server.cpp | 9 +-- src/rpc-server/trex_rpc_server_api.h | 7 ++- src/rpc-server/trex_rpc_server_mock.cpp | 92 ----------------------------- 7 files changed, 34 insertions(+), 102 deletions(-) delete mode 100644 src/rpc-server/trex_rpc_server_mock.cpp (limited to 'src/rpc-server') diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 3313e42e..f4d21f2f 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -36,7 +36,7 @@ limitations under the License. * ZMQ based publisher server * */ -TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "publisher") { +TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "publisher", lock) { /* ZMQ is not thread safe - this should be outside */ m_context = zmq_ctx_new(); } @@ -73,9 +73,19 @@ TrexRpcServerAsync::_rpc_thread_cb() { Json::Value snapshot; Json::FastWriter writer; + /* if lock was provided - take it */ + if (m_lock) { + m_lock->lock(); + } + /* trigger a full update for stats */ TrexStateless::get_instance().update_stats(); + /* done with the lock */ + if (m_lock) { + m_lock->unlock(); + } + /* encode them to JSON */ TrexStateless::get_instance().encode_stats(snapshot); diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h index 13525c01..02d1490e 100644 --- a/src/rpc-server/trex_rpc_async_server.h +++ b/src/rpc-server/trex_rpc_async_server.h @@ -33,7 +33,7 @@ limitations under the License. class TrexRpcServerAsync : public TrexRpcServerInterface { public: - TrexRpcServerAsync(const TrexRpcServerConfig &cfg); + TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); protected: void _rpc_thread_cb(); diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index 3d52686c..9147f75d 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -34,7 +34,7 @@ limitations under the License. * ZMQ based request-response server * */ -TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "req resp") { +TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) { /* ZMQ is not thread safe - this should be outside */ m_context = zmq_ctx_new(); } @@ -127,6 +127,11 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) { int index = 0; + /* if lock was provided, take it */ + if (m_lock) { + m_lock->lock(); + } + /* for every command parsed - launch it */ for (auto command : commands) { Json::Value single_response; @@ -138,6 +143,11 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) { } + /* done with the lock */ + if (m_lock) { + m_lock->unlock(); + } + /* write the JSON to string and sever on ZMQ */ std::string response_str; diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 7c1d66d1..1f638adf 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -32,7 +32,7 @@ limitations under the License. class TrexRpcServerReqRes : public TrexRpcServerInterface { public: - TrexRpcServerReqRes(const TrexRpcServerConfig &cfg); + TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); protected: void _rpc_thread_cb(); diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index 8749c9b4..a14e6f97 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -30,7 +30,7 @@ limitations under the License. /************** RPC server interface ***************/ -TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name) : m_cfg(cfg), m_name(name) { +TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *lock) : m_cfg(cfg), m_name(name), m_lock(lock) { m_is_running = false; m_is_verbose = false; } @@ -114,16 +114,17 @@ get_current_date_time() { const std::string TrexRpcServer::s_server_uptime = get_current_date_time(); TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, - const TrexRpcServerConfig *async_cfg) { + const TrexRpcServerConfig *async_cfg, + std::mutex *lock) { /* add the request response server */ if (req_resp_cfg) { - m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg)); + m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg, lock)); } /* add async publisher */ if (async_cfg) { - m_servers.push_back(new TrexRpcServerAsync(*async_cfg)); + m_servers.push_back(new TrexRpcServerAsync(*async_cfg, lock)); } } diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index 327c6eaa..ff876ac4 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -24,6 +24,7 @@ limitations under the License. #include #include +#include #include #include #include @@ -68,7 +69,7 @@ private: class TrexRpcServerInterface { public: - TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name); + TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *m_lock = NULL); virtual ~TrexRpcServerInterface(); /** @@ -127,6 +128,7 @@ protected: bool m_is_verbose; std::thread *m_thread; std::string m_name; + std::mutex *m_lock; }; /** @@ -141,7 +143,8 @@ public: /* creates the collection of servers using configurations */ TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, - const TrexRpcServerConfig *async_cfg); + const TrexRpcServerConfig *async_cfg, + std::mutex *m_lock = NULL); ~TrexRpcServer(); diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp deleted file mode 100644 index de43f92f..00000000 --- a/src/rpc-server/trex_rpc_server_mock.cpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - Itay Marom - Cisco Systems, Inc. -*/ - -/* -Copyright (c) 2015-2015 Cisco Systems, Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -#include -#include - -#include -#include - -using namespace std; - -/** - * on simulation this is not rebuild every version - * (improved stub) - * - */ -extern "C" const char * get_build_date(void){ - return (__DATE__); -} - -extern "C" const char * get_build_time(void){ - return (__TIME__ ); -} - -int gtest_main(int argc, char **argv); - -int main(int argc, char *argv[]) { - - bool is_gtest = false; - - // gtest ? - if (argc > 1) { - if (string(argv[1]) != "--ut") { - cout << "\n[Usage] " << argv[0] << ": " << " [--ut]\n\n"; - exit(-1); - } - is_gtest = true; - } - - /* configure the stateless object with 4 ports */ - TrexStatelessCfg cfg; - - TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); - TrexRpcServerConfig rpc_async_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5051); - - cfg.m_port_count = 4; - cfg.m_dp_core_count = 2; - cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; - cfg.m_rpc_async_cfg = &rpc_async_cfg; - cfg.m_rpc_server_verbose = (is_gtest ? false : true); - - TrexStateless::configure(cfg); - - TrexStateless::get_instance().launch_control_plane(); - - /* gtest handling */ - if (is_gtest) { - int rc = gtest_main(argc, argv); - TrexStateless::destroy(); - return rc; - } - - cout << "\n-= Starting RPC Server Mock =-\n\n"; - cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; - - cout << "Server Started\n\n"; - - while (true) { - sleep(1); - } - - TrexStateless::destroy(); -} - -- cgit