summaryrefslogtreecommitdiffstats
path: root/src/rpc-server
diff options
context:
space:
mode:
Diffstat (limited to 'src/rpc-server')
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp17
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_stream.cpp11
-rw-r--r--src/rpc-server/trex_rpc_async_server.cpp109
-rw-r--r--src/rpc-server/trex_rpc_async_server.h54
-rw-r--r--src/rpc-server/trex_rpc_cmd.cpp3
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.cpp12
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.h2
-rw-r--r--src/rpc-server/trex_rpc_server.cpp16
-rw-r--r--src/rpc-server/trex_rpc_server_api.h11
-rw-r--r--src/rpc-server/trex_rpc_server_mock.cpp80
10 files changed, 208 insertions, 107 deletions
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index 0c9f2c49..ae87d749 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -21,7 +21,8 @@ limitations under the License.
#include "trex_rpc_cmds.h"
#include <trex_rpc_server_api.h>
-#include <trex_stateless_api.h>
+#include <trex_stateless.h>
+#include <trex_stateless_port.h>
#include <trex_rpc_cmds_table.h>
#include <fstream>
@@ -154,7 +155,7 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value &params, Json::Value &result) {
section["uptime"] = TrexRpcServer::get_server_uptime();
/* FIXME: core count */
- section["dp_core_count"] = 1;
+ section["dp_core_count"] = instance.get_dp_core_count();
section["core_type"] = get_cpu_model();
/* ports */
@@ -271,17 +272,7 @@ TrexRpcCmdGetPortStats::_run(const Json::Value &params, Json::Value &result) {
result["result"]["status"] = port->get_state_as_string();
- result["result"]["tx_bps"] = Json::Value::UInt64(port->get_port_stats().tx_bps);
- result["result"]["tx_pps"] = Json::Value::UInt64(port->get_port_stats().tx_pps);
- result["result"]["total_tx_pkts"] = Json::Value::UInt64(port->get_port_stats().total_tx_pkts);
- result["result"]["total_tx_bytes"] = Json::Value::UInt64(port->get_port_stats().total_tx_bytes);
-
- result["result"]["rx_bps"] = Json::Value::UInt64(port->get_port_stats().rx_bps);
- result["result"]["rx_pps"] = Json::Value::UInt64(port->get_port_stats().rx_pps);
- result["result"]["total_rx_pkts"] = Json::Value::UInt64(port->get_port_stats().total_rx_pkts);
- result["result"]["total_rx_bytes"] = Json::Value::UInt64(port->get_port_stats().total_rx_bytes);
-
- result["result"]["tx_rx_error"] = Json::Value::UInt64(port->get_port_stats().tx_rx_errors);
+ port->encode_stats(result["result"]);
return (TREX_RPC_CMD_OK);
}
diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
index 1450e1a9..20107411 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
@@ -1,5 +1,5 @@
/*
- Itay Marom
+ Itay Marom, Dan Klein
Cisco Systems, Inc.
*/
@@ -20,8 +20,9 @@ limitations under the License.
*/
#include "trex_rpc_cmds.h"
#include <trex_rpc_server_api.h>
-#include <trex_stream_api.h>
-#include <trex_stateless_api.h>
+#include <trex_stream.h>
+#include <trex_stateless.h>
+#include <trex_stateless_port.h>
#include <iostream>
@@ -107,7 +108,7 @@ TrexRpcCmdAddStream::_run(const Json::Value &params, Json::Value &result) {
if (stream->m_rx_check.m_enable) {
stream->m_rx_check.m_stream_id = parse_int(rx, "stream_id", result);
stream->m_rx_check.m_seq_enabled = parse_bool(rx, "seq_enabled", result);
- stream->m_rx_check.m_latency = parse_bool(rx, "latency", result);
+ stream->m_rx_check.m_latency = parse_bool(rx, "latency_enabled", result);
}
/* make sure this is a valid stream to add */
@@ -232,7 +233,7 @@ TrexRpcCmdAddStream::parse_vm_instr_flow_var(const Json::Value &inst, TrexStream
void
TrexRpcCmdAddStream::parse_vm_instr_write_flow_var(const Json::Value &inst, TrexStream *stream, Json::Value &result) {
- std::string flow_var_name = parse_string(inst, "flow_var_name", result);
+ std::string flow_var_name = parse_string(inst, "name", result);
uint16_t pkt_offset = parse_uint16(inst, "pkt_offset", result);
int add_value = parse_int(inst, "add_value", result);
bool is_big_endian = parse_bool(inst, "is_big_endian", result);
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp
new file mode 100644
index 00000000..f4d21f2f
--- /dev/null
+++ b/src/rpc-server/trex_rpc_async_server.cpp
@@ -0,0 +1,109 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/* required for sleep_for c++ 2011
+ https://bugs.launchpad.net/ubuntu/+source/gcc-4.4/+bug/608145
+*/
+#define _GLIBCXX_USE_NANOSLEEP
+
+#include <trex_stateless.h>
+#include <trex_stateless_port.h>
+#include <trex_rpc_async_server.h>
+#include <zmq.h>
+#include <json/json.h>
+#include <string>
+#include <iostream>
+
+/**
+ * ZMQ based publisher server
+ *
+ */
+TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "publisher", lock) {
+ /* ZMQ is not thread safe - this should be outside */
+ m_context = zmq_ctx_new();
+}
+
+/**
+ * publisher thread
+ *
+ */
+void
+TrexRpcServerAsync::_rpc_thread_cb() {
+ std::stringstream ss;
+
+ /* create a socket based on the configuration */
+ m_socket = zmq_socket (m_context, ZMQ_PUB);
+
+ switch (m_cfg.get_protocol()) {
+ case TrexRpcServerConfig::RPC_PROT_TCP:
+ ss << "tcp://*:";
+ break;
+ default:
+ throw TrexRpcException("unknown protocol for RPC");
+ }
+
+ ss << m_cfg.get_port();
+
+ /* bind the scoket */
+ int rc = zmq_bind (m_socket, ss.str().c_str());
+ if (rc != 0) {
+ throw TrexRpcException("Unable to start ZMQ server at: " + ss.str());
+ }
+
+ /* while the server is running - publish results */
+ while (m_is_running) {
+ Json::Value snapshot;
+ Json::FastWriter writer;
+
+ /* if lock was provided - take it */
+ if (m_lock) {
+ m_lock->lock();
+ }
+
+ /* trigger a full update for stats */
+ TrexStateless::get_instance().update_stats();
+
+ /* done with the lock */
+ if (m_lock) {
+ m_lock->unlock();
+ }
+
+ /* encode them to JSON */
+ TrexStateless::get_instance().encode_stats(snapshot);
+
+ /* write to string and publish */
+ std::string snapshot_str = writer.write(snapshot);
+
+ zmq_send(m_socket, snapshot_str.c_str(), snapshot_str.size(), 0);
+ //std::cout << "sending " << snapshot_str << "\n";
+
+ /* relax for some time */
+ std::this_thread::sleep_for (std::chrono::milliseconds(1000));
+ }
+
+ /* must be closed from the same thread */
+ zmq_close(m_socket);
+}
+
+void
+TrexRpcServerAsync::_stop_rpc_thread() {
+ zmq_term(m_context);
+}
diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h
new file mode 100644
index 00000000..02d1490e
--- /dev/null
+++ b/src/rpc-server/trex_rpc_async_server.h
@@ -0,0 +1,54 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#ifndef __TREX_RPC_ASYNC_SERVER_H__
+#define __TREX_RPC_ASYNC_SERVER_H__
+
+#include <trex_rpc_server_api.h>
+#include <trex_stateless_port.h>
+
+/**
+ * async RPC server
+ *
+ * @author imarom (11-Aug-15)
+ */
+class TrexRpcServerAsync : public TrexRpcServerInterface {
+public:
+
+ TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL);
+
+protected:
+ void _rpc_thread_cb();
+ void _stop_rpc_thread();
+
+private:
+
+ void handle_server_error(const std::string &specific_err);
+
+ static const int RPC_MAX_MSG_SIZE = (20 * 1024);
+ void *m_context;
+ void *m_socket;
+ uint8_t m_msg_buffer[RPC_MAX_MSG_SIZE];
+};
+
+
+#endif /* __TREX_RPC_ASYNC_SERVER_H__ */
+
diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp
index 6c355e70..920a8d30 100644
--- a/src/rpc-server/trex_rpc_cmd.cpp
+++ b/src/rpc-server/trex_rpc_cmd.cpp
@@ -20,7 +20,8 @@ limitations under the License.
*/
#include <trex_rpc_cmd_api.h>
#include <trex_rpc_server_api.h>
-#include <trex_stateless_api.h>
+#include <trex_stateless.h>
+#include <trex_stateless_port.h>
trex_rpc_cmd_rc_e
TrexRpcCommand::run(const Json::Value &params, Json::Value &result) {
diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp
index 3d52686c..9147f75d 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.cpp
+++ b/src/rpc-server/trex_rpc_req_resp_server.cpp
@@ -34,7 +34,7 @@ limitations under the License.
* ZMQ based request-response server
*
*/
-TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "req resp") {
+TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) {
/* ZMQ is not thread safe - this should be outside */
m_context = zmq_ctx_new();
}
@@ -127,6 +127,11 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) {
int index = 0;
+ /* if lock was provided, take it */
+ if (m_lock) {
+ m_lock->lock();
+ }
+
/* for every command parsed - launch it */
for (auto command : commands) {
Json::Value single_response;
@@ -138,6 +143,11 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) {
}
+ /* done with the lock */
+ if (m_lock) {
+ m_lock->unlock();
+ }
+
/* write the JSON to string and sever on ZMQ */
std::string response_str;
diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h
index 7c1d66d1..1f638adf 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.h
+++ b/src/rpc-server/trex_rpc_req_resp_server.h
@@ -32,7 +32,7 @@ limitations under the License.
class TrexRpcServerReqRes : public TrexRpcServerInterface {
public:
- TrexRpcServerReqRes(const TrexRpcServerConfig &cfg);
+ TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL);
protected:
void _rpc_thread_cb();
diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp
index 6b8c200d..a14e6f97 100644
--- a/src/rpc-server/trex_rpc_server.cpp
+++ b/src/rpc-server/trex_rpc_server.cpp
@@ -21,6 +21,7 @@ limitations under the License.
#include <trex_rpc_server_api.h>
#include <trex_rpc_req_resp_server.h>
+#include <trex_rpc_async_server.h>
#include <trex_rpc_jsonrpc_v2_parser.h>
#include <unistd.h>
#include <zmq.h>
@@ -29,7 +30,7 @@ limitations under the License.
/************** RPC server interface ***************/
-TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name) : m_cfg(cfg), m_name(name) {
+TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *lock) : m_cfg(cfg), m_name(name), m_lock(lock) {
m_is_running = false;
m_is_verbose = false;
}
@@ -112,10 +113,19 @@ get_current_date_time() {
const std::string TrexRpcServer::s_server_uptime = get_current_date_time();
-TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg) {
+TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
+ const TrexRpcServerConfig *async_cfg,
+ std::mutex *lock) {
/* add the request response server */
- m_servers.push_back(new TrexRpcServerReqRes(req_resp_cfg));
+ if (req_resp_cfg) {
+ m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg, lock));
+ }
+
+ /* add async publisher */
+ if (async_cfg) {
+ m_servers.push_back(new TrexRpcServerAsync(*async_cfg, lock));
+ }
}
TrexRpcServer::~TrexRpcServer() {
diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h
index 06bbe10c..ff876ac4 100644
--- a/src/rpc-server/trex_rpc_server_api.h
+++ b/src/rpc-server/trex_rpc_server_api.h
@@ -24,6 +24,7 @@ limitations under the License.
#include <stdint.h>
#include <vector>
+#include <mutex>
#include <thread>
#include <string>
#include <stdexcept>
@@ -68,7 +69,7 @@ private:
class TrexRpcServerInterface {
public:
- TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name);
+ TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *m_lock = NULL);
virtual ~TrexRpcServerInterface();
/**
@@ -127,6 +128,7 @@ protected:
bool m_is_verbose;
std::thread *m_thread;
std::string m_name;
+ std::mutex *m_lock;
};
/**
@@ -139,8 +141,11 @@ protected:
class TrexRpcServer {
public:
- /* currently only request response server config is required */
- TrexRpcServer(const TrexRpcServerConfig &req_resp_cfg);
+ /* creates the collection of servers using configurations */
+ TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
+ const TrexRpcServerConfig *async_cfg,
+ std::mutex *m_lock = NULL);
+
~TrexRpcServer();
/**
diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp
deleted file mode 100644
index 835e28b8..00000000
--- a/src/rpc-server/trex_rpc_server_mock.cpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- Itay Marom
- Cisco Systems, Inc.
-*/
-
-/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-#include <trex_rpc_server_api.h>
-#include <trex_stateless_api.h>
-
-#include <iostream>
-#include <unistd.h>
-
-using namespace std;
-
-/**
- * on simulation this is not rebuild every version
- * (improved stub)
- *
- */
-extern "C" const char * get_build_date(void){
- return (__DATE__);
-}
-
-extern "C" const char * get_build_time(void){
- return (__TIME__ );
-}
-
-int gtest_main(int argc, char **argv);
-
-int main(int argc, char *argv[]) {
-
- /* configure the stateless object with 4 ports */
- TrexStateless::configure(4);
-
- // gtest ?
- if (argc > 1) {
- if (string(argv[1]) != "--ut") {
- cout << "\n[Usage] " << argv[0] << ": " << " [--ut]\n\n";
- exit(-1);
- }
- return gtest_main(argc, argv);
- }
-
- cout << "\n-= Starting RPC Server Mock =-\n\n";
- cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n";
-
- TrexRpcServerConfig rpc_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050);
- TrexRpcServer rpc(rpc_cfg);
-
- /* init the RPC server */
- rpc.start();
-
- cout << "Setting Server To Full Verbose\n\n";
- rpc.set_verbose(true);
-
- cout << "Server Started\n\n";
-
- while (true) {
- sleep(1);
- }
-
- rpc.stop();
-
-
-}