diff options
-rwxr-xr-x | linux/ws_main.py | 1 | ||||
-rwxr-xr-x | linux_dpdk/ws_main.py | 2 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_stateless_client.py | 2 | ||||
-rw-r--r-- | src/gtest/trex_stateless_gtest.cpp | 16 | ||||
-rwxr-xr-x | src/main_dpdk.cpp | 148 | ||||
-rw-r--r-- | src/publisher/trex_publisher.cpp | 107 | ||||
-rw-r--r-- | src/publisher/trex_publisher.h | 54 | ||||
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.cpp | 215 | ||||
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.h | 165 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 2 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 56 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 38 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 12 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 9 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 24 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 80 |
16 files changed, 810 insertions, 121 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py index 65ca4522..0bd61f70 100755 --- a/linux/ws_main.py +++ b/linux/ws_main.py @@ -152,6 +152,7 @@ stateless_src = SrcGroup(dir='src/stateless/', 'cp/trex_stateless.cpp', 'cp/trex_stateless_port.cpp', 'cp/trex_streams_compiler.cpp', + 'cp/trex_dp_port_events.cpp', 'dp/trex_stateless_dp_core.cpp', 'messaging/trex_stateless_messaging.cpp', ]) diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py index 6591a241..f7acd49d 100755 --- a/linux_dpdk/ws_main.py +++ b/linux_dpdk/ws_main.py @@ -107,6 +107,7 @@ main_src = SrcGroup(dir='src', 'utl_yaml.cpp', 'nat_check.cpp', 'msg_manager.cpp', + 'publisher/trex_publisher.cpp', 'pal/linux_dpdk/pal_utl.cpp', 'pal/linux_dpdk/mbuf.cpp' ]); @@ -159,6 +160,7 @@ stateless_src = SrcGroup(dir='src/stateless/', 'cp/trex_stateless.cpp', 'cp/trex_stateless_port.cpp', 'cp/trex_streams_compiler.cpp', + 'cp/trex_dp_port_events.cpp', 'dp/trex_stateless_dp_core.cpp', 'messaging/trex_stateless_messaging.cpp' ]) diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 3dcfae28..699f0af2 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -790,7 +790,7 @@ class CTRexStatelessClient(object): if opts is None: return RC_ERR("bad command line paramters") - delay_sec = opts.d if opts.d else 1 + delay_sec = opts.duration if (opts.duration > 0) else 1 print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold') time.sleep(delay_sec) diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index ac0a5e63..432c7382 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -362,7 +362,7 @@ TEST_F(basic_stl, simple_prog3) { EXPECT_TRUE(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 50.0 ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 50.0 ); t1.m_msg = lpstart; @@ -424,7 +424,7 @@ TEST_F(basic_stl, simple_prog2) { EXPECT_TRUE(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10.0 ); t1.m_msg = lpstart; @@ -486,7 +486,7 @@ TEST_F(basic_stl, simple_prog1) { EXPECT_TRUE(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10.0 ); t1.m_msg = lpstart; @@ -531,7 +531,7 @@ TEST_F(basic_stl, single_pkt_burst1) { assert(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10.0 ); t1.m_msg = lpstart; @@ -582,7 +582,7 @@ TEST_F(basic_stl, single_pkt) { assert(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 /*sec */ ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10.0 /*sec */ ); t1.m_msg = lpstart; @@ -639,7 +639,7 @@ TEST_F(basic_stl, multi_pkt1) { assert(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10 ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10 ); t1.m_msg = lpstart; @@ -702,7 +702,7 @@ TEST_F(basic_stl, multi_pkt2) { assert(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10 ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 10 ); t1.m_msg = lpstart; @@ -748,7 +748,7 @@ TEST_F(basic_stl, multi_burst1) { assert(compile.compile(streams, comp_obj) ); - TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 40 ); + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, comp_obj.clone(), 40 ); t1.m_msg = lpstart; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 865c84ed..6c92172c 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -58,6 +58,7 @@ limitations under the License. #include <stateless/cp/trex_stateless.h> #include <stateless/dp/trex_stream_node.h> +#include <publisher/trex_publisher.h> #include <stateless/messaging/trex_stateless_messaging.h> #include <../linux_dpdk/version.h> @@ -2398,71 +2399,6 @@ private: }; -class CZMqPublisher { -public: - CZMqPublisher(){ - m_context=0; - m_publisher=0; - } - - bool Create(uint16_t port,bool disable); - void Delete(); - void publish_json(std::string & s); -private: - void show_zmq_last_error(char *s); -private: - void * m_context; - void * m_publisher; -}; - -void CZMqPublisher::show_zmq_last_error(char *s){ - printf(" ERROR %s \n",s); - printf(" ZMQ: %s",zmq_strerror (zmq_errno ())); - exit(-1); -} - - -bool CZMqPublisher::Create(uint16_t port,bool disable){ - - if (disable) { - return(true); - } - m_context = zmq_ctx_new (); - if ( m_context == 0 ) { - show_zmq_last_error((char *)"can't connect to ZMQ library"); - } - m_publisher = zmq_socket (m_context, ZMQ_PUB); - if ( m_context == 0 ) { - show_zmq_last_error((char *)"can't create ZMQ socket"); - } - char buffer[100]; - sprintf(buffer,"tcp://*:%d",port); - int rc=zmq_bind (m_publisher, buffer); - if (rc != 0 ) { - sprintf(buffer,"can't bind to ZMQ socket %d",port); - show_zmq_last_error(buffer); - } - printf("zmq publisher at: %s \n",buffer); - return (true); -} - - -void CZMqPublisher::Delete(){ - if (m_publisher) { - zmq_close (m_publisher); - } - if (m_context) { - zmq_ctx_destroy (m_context); - } -} - - -void CZMqPublisher::publish_json(std::string & s){ - if ( m_publisher ){ - int size = zmq_send (m_publisher, s.c_str(), s.length(), 0); - assert(size==s.length()); - } -} class CPerPortStats { public: @@ -2825,6 +2761,10 @@ private: void try_stop_all_dp(); /* send message to all dp cores */ int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg); + + void check_for_dp_message_from_core(int thread_id); + void check_for_dp_messages(); + public: int start_send_master(); @@ -2965,7 +2905,7 @@ private: CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */ CLatencyPktInfo m_latency_pkt; - CZMqPublisher m_zmq_publisher; + TrexPublisher m_zmq_publisher; public: TrexStateless *m_trex_stateless; @@ -3233,6 +3173,46 @@ int CGlobalTRex::reset_counters(){ return (0); } +/** + * check for a single core + * + * @author imarom (19-Nov-15) + * + * @param thread_id + */ +void +CGlobalTRex::check_for_dp_message_from_core(int thread_id) { + + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(thread_id); + + /* fast path check */ + if ( likely ( ring->isEmpty() ) ) { + return; + } + + while ( true ) { + CGenNode * node = NULL; + if (ring->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node; + msg->handle(); + } + +} + + +void +CGlobalTRex::check_for_dp_messages() { + /* for all the cores - check for a new message */ + for (int i = 0; i < get_cores_tx(); i++) { + check_for_dp_message_from_core(i); + } + + +} bool CGlobalTRex::is_all_links_are_up(bool dump){ bool all_link_are=true; @@ -3495,21 +3475,7 @@ int CGlobalTRex::ixgbe_start(void){ bool CGlobalTRex::Create(){ CFlowsYamlInfo pre_yaml_info; - if (get_is_stateless()) { - - TrexStatelessCfg cfg; - - TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port); - - cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; - cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; - cfg.m_rpc_async_cfg = NULL; - cfg.m_rpc_server_verbose = false; - cfg.m_platform_api = new TrexDpdkPlatformApi(); - - m_trex_stateless = new TrexStateless(cfg); - - } else { + if (!get_is_stateless()) { pre_yaml_info.load_from_yaml_file(CGlobalInfo::m_options.cfg_file); } @@ -3553,6 +3519,23 @@ bool CGlobalTRex::Create(){ CGlobalInfo::init_pools(rx_mbuf); ixgbe_start(); dump_config(stdout); + + /* start stateless */ + if (get_is_stateless()) { + + TrexStatelessCfg cfg; + + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port); + + cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; + cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; + cfg.m_rpc_async_cfg = NULL; + cfg.m_rpc_server_verbose = false; + cfg.m_platform_api = new TrexDpdkPlatformApi(); + + m_trex_stateless = new TrexStateless(cfg); + } + return (true); } @@ -4119,6 +4102,9 @@ int CGlobalTRex::run_in_master(){ m_trex_stateless->generate_publish_snapshot(json); m_zmq_publisher.publish_json(json); + /* check from messages from DP */ + check_for_dp_messages(); + delay(500); if ( is_all_cores_finished() ) { diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp new file mode 100644 index 00000000..1afb558a --- /dev/null +++ b/src/publisher/trex_publisher.cpp @@ -0,0 +1,107 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "trex_publisher.h" +#include <zmq.h> +#include <assert.h> +#include <sstream> +#include <iostream> + +/** + * create the publisher + * + */ +bool +TrexPublisher::Create(uint16_t port, bool disable){ + + if (disable) { + return (true); + } + + m_context = zmq_ctx_new(); + if ( m_context == 0 ) { + show_zmq_last_error("can't connect to ZMQ library"); + } + + m_publisher = zmq_socket (m_context, ZMQ_PUB); + if ( m_context == 0 ) { + show_zmq_last_error("can't create ZMQ socket"); + } + + std::stringstream ss; + ss << "tcp://*:" << port; + + int rc = zmq_bind (m_publisher, ss.str().c_str()); + if (rc != 0 ) { + show_zmq_last_error("can't bind to ZMQ socket at " + ss.str()); + } + + std::cout << "zmq publisher at: " << ss.str() << "\n"; + return (true); +} + + +void +TrexPublisher::Delete(){ + if (m_publisher) { + zmq_close (m_publisher); + m_publisher = NULL; + } + if (m_context) { + zmq_ctx_destroy (m_context); + m_context = NULL; + } +} + + +void +TrexPublisher::publish_json(const std::string &s){ + if (m_publisher) { + int size = zmq_send (m_publisher, s.c_str(), s.length(), 0); + assert(size == s.length()); + } +} + +void +TrexPublisher::publish_event(event_type_e type, const Json::Value &data) { + Json::FastWriter writer; + Json::Value value; + std::string s; + + value["name"] = "event"; + value["type"] = type; + value["data"] = data; + + s = writer.write(value); + publish_json(s); +} + +/** + * error handling + * + */ +void +TrexPublisher::show_zmq_last_error(const std::string &err){ + std::cout << " ERROR " << err << "\n"; + std::cout << " ZMQ: " << zmq_strerror (zmq_errno ()); + exit(-1); +} + diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h new file mode 100644 index 00000000..07d06678 --- /dev/null +++ b/src/publisher/trex_publisher.h @@ -0,0 +1,54 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_PUBLISHER_H__ +#define __TREX_PUBLISHER_H__ + +#include <stdint.h> +#include <string> +#include <json/json.h> + +class TrexPublisher { + +public: + + TrexPublisher() { + m_context = NULL; + m_publisher = NULL; + } + + bool Create(uint16_t port, bool disable); + void Delete(); + void publish_json(const std::string &s); + + enum event_type_e { + EVENT_PORT_STOPPED = 1 + }; + + void publish_event(event_type_e type, const Json::Value &data); + +private: + void show_zmq_last_error(const std::string &err); +private: + void * m_context; + void * m_publisher; +}; + +#endif /* __TREX_PUBLISHER_H__ */ diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp new file mode 100644 index 00000000..533ab605 --- /dev/null +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -0,0 +1,215 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include <trex_dp_port_events.h> +#include <sstream> +#include <os_time.h> +#include <trex_stateless.h> + +/** + * port events + */ +void +TrexDpPortEvents::create(TrexStatelessPort *port) { + m_port = port; + + for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) { + m_events[i].create((TrexDpPortEvent::event_e) i, port); + } + + m_event_id_counter = EVENT_ID_INVALID; +} + +/** + * generate a new event ID + * + */ +int +TrexDpPortEvents::generate_event_id() { + return (++m_event_id_counter); +} + +/** + * mark the next allowed event + * all other events will be disabled + * + */ +void +TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) { + + /* first disable all events */ + for (TrexDpPortEvent & e : m_events) { + e.disable(); + } + + /* mark this event as allowed */ + m_events[ev].wait_for_event(event_id, timeout_ms); +} + +/** + * handle an event + * + */ +void +TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) { + m_events[ev].handle_event(thread_id, event_id); +} + +/*********** + * single event object + * + */ + +void +TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) { + m_event_type = type; + m_port = port; + + /* add the core ids to the hash */ + m_signal.clear(); + for (int core_id : m_port->get_core_id_list()) { + m_signal[core_id] = false; + } + + /* event is disabled */ + disable(); +} + + +/** + * wait the event using event id and timeout + * + */ +void +TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) { + + /* set a new event id */ + m_event_id = event_id; + + /* do we have a timeout ? */ + if (timeout_ms > 0) { + m_expire_limit_ms = os_get_time_msec() + timeout_ms; + } else { + m_expire_limit_ms = -1; + } + + /* prepare the signal array */ + m_pending_cnt = 0; + for (auto & core_pair : m_signal) { + core_pair.second = false; + m_pending_cnt++; + } +} + +void +TrexDpPortEvent::disable() { + m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; +} + +/** + * get the event status + * + */ + +TrexDpPortEvent::event_status_e +TrexDpPortEvent::status() { + + /* is it even active ? */ + if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) { + return (EVENT_DISABLE); + } + + /* did it occured ? */ + if (m_pending_cnt == 0) { + return (EVENT_OCCURED); + } + + /* so we are enabled and the event did not occur - maybe we timed out ? */ + if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) { + return (EVENT_TIMED_OUT); + } + + /* so we are still waiting... */ + return (EVENT_PENDING); + +} + +void +TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) { + std::stringstream err; + err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - "; +} + +/** + * event occured + * + */ +void +TrexDpPortEvent::handle_event(int thread_id, int event_id) { + + /* if the event is disabled - we don't care */ + if (!is_active()) { + return; + } + + /* check the event id is matching the required event */ + if (event_id != m_event_id) { + err(thread_id, event_id, "event key mismatch"); + } + + /* mark sure no double signal */ + if (m_signal.at(thread_id)) { + err(thread_id, event_id, "double signal"); + + } else { + /* mark */ + m_signal.at(thread_id) = true; + m_pending_cnt--; + } + + /* event occured */ + if (m_pending_cnt == 0) { + m_port->on_dp_event_occured(m_event_type); + m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; + } +} + +bool +TrexDpPortEvent::is_active() { + return (status() != EVENT_DISABLE); +} + +bool +TrexDpPortEvent::has_timeout_expired() { + return (status() == EVENT_TIMED_OUT); +} + +const char * +TrexDpPortEvent::event_name(event_e type) { + switch (type) { + case EVENT_STOP: + return "DP STOP"; + + default: + throw TrexException("unknown event type"); + } + +} diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h new file mode 100644 index 00000000..309288df --- /dev/null +++ b/src/stateless/cp/trex_dp_port_events.h @@ -0,0 +1,165 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_DP_PORT_EVENTS_H__ +#define __TREX_DP_PORT_EVENTS_H__ + +#include <unordered_map> +#include <string> + +class TrexStatelessPort; + +/** + * describes a single DP event related to port + * + * @author imarom (18-Nov-15) + */ +class TrexDpPortEvent { +public: + + enum event_e { + EVENT_STOP = 1, + EVENT_MAX + }; + + /** + * status of the event for the port + */ + enum event_status_e { + EVENT_DISABLE, + EVENT_PENDING, + EVENT_TIMED_OUT, + EVENT_OCCURED + }; + + /** + * init for the event + * + */ + void create(event_e type, TrexStatelessPort *port); + + /** + * create a new pending event + * + */ + void wait_for_event(int event_id, int timeout_ms = -1); + + /** + * mark event as not allowed to happen + * + */ + void disable(); + + /** + * get the event status + * + */ + event_status_e status(); + + /** + * event occured + * + */ + void handle_event(int thread_id, int event_id); + + /** + * returns true if event is active + * + */ + bool is_active(); + + /** + * has timeout already expired ? + * + */ + bool has_timeout_expired(); + + /** + * generate error + * + */ + void err(int thread_id, int event_id, const std::string &err_msg); + + /** + * event to name + * + */ + static const char * event_name(event_e type); + + +private: + + event_e m_event_type; + std::unordered_map<int, bool> m_signal; + int m_pending_cnt; + + TrexStatelessPort *m_port; + int m_event_id; + int m_expire_limit_ms; + +}; + +/** + * all the events related to a port + * + */ +class TrexDpPortEvents { +public: + friend class TrexDpPortEvent; + + void create(TrexStatelessPort *port); + + /** + * generate a new event ID to be used with wait_for_event + * + */ + int generate_event_id(); + + /** + * wait a new DP event on the port + * returns a key which will be used to identify + * the event happened + * + * @author imarom (18-Nov-15) + * + * @param ev - type of event + * @param event_id - a unique identifier for the event + * @param timeout_ms - does it has a timeout ? + * + */ + void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1); + + /** + * event has occured + * + */ + void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id); + +private: + static const int EVENT_ID_INVALID = -1; + + TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX]; + int m_event_id_counter; + + TrexStatelessPort *m_port; + +}; + +#endif /* __TREX_DP_PORT_EVENTS_H__ */ diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index e0e95450..6ef24a7b 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -47,7 +47,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { m_port_count = cfg.m_port_count; for (int i = 0; i < m_port_count; i++) { - m_ports.push_back(new TrexStatelessPort(i)); + m_ports.push_back(new TrexStatelessPort(i, cfg.m_platform_api)); } m_platform_api = cfg.m_platform_api; diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index cbc5a328..13d0fc9f 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -52,9 +52,25 @@ using namespace std; * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) { + std::vector<std::pair<uint8_t, uint8_t>> core_pair_list; + + m_port_id = port_id; + m_port_state = PORT_STATE_IDLE; clear_owner(); + + /* get the DP cores belonging to this port */ + api->port_id_to_cores(m_port_id, core_pair_list); + + for (auto core_pair : core_pair_list) { + + /* send the core id */ + m_cores_id_list.push_back(core_pair.first); + } + + /* init the events DP DB */ + m_dp_events.create(this); } @@ -105,11 +121,16 @@ TrexStatelessPort::start_traffic(double mul, double duration) { } /* generate a message to all the relevant DP cores to start transmitting */ - TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj, duration); + m_event_id = m_dp_events.generate_event_id(); + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_event_id, compiled_obj, duration); + + change_state(PORT_STATE_TX); send_message_to_dp(start_msg); - change_state(PORT_STATE_TX); + /* mark that DP event of stoppped is possible */ + m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, m_event_id); + } /** @@ -279,15 +300,32 @@ TrexStatelessPort::encode_stats(Json::Value &port) { void TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) { - std::vector<std::pair<uint8_t, uint8_t>> cores_id_list; - - get_stateless_obj()->get_platform_api()->port_id_to_cores(m_port_id, cores_id_list); - - for (auto core_pair : cores_id_list) { + for (auto core_id : m_cores_id_list) { /* send the message to the core */ - CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_pair.first); + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id); ring->Enqueue((CGenNode *)msg->clone()); } } + +/** + * when a DP (async) event occurs - handle it + * + */ +void +TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { + switch (event_type) { + + case TrexDpPortEvent::EVENT_STOP: + /* set a stop event */ + change_state(PORT_STATE_STREAMS); + /* send a ZMQ event */ + break; + + default: + assert(0); + + } + printf("hey"); +} diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index b533f793..da75284e 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -22,7 +22,9 @@ limitations under the License. #define __TREX_STATELESS_PORT_H__ #include <trex_stream.h> +#include <trex_dp_port_events.h> +class TrexPlatformApi; class TrexStatelessCpToDpMsgBase; /** @@ -31,6 +33,8 @@ class TrexStatelessCpToDpMsgBase; * @author imarom (31-Aug-15) */ class TrexStatelessPort { + friend class TrexDpPortEvent; + public: /** @@ -54,7 +58,7 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - TrexStatelessPort(uint8_t port_id); + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); /** * acquire port @@ -199,6 +203,10 @@ public: m_stream_table.get_object_list(object_list); } + TrexDpPortEvents & get_dp_events() { + return m_dp_events; + } + private: @@ -224,6 +232,10 @@ private: } + const std::vector<int> get_core_id_list () { + return m_cores_id_list; + } + bool verify_state(int state, bool should_throw = true) const; void change_state(port_state_e new_state); @@ -232,11 +244,25 @@ private: void send_message_to_dp(TrexStatelessCpToDpMsgBase *msg); - TrexStreamTable m_stream_table; - uint8_t m_port_id; - port_state_e m_port_state; - std::string m_owner; - std::string m_owner_handler; + /** + * triggered when event occurs + * + */ + void on_dp_event_occured(TrexDpPortEvent::event_e event_type); + + + TrexStreamTable m_stream_table; + uint8_t m_port_id; + port_state_e m_port_state; + std::string m_owner; + std::string m_owner_handler; + + /* holds the DP cores associated with this port */ + //std::vector<std::pair<uint8_t, uint8_t>> m_cores_id_list; + std::vector<int> m_cores_id_list; + + TrexDpPortEvents m_dp_events; + int m_event_id; }; #endif /* __TREX_STATELESS_PORT_H__ */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index c4fdd44b..cde34a4b 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -486,6 +486,7 @@ TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, if ( duration > 0.0 ){ add_port_duration( duration ,obj->get_port_id() ); } + } @@ -516,6 +517,17 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id) { schedule_exit(); } + + /* inform the control plane we stopped - this might be a async stop + (streams ended) + */ + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); + TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id, + port_id, + TrexDpPortEvent::EVENT_STOP, + get_event_id()); + ring->Enqueue((CGenNode *)event_msg); + } /** diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 85afcf8f..7ee5abc4 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -166,6 +166,14 @@ public: /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */ void quit_main_loop(); + void set_event_id(int event_id) { + m_event_id = event_id; + } + + int get_event_id() { + return m_event_id; + } + bool set_stateless_next_node(CGenNodeStateless * cur_node, CGenNodeStateless * next_node); @@ -228,6 +236,7 @@ private: CFlowGenListPerThread * m_core; double m_duration; + int m_event_id; }; #endif /* __TREX_STATELESS_DP_CORE_H__ */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 856fd9e3..c92ad68a 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -22,12 +22,17 @@ limitations under the License. #include <trex_stateless_messaging.h> #include <trex_stateless_dp_core.h> #include <trex_streams_compiler.h> +#include <trex_stateless.h> + #include <string.h> /************************* start traffic message ************************/ -TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj, double duration) : m_obj(obj), m_duration(duration) { +TrexStatelessDpStart::TrexStatelessDpStart(int event_id, TrexStreamsCompiledObj *obj, double duration) { + m_event_id = event_id; + m_obj = obj; + m_duration = duration; } @@ -40,7 +45,7 @@ TrexStatelessDpStart::clone() { TrexStreamsCompiledObj *new_obj = m_obj->clone(); - TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(new_obj, m_duration); + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_event_id, new_obj, m_duration); return new_msg; } @@ -54,7 +59,12 @@ TrexStatelessDpStart::~TrexStatelessDpStart() { bool TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { + /* mark the event id for DP response */ + dp_core->set_event_id(m_event_id); + + /* staet traffic */ dp_core->start_traffic(m_obj, m_duration); + return true; } @@ -96,7 +106,6 @@ bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){ return (true); } - bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){ if ( dp_core->are_all_ports_idle() ){ @@ -115,3 +124,12 @@ TrexStatelessDpCanQuit::clone(){ } +/************************* messages from DP to CP **********************/ +bool +TrexDpPortEventMsg::handle() { + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id); + port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id); + + return (true); +} + diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 7dc307c7..445e9378 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -23,6 +23,7 @@ limitations under the License. #define __TREX_STATELESS_MESSAGING_H__ #include <msg_manager.h> +#include <trex_dp_port_events.h> class TrexStatelessDpCore; class TrexStreamsCompiledObj; @@ -42,12 +43,8 @@ public: virtual ~TrexStatelessCpToDpMsgBase() { } - /** - * virtual function to handle a message - * - */ - virtual bool handle(TrexStatelessDpCore *dp_core) = 0; + virtual bool handle(TrexStatelessDpCore *dp_core) = 0; /** * clone the current message @@ -66,7 +63,9 @@ public: /* no copy constructor */ TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete; -private: + +protected: + int m_event_id; bool m_quit_scheduler; }; @@ -78,16 +77,17 @@ private: class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { public: - TrexStatelessDpStart(TrexStreamsCompiledObj *obj, double duration); + TrexStatelessDpStart(int m_event_id, TrexStreamsCompiledObj *obj, double duration); ~TrexStatelessDpStart(); - virtual bool handle(TrexStatelessDpCore *dp_core); - virtual TrexStatelessCpToDpMsgBase * clone(); + virtual bool handle(TrexStatelessDpCore *dp_core); private: + + int m_event_id; TrexStreamsCompiledObj *m_obj; double m_duration; }; @@ -103,10 +103,10 @@ public: TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) { } - virtual bool handle(TrexStatelessDpCore *dp_core); - virtual TrexStatelessCpToDpMsgBase * clone(); + virtual bool handle(TrexStatelessDpCore *dp_core); + private: uint8_t m_port_id; }; @@ -122,9 +122,11 @@ public: TrexStatelessDpQuit() { } - virtual bool handle(TrexStatelessDpCore *dp_core); virtual TrexStatelessCpToDpMsgBase * clone(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + }; /** @@ -145,4 +147,58 @@ public: +/************************* messages from DP to CP **********************/ + +/** + * defines the base class for CP to DP messages + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpToCpMsgBase { +public: + + TrexStatelessDpToCpMsgBase() { + } + + virtual ~TrexStatelessDpToCpMsgBase() { + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle() = 0; + + /* no copy constructor */ + TrexStatelessDpToCpMsgBase(TrexStatelessDpToCpMsgBase &) = delete; + +}; + + +/** + * a message indicating an event has happened on a port at the + * DP + * + */ +class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase { +public: + + TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) { + m_thread_id = thread_id; + m_port_id = port_id; + m_event_type = type; + m_event_id = event_id; + } + + virtual bool handle(); + +private: + int m_thread_id; + uint8_t m_port_id; + TrexDpPortEvent::event_e m_event_type; + int m_event_id; + +}; + #endif /* __TREX_STATELESS_MESSAGING_H__ */ + |