diff options
-rwxr-xr-x | linux/ws_main.py | 2 | ||||
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_async_client.py | 48 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_stateless_client.py | 9 | ||||
-rw-r--r-- | scripts/exp/ignore-0-ex.erf | bin | 0 -> 880 bytes | |||
-rw-r--r-- | scripts/exp/ignore-0.erf | bin | 0 -> 880 bytes | |||
-rw-r--r-- | src/gtest/trex_stateless_gtest.cpp | 121 | ||||
-rwxr-xr-x | src/main_dpdk.cpp | 9 | ||||
-rw-r--r-- | src/publisher/trex_publisher.cpp | 2 | ||||
-rw-r--r-- | src/publisher/trex_publisher.h | 2 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 2 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.h | 9 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 6 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 16 |
13 files changed, 201 insertions, 25 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py index 0bd61f70..617bcf37 100755 --- a/linux/ws_main.py +++ b/linux/ws_main.py @@ -117,7 +117,7 @@ main_src = SrcGroup(dir='src', 'utl_json.cpp', 'utl_cpuu.cpp', 'msg_manager.cpp', - + 'publisher/trex_publisher.cpp', 'pal/linux/pal_utl.cpp', 'pal/linux/mbuf.cpp' diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 31bec93f..adb91d97 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -8,6 +8,8 @@ except ImportError: import client.outer_packages from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage +from common.text_opts import * + import json import threading import time @@ -103,13 +105,9 @@ class TrexAsyncStatsManager(): return self.port_stats[str(port_id)] - def update (self, snapshot): - - if snapshot['name'] == 'trex-global': - self.__handle_snapshot(snapshot['data']) - else: - # for now ignore the rest - return + + def update (self, data): + self.__handle_snapshot(data) def __handle_snapshot (self, snapshot): @@ -151,10 +149,11 @@ class TrexAsyncStatsManager(): class CTRexAsyncClient(): - def __init__ (self, server, port): + def __init__ (self, server, port, stateless_client): self.port = port self.server = server + self.stateless_client = stateless_client self.raw_snapshot = {} @@ -165,14 +164,15 @@ class CTRexAsyncClient(): print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) self.active = True - self.t = threading.Thread(target = self._run) + self.t = threading.Thread(target = self.run) # kill this thread on exit and don't add it to the join list self.t.setDaemon(True) self.t.start() - def _run (self): + + def run (self): # Socket to talk to server self.context = zmq.Context() @@ -185,10 +185,12 @@ class CTRexAsyncClient(): line = self.socket.recv_string(); msg = json.loads(line) - key = msg['name'] - self.raw_snapshot[key] = msg['data'] + name = msg['name'] + data = msg['data'] + type = msg['type'] + self.raw_snapshot[name] = data - self.stats.update(msg) + self.__dispatch(name, type, data) def get_stats (self): @@ -199,6 +201,26 @@ class CTRexAsyncClient(): return self.raw_snapshot + # dispatch the message to the right place + def __dispatch (self, name, type, data): + # stats + if name == "trex-global": + self.stats.update(data) + # events + elif name == "trex-event": + self.__handle_async_event(type, data) + else: + # ignore + pass + + def __handle_async_event (self, type, data): + # DP stopped + if (type == 0): + port_id = int(data['port_id']) + print format_text("\n[Event] - Port {0} Stopped".format(port_id), 'bold') + # call the handler + self.stateless_client.async_event_port_stopped(port_id) + def stop (self): self.active = False self.t.join() diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 699f0af2..dd11fb67 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -320,6 +320,9 @@ class Port(object): return self.ok() + ################# events handler ###################### + def async_event_port_stopped (self): + self.state = self.STATE_STREAMS class CTRexStatelessClient(object): @@ -337,12 +340,16 @@ class CTRexStatelessClient(object): self._server_version = None self.__err_log = None - self._async_client = CTRexAsyncClient(server, async_port) + self._async_client = CTRexAsyncClient(server, async_port, self) self.streams_db = CStreamsDB() self.connected = False + ################# events handler ###################### + def async_event_port_stopped (self, port_id): + self.ports[port_id].async_event_port_stopped() + ############# helper functions section ############## def validate_port_list(self, port_id_list): diff --git a/scripts/exp/ignore-0-ex.erf b/scripts/exp/ignore-0-ex.erf Binary files differnew file mode 100644 index 00000000..92883717 --- /dev/null +++ b/scripts/exp/ignore-0-ex.erf diff --git a/scripts/exp/ignore-0.erf b/scripts/exp/ignore-0.erf Binary files differnew file mode 100644 index 00000000..92883717 --- /dev/null +++ b/scripts/exp/ignore-0.erf diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index 432c7382..d7bf2ab6 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -46,16 +46,51 @@ public: }; +/** + * handler for DP to CP messages + * + * @author imarom (19-Nov-15) + */ +class DpToCpHandler { +public: + virtual void handle(TrexStatelessDpToCpMsgBase *msg) = 0; +}; class CBasicStl { public: + + CBasicStl(){ m_time_diff=0.001; m_threads=1; m_dump_json=false; + m_dp_to_cp_handler = NULL; + } + + + void flush_dp_to_cp_messages() { + + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(0); + + while ( true ) { + CGenNode * node = NULL; + if (ring->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node; + if (m_dp_to_cp_handler) { + m_dp_to_cp_handler->handle(msg); + } + + delete msg; + } + } + bool init(void){ CErfIFStl erf_vif; @@ -106,14 +141,18 @@ public: printf(" %s \n",s.c_str()); } + flush_dp_to_cp_messages(); + fl.Delete(); return (res); } public: - int m_threads; - double m_time_diff; - bool m_dump_json; + int m_threads; + double m_time_diff; + bool m_dump_json; + DpToCpHandler *m_dp_to_cp_handler; + TrexStatelessCpToDpMsgBase * m_msg; CNodeRing *m_ring_from_cp; CFlowGenList fl; @@ -292,7 +331,7 @@ TEST_F(basic_stl, simple_prog4) { EXPECT_TRUE(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 20.0 ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 20.0 ); t1.m_msg = lpstart; @@ -1003,4 +1042,78 @@ TEST_F(basic_stl, compile_good_stream_id_compres) { +class DpToCpHandlerStopEvent: public DpToCpHandler { +public: + DpToCpHandlerStopEvent(int event_id) { + m_event_id = event_id; + } + + virtual void handle(TrexStatelessDpToCpMsgBase *msg) { + /* first the message must be an event */ + TrexDpPortEventMsg *event = dynamic_cast<TrexDpPortEventMsg *>(msg); + EXPECT_TRUE(event != NULL); + EXPECT_TRUE(event->get_event_type() == TrexDpPortEvent::EVENT_STOP); + + EXPECT_TRUE(event->get_event_id() == m_event_id); + EXPECT_TRUE(event->get_port_id() == 0); + + } + +private: + int m_event_id; +}; + +TEST_F(basic_stl, dp_stop_event) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/ignore"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,0); + stream1->set_pps(1.0); + stream1->set_single_burst(100); + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(17, comp_obj.clone(), 10.0 /*sec */ ); + + + t1.m_msg = lpstart; + + /* let me handle these */ + DpToCpHandlerStopEvent handler(17); + t1.m_dp_to_cp_handler = &handler; + + bool res=t1.init(); + EXPECT_EQ_UINT32(1, res?1:0); + + delete stream1 ; + +} + + /********************************************* Itay Tests End *************************************/ diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 6c92172c..80739d35 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -3199,19 +3199,21 @@ CGlobalTRex::check_for_dp_message_from_core(int thread_id) { TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node; msg->handle(); + delete msg; } } - +/** + * check for messages that arrived from DP to CP + * + */ void CGlobalTRex::check_for_dp_messages() { /* for all the cores - check for a new message */ for (int i = 0; i < get_cores_tx(); i++) { check_for_dp_message_from_core(i); } - - } bool CGlobalTRex::is_all_links_are_up(bool dump){ @@ -3532,6 +3534,7 @@ bool CGlobalTRex::Create(){ cfg.m_rpc_async_cfg = NULL; cfg.m_rpc_server_verbose = false; cfg.m_platform_api = new TrexDpdkPlatformApi(); + cfg.m_publisher = &m_zmq_publisher; m_trex_stateless = new TrexStateless(cfg); } diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp index 1afb558a..35653069 100644 --- a/src/publisher/trex_publisher.cpp +++ b/src/publisher/trex_publisher.cpp @@ -86,7 +86,7 @@ TrexPublisher::publish_event(event_type_e type, const Json::Value &data) { Json::Value value; std::string s; - value["name"] = "event"; + value["name"] = "trex-event"; value["type"] = type; value["data"] = data; diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h index 07d06678..82603fda 100644 --- a/src/publisher/trex_publisher.h +++ b/src/publisher/trex_publisher.h @@ -39,7 +39,7 @@ public: void publish_json(const std::string &s); enum event_type_e { - EVENT_PORT_STOPPED = 1 + EVENT_PORT_STOPPED = 0 }; void publish_event(event_type_e type, const Json::Value &data); diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 6ef24a7b..a4522837 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -51,6 +51,8 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { } m_platform_api = cfg.m_platform_api; + m_publisher = cfg.m_publisher; + } /** diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 57c6ef1d..5c11be1e 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -30,6 +30,7 @@ limitations under the License. #include <trex_stream.h> #include <trex_stateless_port.h> #include <trex_rpc_server_api.h> +#include <publisher/trex_publisher.h> #include <internal_api/trex_platform_api.h> @@ -93,6 +94,7 @@ public: m_rpc_async_cfg = NULL; m_rpc_server_verbose = false; m_platform_api = NULL; + m_publisher = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; @@ -100,6 +102,7 @@ public: const TrexPlatformApi *m_platform_api; bool m_rpc_server_verbose; uint8_t m_port_count; + TrexPublisher *m_publisher; }; /** @@ -150,6 +153,10 @@ public: return (m_platform_api); } + TrexPublisher * get_publisher() { + return m_publisher; + } + const std::vector <TrexStatelessPort *> get_port_list() { return m_ports; } @@ -170,6 +177,8 @@ protected: /* platform API */ const TrexPlatformApi *m_platform_api; + TrexPublisher *m_publisher; + std::mutex m_global_cp_lock; }; diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 13d0fc9f..489e2172 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -315,17 +315,21 @@ TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) { */ void TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { + Json::Value data; + switch (event_type) { case TrexDpPortEvent::EVENT_STOP: /* set a stop event */ change_state(PORT_STATE_STREAMS); /* send a ZMQ event */ + + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data); break; default: assert(0); } - printf("hey"); } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 445e9378..ffb36124 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -192,6 +192,22 @@ public: virtual bool handle(); + int get_thread_id() { + return m_thread_id; + } + + uint8_t get_port_id() { + return m_port_id; + } + + TrexDpPortEvent::event_e get_event_type() { + return m_event_type; + } + + int get_event_id() { + return m_event_id; + } + private: int m_thread_id; uint8_t m_port_id; |