From 3adfe9c3c8a6e2ce1cdc5bd1a673e428c18fa64b Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 6 Sep 2015 17:55:00 +0300 Subject: added more commands to RPC server --- src/gtest/rpc_test.cpp | 80 +++++++++++++-- src/rpc-server/commands/trex_rpc_cmd_stream.cpp | 128 ++++++++++++++++++++++-- src/rpc-server/commands/trex_rpc_cmds.h | 9 +- src/rpc-server/trex_rpc_cmds_table.cpp | 3 + src/rpc-server/trex_rpc_server_mock.cpp | 6 +- src/stateless/trex_stateless.cpp | 46 ++++++++- src/stateless/trex_stateless_api.h | 22 +++- src/stateless/trex_stream.cpp | 4 + src/stateless/trex_stream_api.h | 14 ++- 9 files changed, 288 insertions(+), 24 deletions(-) diff --git a/src/gtest/rpc_test.cpp b/src/gtest/rpc_test.cpp index a3df2a67..5d3c4738 100644 --- a/src/gtest/rpc_test.cpp +++ b/src/gtest/rpc_test.cpp @@ -230,13 +230,81 @@ TEST_F(RpcTest, add_stream) { Json::Value response; Json::Reader reader; - string req_str; string resp_str; - req_str = "{'stream':{'port_id':7,'stream_id':12,'enable':True,'start':True,'Is':10.0,'packet':[0,1,2,3,4]," - "'vm_data':[{'Name':'ip_cnt','Size':4,'big_edian':True,'type':'inc','core_mask':'split','init_val':'10.0.0.7','min':'10.0.0.1','max':'10.0.0.10',}]," - "'vm_program':[{'op_core':['read_to_reg_mem','write_reg_offet','write_rand_offset'],'read_name':'nameofopecodetoread','pkt_offset':20}]," - "'mode':{'type':'continues','pps':1000},'next_stream':17,'next_stream_loop':100,'rx_stats':{'enable':True,'rx_stream_id':71,'seq_enable':True,'latency':True}}}"; + // check the stream does not exists + string lookup_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"get_stream\", \"params\":{\"port_id\":1, \"stream_id\":5}}"; + resp_str = send_msg(lookup_str); + + EXPECT_TRUE(reader.parse(resp_str, response, false)); + EXPECT_EQ(response["jsonrpc"], "2.0"); + EXPECT_EQ(response["id"], 1); + + EXPECT_EQ(response["error"]["code"], -32000); + + // add it + + string add_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"add_stream\", \"params\":" + "{\"port_id\":1, \"stream_id\":5, \"stream\":{" + "\"mode\": {\"type\":\"continuous\", \"pps\":3}," + "\"isg\":4.3, \"enabled\":true, \"self_start\":true," + "\"next_stream_id\":-1," + "\"packet\":{\"binary\":[4,1,255], \"meta\":\"dummy\"}," + "\"rx_stats\":{\"enabled\":false}}}}"; + + resp_str = send_msg(add_str); + + EXPECT_TRUE(reader.parse(resp_str, response, false)); + EXPECT_EQ(response["jsonrpc"], "2.0"); + EXPECT_EQ(response["id"], 1); + + EXPECT_EQ(response["result"], "ACK"); + + resp_str = send_msg(lookup_str); + + EXPECT_TRUE(reader.parse(resp_str, response, false)); + EXPECT_EQ(response["jsonrpc"], "2.0"); + EXPECT_EQ(response["id"], 1); + + const Json::Value &stream = response["result"]["stream"]; + + EXPECT_EQ(stream["enabled"], true); + EXPECT_EQ(stream["self_start"], true); + + EXPECT_EQ(stream["packet"]["binary"][0], 4); + EXPECT_EQ(stream["packet"]["binary"][1], 1); + EXPECT_EQ(stream["packet"]["binary"][2], 255); + + EXPECT_EQ(stream["packet"]["meta"], "dummy"); + EXPECT_EQ(stream["next_stream_id"], -1); + + double delta = stream["isg"].asDouble() - 4.3; + EXPECT_TRUE(delta < 0.0001); + + EXPECT_EQ(stream["mode"]["type"], "continuous"); + EXPECT_EQ(stream["mode"]["pps"], 3); + + // remove it + + string remove_str = "{\"jsonrpc\":\"2.0\", \"id\":1, \"method\":\"remove_stream\", \"params\":{\"port_id\":1, \"stream_id\":5}}"; + resp_str = send_msg(remove_str); + + EXPECT_TRUE(reader.parse(resp_str, response, false)); + EXPECT_EQ(response["jsonrpc"], "2.0"); + EXPECT_EQ(response["id"], 1); + + EXPECT_EQ(response["result"], "ACK"); + + resp_str = send_msg(remove_str); + + // should not be present anymore + + EXPECT_TRUE(reader.parse(resp_str, response, false)); + EXPECT_EQ(response["jsonrpc"], "2.0"); + EXPECT_EQ(response["id"], 1); + + EXPECT_EQ(response["error"]["code"], -32000); - resp_str = send_msg(req_str); } + + diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index 6dfebe6d..16889413 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -34,6 +34,9 @@ using namespace std; trex_rpc_cmd_rc_e TrexRpcCmdAddStream::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_int(params, "port_id", result); + uint32_t stream_id = parse_int(params, "stream_id", result); + const Json::Value §ion = parse_object(params, "stream", result); /* get the type of the stream */ @@ -41,7 +44,7 @@ TrexRpcCmdAddStream::_run(const Json::Value ¶ms, Json::Value &result) { string type = parse_string(mode, "type", result); /* allocate a new stream based on the type */ - TrexStream *stream = allocate_new_stream(section, result); + TrexStream *stream = allocate_new_stream(section, port_id, stream_id, result); /* some fields */ stream->m_enabled = parse_bool(section, "enabled", result); @@ -97,10 +100,7 @@ TrexRpcCmdAddStream::_run(const Json::Value ¶ms, Json::Value &result) { TrexStream * -TrexRpcCmdAddStream::allocate_new_stream(const Json::Value §ion, Json::Value &result) { - - uint8_t port_id = parse_int(section, "port_id", result); - uint32_t stream_id = parse_int(section, "stream_id", result); +TrexRpcCmdAddStream::allocate_new_stream(const Json::Value §ion, uint8_t port_id, uint32_t stream_id, Json::Value &result) { TrexStream *stream; @@ -200,6 +200,7 @@ TrexRpcCmdRemoveStream::_run(const Json::Value ¶ms, Json::Value &result) { } port->get_stream_table()->remove_stream(stream); + delete stream; result["result"] = "ACK"; @@ -231,7 +232,7 @@ TrexRpcCmdRemoveAllStreams::_run(const Json::Value ¶ms, Json::Value &result) /*************************** * get all streams configured - * on specific port + * on a specific port * **************************/ trex_rpc_cmd_rc_e @@ -261,3 +262,118 @@ TrexRpcCmdGetStreamList::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } +/*************************** + * get stream by id + * on a specific port + * + **************************/ +trex_rpc_cmd_rc_e +TrexRpcCmdGetStream::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_byte(params, "port_id", result); + + uint32_t stream_id = parse_int(params, "stream_id", result); + + if (port_id >= TrexStateless::get_instance().get_port_count()) { + std::stringstream ss; + ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + generate_execute_err(result, ss.str()); + } + + TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + + TrexStream *stream = port->get_stream_table()->get_stream_by_id(stream_id); + + if (!stream) { + std::stringstream ss; + ss << "stream id " << stream_id << " on port " << (int)port_id << " does not exists"; + generate_execute_err(result, ss.str()); + } + + Json::Value stream_json; + + stream_json["enabled"] = stream->m_enabled; + stream_json["self_start"] = stream->m_self_start; + + stream_json["isg"] = stream->m_isg_usec; + stream_json["next_stream_id"] = stream->m_next_stream_id; + + stream_json["packet"]["binary"] = Json::arrayValue; + for (int i = 0; i < stream->m_pkt.len; i++) { + stream_json["packet"]["binary"].append(stream->m_pkt.binary[i]); + } + + stream_json["packet"]["meta"] = stream->m_pkt.meta; + + if (TrexStreamContinuous *cont = dynamic_cast(stream)) { + stream_json["mode"]["type"] = "continuous"; + stream_json["mode"]["pps"] = cont->get_pps(); + + } + + result["result"]["stream"] = stream_json; + + return (TREX_RPC_CMD_OK); +} + +/*************************** + * start traffic on port + * + **************************/ +trex_rpc_cmd_rc_e +TrexRpcCmdStartTraffic::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_byte(params, "port_id", result); + + if (port_id >= TrexStateless::get_instance().get_port_count()) { + std::stringstream ss; + ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + generate_execute_err(result, ss.str()); + } + + TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + + TrexStatelessPort::traffic_rc_e rc = port->start_traffic(); + + if (rc == TrexStatelessPort::TRAFFIC_OK) { + result["result"] = "ACK"; + } else { + std::stringstream ss; + switch (rc) { + case TrexStatelessPort::TRAFFIC_ERR_ALREADY_STARTED: + ss << "traffic has already started on that port"; + break; + case TrexStatelessPort::TRAFFIC_ERR_NO_STREAMS: + ss << "no active streams on that port"; + break; + default: + ss << "failed to start traffic"; + break; + } + + generate_execute_err(result, ss.str()); + } + + return (TREX_RPC_CMD_OK); +} + +/*************************** + * start traffic on port + * + **************************/ +trex_rpc_cmd_rc_e +TrexRpcCmdStopTraffic::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_byte(params, "port_id", result); + + if (port_id >= TrexStateless::get_instance().get_port_count()) { + std::stringstream ss; + ss << "invalid port id - should be between 0 and " << (int)TrexStateless::get_instance().get_port_count() - 1; + generate_execute_err(result, ss.str()); + } + + TrexStatelessPort *port = TrexStateless::get_instance().get_port_by_id(port_id); + + port->stop_traffic(); + result["result"] = "ACK"; + + return (TREX_RPC_CMD_OK); +} + diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index e9666f21..428d48c1 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -65,10 +65,10 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStatus, "get_status", 0); TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveAllStreams, "remove_all_streams", 1); TREX_RPC_CMD_DEFINE(TrexRpcCmdRemoveStream, "remove_stream", 2); -TREX_RPC_CMD_DEFINE_EXTENED(TrexRpcCmdAddStream, "add_stream", 1, +TREX_RPC_CMD_DEFINE_EXTENED(TrexRpcCmdAddStream, "add_stream", 3, /* extended part */ -TrexStream * allocate_new_stream(const Json::Value §ion, Json::Value &result); +TrexStream * allocate_new_stream(const Json::Value §ion, uint8_t port_id, uint32_t stream_id, Json::Value &result); void validate_stream(const TrexStream *stream, Json::Value &result); ); @@ -76,4 +76,9 @@ void validate_stream(const TrexStream *stream, Json::Value &result); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 2); + +TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 1); +TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1); + #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index fc6d7b87..71668994 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -39,6 +39,9 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdRemoveStream()); register_command(new TrexRpcCmdRemoveAllStreams()); register_command(new TrexRpcCmdGetStreamList()); + register_command(new TrexRpcCmdGetStream()); + register_command(new TrexRpcCmdStartTraffic()); + register_command(new TrexRpcCmdStopTraffic()); } TrexRpcCommandsTable::~TrexRpcCommandsTable() { diff --git a/src/rpc-server/trex_rpc_server_mock.cpp b/src/rpc-server/trex_rpc_server_mock.cpp index a248558e..835e28b8 100644 --- a/src/rpc-server/trex_rpc_server_mock.cpp +++ b/src/rpc-server/trex_rpc_server_mock.cpp @@ -44,6 +44,9 @@ int gtest_main(int argc, char **argv); int main(int argc, char *argv[]) { + /* configure the stateless object with 4 ports */ + TrexStateless::configure(4); + // gtest ? if (argc > 1) { if (string(argv[1]) != "--ut") { @@ -56,9 +59,6 @@ int main(int argc, char *argv[]) { cout << "\n-= Starting RPC Server Mock =-\n\n"; cout << "Listening on tcp://localhost:5050 [ZMQ]\n\n"; - /* configure the stateless object with 4 ports */ - TrexStateless::configure(4); - TrexRpcServerConfig rpc_cfg(TrexRpcServerConfig::RPC_PROT_TCP, 5050); TrexRpcServer rpc(rpc_cfg); diff --git a/src/stateless/trex_stateless.cpp b/src/stateless/trex_stateless.cpp index ff469d7e..2ab0c5d9 100644 --- a/src/stateless/trex_stateless.cpp +++ b/src/stateless/trex_stateless.cpp @@ -55,7 +55,7 @@ TrexStateless::~TrexStateless() { delete m_ports[i]; } - delete m_ports; + delete [] m_ports; } TrexStatelessPort * TrexStateless::get_port_by_id(uint8_t port_id) { @@ -71,3 +71,47 @@ uint8_t TrexStateless::get_port_count() { return m_port_count; } +/*************************** + * trex stateless port + * + **************************/ +TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) { + m_started = false; +} + + +/** + * starts the traffic on the port + * + */ +TrexStatelessPort::traffic_rc_e +TrexStatelessPort::start_traffic(void) { + if (m_started) { + return (TRAFFIC_ERR_ALREADY_STARTED); + } + + if (get_stream_table()->size() == 0) { + return (TRAFFIC_ERR_NO_STREAMS); + } + + m_started = true; + + return (TRAFFIC_OK); +} + +void +TrexStatelessPort::stop_traffic(void) { + if (m_started) { + m_started = false; + } +} + +/** +* access the stream table +* +*/ +TrexStreamTable * TrexStatelessPort::get_stream_table() { + return &m_stream_table; +} + + diff --git a/src/stateless/trex_stateless_api.h b/src/stateless/trex_stateless_api.h index edd7b051..358ab339 100644 --- a/src/stateless/trex_stateless_api.h +++ b/src/stateless/trex_stateless_api.h @@ -49,20 +49,32 @@ public: class TrexStatelessPort { public: - TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) { - } + /** + * describess error codes for starting traffic + */ + enum traffic_rc_e { + TRAFFIC_OK, + TRAFFIC_ERR_ALREADY_STARTED, + TRAFFIC_ERR_NO_STREAMS, + TRAFFIC_ERR_FAILED_TO_COMPILE_STREAMS + }; + + TrexStatelessPort(uint8_t port_id); + + traffic_rc_e start_traffic(void); + + void stop_traffic(void); /** * access the stream table * */ - TrexStreamTable *get_stream_table() { - return &m_stream_table; - } + TrexStreamTable *get_stream_table(); private: TrexStreamTable m_stream_table; uint8_t m_port_id; + bool m_started; }; /** diff --git a/src/stateless/trex_stream.cpp b/src/stateless/trex_stream.cpp index b3919770..2b5b2424 100644 --- a/src/stateless/trex_stream.cpp +++ b/src/stateless/trex_stream.cpp @@ -99,3 +99,7 @@ void TrexStreamTable::get_stream_list(std::vector &stream_list) { stream_list.push_back(stream.first); } } + +int TrexStreamTable::size() { + return m_stream_table.size(); +} diff --git a/src/stateless/trex_stream_api.h b/src/stateless/trex_stream_api.h index 97e0b7f7..c9248999 100644 --- a/src/stateless/trex_stream_api.h +++ b/src/stateless/trex_stream_api.h @@ -35,6 +35,7 @@ class TrexRpcCmdAddStream; class TrexStream { /* provide the RPC parser a way to access private fields */ friend class TrexRpcCmdAddStream; + friend class TrexRpcCmdGetStream; friend class TrexStreamTable; public: @@ -53,7 +54,7 @@ private: /* config fields */ double m_isg_usec; - uint32_t m_next_stream_id; + int m_next_stream_id; /* indicators */ bool m_enabled; @@ -87,6 +88,11 @@ class TrexStreamContinuous : public TrexStream { public: TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, uint32_t pps) : TrexStream(port_id, stream_id), m_pps(pps) { } + + uint32_t get_pps() { + return m_pps; + } + protected: uint32_t m_pps; }; @@ -171,6 +177,12 @@ public: */ void get_stream_list(std::vector &stream_list); + /** + * get the table size + * + */ + int size(); + private: /** * holds all the stream in a hash table by stream id -- cgit 1.2.3-korg