From 73574943ae438985f37aae3ea52e9713c55ef62e Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 29 Sep 2015 19:47:45 +0300 Subject: just a checkpoint on async server --- linux/ws_main.py | 1 + src/rpc-server/commands/trex_rpc_cmd_general.cpp | 18 +++--- src/rpc-server/trex_rpc_async_server.cpp | 77 ++++++++++++++++++++++++ src/rpc-server/trex_rpc_async_server.h | 53 ++++++++++++++++ src/stateless/trex_stateless_api.h | 13 ++-- 5 files changed, 147 insertions(+), 15 deletions(-) create mode 100644 src/rpc-server/trex_rpc_async_server.cpp create mode 100644 src/rpc-server/trex_rpc_async_server.h diff --git a/linux/ws_main.py b/linux/ws_main.py index 8ad3e5ba..f1d064cb 100755 --- a/linux/ws_main.py +++ b/linux/ws_main.py @@ -151,6 +151,7 @@ rpc_server_src = SrcGroup(dir='src/rpc-server/', src_list=[ 'trex_rpc_server.cpp', 'trex_rpc_req_resp_server.cpp', + 'trex_rpc_async_server.cpp', 'trex_rpc_jsonrpc_v2_parser.cpp', 'trex_rpc_cmds_table.cpp', 'trex_rpc_cmd.cpp', diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 940baf3d..7721526c 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -271,17 +271,17 @@ TrexRpcCmdGetPortStats::_run(const Json::Value ¶ms, Json::Value &result) { result["result"]["status"] = port->get_state_as_string(); - result["result"]["tx_bps"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_bps); - result["result"]["tx_pps"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_pps); - result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_tx_pkts); - result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_tx_bytes); + result["result"]["tx_bps"] = Json::Value::UInt64(port->get_stats().m_stats.tx_bps); + result["result"]["tx_pps"] = Json::Value::UInt64(port->get_stats().m_stats.tx_pps); + result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_stats().m_stats.total_tx_pkts); + result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_stats().m_stats.total_tx_bytes); - result["result"]["rx_bps"] = Json::Value::UInt64(port->get_port_stats().m_stats.rx_bps); - result["result"]["rx_pps"] = Json::Value::UInt64(port->get_port_stats().m_stats.rx_pps); - result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_rx_pkts); - result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_rx_bytes); + result["result"]["rx_bps"] = Json::Value::UInt64(port->get_stats().m_stats.rx_bps); + result["result"]["rx_pps"] = Json::Value::UInt64(port->get_stats().m_stats.rx_pps); + result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_stats().m_stats.total_rx_pkts); + result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_stats().m_stats.total_rx_bytes); - result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_rx_errors); + result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_stats().m_stats.tx_rx_errors); return (TREX_RPC_CMD_OK); } diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp new file mode 100644 index 00000000..76549cbd --- /dev/null +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -0,0 +1,77 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include +#include +#include +#include + +/** + * ZMQ based publisher server + * + */ +TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "publisher") { + /* ZMQ is not thread safe - this should be outside */ + m_context = zmq_ctx_new(); +} + +void +TrexRpcServerAsync::_rpc_thread_cb() { + std::stringstream ss; + + /* create a socket based on the configuration */ + m_socket = zmq_socket (m_context, ZMQ_PUB); + + switch (m_cfg.get_protocol()) { + case TrexRpcServerConfig::RPC_PROT_TCP: + ss << "tcp://*:"; + break; + default: + throw TrexRpcException("unknown protocol for RPC"); + } + + ss << m_cfg.get_port(); + + /* bind the scoket */ + int rc = zmq_bind (m_socket, ss.str().c_str()); + if (rc != 0) { + throw TrexRpcException("Unable to start ZMQ server at: " + ss.str()); + } + + /* while the server is running - publish results */ + while (m_is_running) { + /* update all ports for their stats */ + uint8_t port_count = TrexStateless::get_instance().get_port_count(); + for (uint8_t i = 0; i < port_count; i++) { + TrexStateless::get_instance().get_port_by_id(i).update_stats(); + const TrexPortStats &stats = TrexStateless::get_instance().get_port_by_id(i).get_stats(); + + + + } + } +} + +void +TrexRpcServerAsync::_stop_rpc_thread() { + m_is_running = false; + this->m_thread.join(); + zmq_term(m_context); +} diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h new file mode 100644 index 00000000..d0a1ee90 --- /dev/null +++ b/src/rpc-server/trex_rpc_async_server.h @@ -0,0 +1,53 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#ifndef __TREX_RPC_ASYNC_SERVER_H__ +#define __TREX_RPC_ASYNC_SERVER_H__ + +#include + +/** + * async RPC server + * + * @author imarom (11-Aug-15) + */ +class TrexRpcServerAsync : public TrexRpcServerInterface { +public: + + TrexRpcServerAsync(const TrexRpcServerConfig &cfg); + +protected: + void _rpc_thread_cb(); + void _stop_rpc_thread(); + +private: + + void handle_server_error(const std::string &specific_err); + + static const int RPC_MAX_MSG_SIZE = (20 * 1024); + void *m_context; + void *m_socket; + uint8_t m_msg_buffer[RPC_MAX_MSG_SIZE]; +}; + + +#endif /* __TREX_RPC_ASYNC_SERVER_H__ */ + diff --git a/src/stateless/trex_stateless_api.h b/src/stateless/trex_stateless_api.h index ca093e03..fd88baf7 100644 --- a/src/stateless/trex_stateless_api.h +++ b/src/stateless/trex_stateless_api.h @@ -181,13 +181,14 @@ public: } - const TrexPortStats & get_port_stats(void) { - /* scrabble */ - m_stats.m_stats.tx_bps += 1 + rand() % 100; - m_stats.m_stats.tx_pps += 1 + rand() % 10; - m_stats.m_stats.total_tx_bytes += 1 + rand() % 10; - m_stats.m_stats.total_tx_pkts += 1 + rand() % 5; + /** + * update the values of the stats + * + * @author imarom (24-Sep-15) + */ + void update_stats(); + const TrexPortStats & get_stats() { return m_stats; } -- cgit 1.2.3-korg