summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xlinux/ws_main.py1
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp18
-rw-r--r--src/rpc-server/trex_rpc_async_server.cpp77
-rw-r--r--src/rpc-server/trex_rpc_async_server.h53
-rw-r--r--src/stateless/trex_stateless_api.h13
5 files changed, 147 insertions, 15 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py
index 8ad3e5ba..f1d064cb 100755
--- a/linux/ws_main.py
+++ b/linux/ws_main.py
@@ -151,6 +151,7 @@ rpc_server_src = SrcGroup(dir='src/rpc-server/',
src_list=[
'trex_rpc_server.cpp',
'trex_rpc_req_resp_server.cpp',
+ 'trex_rpc_async_server.cpp',
'trex_rpc_jsonrpc_v2_parser.cpp',
'trex_rpc_cmds_table.cpp',
'trex_rpc_cmd.cpp',
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index 940baf3d..7721526c 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -271,17 +271,17 @@ TrexRpcCmdGetPortStats::_run(const Json::Value &params, Json::Value &result) {
result["result"]["status"] = port->get_state_as_string();
- result["result"]["tx_bps"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_bps);
- result["result"]["tx_pps"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_pps);
- result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_tx_pkts);
- result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_tx_bytes);
+ result["result"]["tx_bps"] = Json::Value::UInt64(port->get_stats().m_stats.tx_bps);
+ result["result"]["tx_pps"] = Json::Value::UInt64(port->get_stats().m_stats.tx_pps);
+ result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_stats().m_stats.total_tx_pkts);
+ result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_stats().m_stats.total_tx_bytes);
- result["result"]["rx_bps"] = Json::Value::UInt64(port->get_port_stats().m_stats.rx_bps);
- result["result"]["rx_pps"] = Json::Value::UInt64(port->get_port_stats().m_stats.rx_pps);
- result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_rx_pkts);
- result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_port_stats().m_stats.total_rx_bytes);
+ result["result"]["rx_bps"] = Json::Value::UInt64(port->get_stats().m_stats.rx_bps);
+ result["result"]["rx_pps"] = Json::Value::UInt64(port->get_stats().m_stats.rx_pps);
+ result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_stats().m_stats.total_rx_pkts);
+ result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_stats().m_stats.total_rx_bytes);
- result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_port_stats().m_stats.tx_rx_errors);
+ result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_stats().m_stats.tx_rx_errors);
return (TREX_RPC_CMD_OK);
}
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp
new file mode 100644
index 00000000..76549cbd
--- /dev/null
+++ b/src/rpc-server/trex_rpc_async_server.cpp
@@ -0,0 +1,77 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#include <trex_stateless_api.h>
+#include <trex_rpc_async_server.h>
+#include <zmq.h>
+#include <json/json.h>
+
+/**
+ * ZMQ based publisher server
+ *
+ */
+TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "publisher") {
+ /* ZMQ is not thread safe - this should be outside */
+ m_context = zmq_ctx_new();
+}
+
+void
+TrexRpcServerAsync::_rpc_thread_cb() {
+ std::stringstream ss;
+
+ /* create a socket based on the configuration */
+ m_socket = zmq_socket (m_context, ZMQ_PUB);
+
+ switch (m_cfg.get_protocol()) {
+ case TrexRpcServerConfig::RPC_PROT_TCP:
+ ss << "tcp://*:";
+ break;
+ default:
+ throw TrexRpcException("unknown protocol for RPC");
+ }
+
+ ss << m_cfg.get_port();
+
+ /* bind the scoket */
+ int rc = zmq_bind (m_socket, ss.str().c_str());
+ if (rc != 0) {
+ throw TrexRpcException("Unable to start ZMQ server at: " + ss.str());
+ }
+
+ /* while the server is running - publish results */
+ while (m_is_running) {
+ /* update all ports for their stats */
+ uint8_t port_count = TrexStateless::get_instance().get_port_count();
+ for (uint8_t i = 0; i < port_count; i++) {
+ TrexStateless::get_instance().get_port_by_id(i).update_stats();
+ const TrexPortStats &stats = TrexStateless::get_instance().get_port_by_id(i).get_stats();
+
+
+
+ }
+ }
+}
+
+void
+TrexRpcServerAsync::_stop_rpc_thread() {
+ m_is_running = false;
+ this->m_thread.join();
+ zmq_term(m_context);
+}
diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h
new file mode 100644
index 00000000..d0a1ee90
--- /dev/null
+++ b/src/rpc-server/trex_rpc_async_server.h
@@ -0,0 +1,53 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#ifndef __TREX_RPC_ASYNC_SERVER_H__
+#define __TREX_RPC_ASYNC_SERVER_H__
+
+#include <trex_rpc_server_api.h>
+
+/**
+ * async RPC server
+ *
+ * @author imarom (11-Aug-15)
+ */
+class TrexRpcServerAsync : public TrexRpcServerInterface {
+public:
+
+ TrexRpcServerAsync(const TrexRpcServerConfig &cfg);
+
+protected:
+ void _rpc_thread_cb();
+ void _stop_rpc_thread();
+
+private:
+
+ void handle_server_error(const std::string &specific_err);
+
+ static const int RPC_MAX_MSG_SIZE = (20 * 1024);
+ void *m_context;
+ void *m_socket;
+ uint8_t m_msg_buffer[RPC_MAX_MSG_SIZE];
+};
+
+
+#endif /* __TREX_RPC_ASYNC_SERVER_H__ */
+
diff --git a/src/stateless/trex_stateless_api.h b/src/stateless/trex_stateless_api.h
index ca093e03..fd88baf7 100644
--- a/src/stateless/trex_stateless_api.h
+++ b/src/stateless/trex_stateless_api.h
@@ -181,13 +181,14 @@ public:
}
- const TrexPortStats & get_port_stats(void) {
- /* scrabble */
- m_stats.m_stats.tx_bps += 1 + rand() % 100;
- m_stats.m_stats.tx_pps += 1 + rand() % 10;
- m_stats.m_stats.total_tx_bytes += 1 + rand() % 10;
- m_stats.m_stats.total_tx_pkts += 1 + rand() % 5;
+ /**
+ * update the values of the stats
+ *
+ * @author imarom (24-Sep-15)
+ */
+ void update_stats();
+ const TrexPortStats & get_stats() {
return m_stats;
}