diff options
Diffstat (limited to 'src/stateless/cp')
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.cpp | 217 | ||||
-rw-r--r-- | src/stateless/cp/trex_dp_port_events.h | 121 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 2 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.h | 6 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 104 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 16 | ||||
-rw-r--r-- | src/stateless/cp/trex_stream.h | 2 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 3 |
8 files changed, 192 insertions, 279 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp index ba327e59..8e098adf 100644 --- a/src/stateless/cp/trex_dp_port_events.cpp +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -20,6 +20,7 @@ limitations under the License. */ #include <trex_dp_port_events.h> +#include <trex_stateless_messaging.h> #include <sstream> #include <os_time.h> #include <trex_stateless.h> @@ -27,24 +28,20 @@ limitations under the License. /** * port events */ -void -TrexDpPortEvents::create(TrexStatelessPort *port) { +TrexDpPortEvents::TrexDpPortEvents(TrexStatelessPort *port) { m_port = port; - - for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) { - m_events[i].create((TrexDpPortEvent::event_e) i, port); - } - m_event_id_counter = EVENT_ID_INVALID; } -/** - * generate a new event ID - * - */ -int -TrexDpPortEvents::generate_event_id() { - return (++m_event_id_counter); +TrexDpPortEvent * +TrexDpPortEvents::lookup(int event_id) { + auto search = m_events.find(event_id); + + if (search != m_events.end()) { + return search->second; + } else { + return NULL; + } } /** @@ -52,21 +49,49 @@ TrexDpPortEvents::generate_event_id() { * all other events will be disabled * */ -void -TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) { +int +TrexDpPortEvents::create_event(TrexDpPortEvent *event, int timeout_ms) { + /* allocate ID for event */ + int event_id = ++m_event_id_counter; - /* first disable all events */ - for (TrexDpPortEvent & e : m_events) { - e.disable(); - } + /* init and add */ + event->init(m_port, event_id, timeout_ms); + m_events[event_id] = event; - /* mark this event as allowed */ - m_events[ev].wait_for_event(event_id, timeout_ms); + return event_id; } void -TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) { - m_events[ev].disable(); +TrexDpPortEvents::destroy_event(int event_id) { + TrexDpPortEvent *event = lookup(event_id); + if (!event) { + /* cannot find event */ + throw TrexException("internal error - cannot find event"); + } + + m_events.erase(event_id); + delete event; +} + +class DPBarrier : public TrexDpPortEvent { +protected: + virtual void on_event() { + /* do nothing */ + } +}; + +void +TrexDpPortEvents::barrier() { + int barrier_id = create_event(new DPBarrier()); + + TrexStatelessCpToDpMsgBase *barrier_msg = new TrexStatelessDpBarrier(m_port->m_port_id, barrier_id); + m_port->send_message_to_all_dp(barrier_msg); + + get_stateless_obj()->get_platform_api()->flush_dp_messages(); + while (lookup(barrier_id) != NULL) { + delay(1); + get_stateless_obj()->get_platform_api()->flush_dp_messages(); + } } /** @@ -74,39 +99,33 @@ TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) { * */ void -TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) { - m_events[ev].handle_event(thread_id, event_id); -} - -/*********** - * single event object - * - */ +TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) { + TrexDpPortEvent *event = lookup(event_id); + /* event might have been deleted */ + if (!event) { + return; + } -void -TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) { - m_event_type = type; - m_port = port; + bool done = event->on_core_reporting_in(thread_id); - /* add the core ids to the hash */ - m_signal.clear(); - for (int core_id : m_port->get_core_id_list()) { - m_signal[core_id] = false; + if (done) { + destroy_event(event_id); } - - /* event is disabled */ - disable(); } -/** - * wait the event using event id and timeout +/*************************** + * event * - */ -void -TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) { + **************************/ +TrexDpPortEvent::TrexDpPortEvent() { + m_port = NULL; + m_event_id = -1; +} - /* set a new event id */ +void +TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) { + m_port = port; m_event_id = event_id; /* do we have a timeout ? */ @@ -118,103 +137,33 @@ TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) { /* prepare the signal array */ m_pending_cnt = 0; - for (auto & core_pair : m_signal) { - core_pair.second = false; + for (int core_id : m_port->get_core_id_list()) { + m_signal[core_id] = false; m_pending_cnt++; } } -void -TrexDpPortEvent::disable() { - m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; -} - -/** - * get the event status - * - */ - -TrexDpPortEvent::event_status_e -TrexDpPortEvent::status() { - - /* is it even active ? */ - if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) { - return (EVENT_DISABLE); - } - - /* did it occured ? */ - if (m_pending_cnt == 0) { - return (EVENT_OCCURED); - } - - /* so we are enabled and the event did not occur - maybe we timed out ? */ - if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) { - return (EVENT_TIMED_OUT); - } - - /* so we are still waiting... */ - return (EVENT_PENDING); - -} - -void -TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) { - std::stringstream err; - err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - "; -} - -/** - * event occured - * - */ -void -TrexDpPortEvent::handle_event(int thread_id, int event_id) { - - /* if the event is disabled - we don't care */ - if (!is_active()) { - return; - } - - /* check the event id is matching the required event - if not maybe its an old signal */ - if (event_id != m_event_id) { - return; - } - +bool +TrexDpPortEvent::on_core_reporting_in(int thread_id) { /* mark sure no double signal */ if (m_signal.at(thread_id)) { - err(thread_id, event_id, "double signal"); + std::stringstream err; + err << "double signal detected on event id: " << m_event_id; + throw TrexException(err.str()); - } else { - /* mark */ - m_signal.at(thread_id) = true; - m_pending_cnt--; } + /* mark */ + m_signal.at(thread_id) = true; + m_pending_cnt--; + /* event occured */ if (m_pending_cnt == 0) { - m_port->on_dp_event_occured(m_event_type); - m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; + on_event(); + return true; + } else { + return false; } } -bool -TrexDpPortEvent::is_active() { - return (status() != EVENT_DISABLE); -} - -bool -TrexDpPortEvent::has_timeout_expired() { - return (status() == EVENT_TIMED_OUT); -} - -const char * -TrexDpPortEvent::event_name(event_e type) { - switch (type) { - case EVENT_STOP: - return "DP STOP"; - default: - throw TrexException("unknown event type"); - } - -} diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h index 557e590b..3b8c8633 100644 --- a/src/stateless/cp/trex_dp_port_events.h +++ b/src/stateless/cp/trex_dp_port_events.h @@ -25,95 +25,43 @@ limitations under the License. #include <string> class TrexStatelessPort; +class TrexDpPortEvents; /** - * describes a single DP event related to port + * interface class for DP events * - * @author imarom (18-Nov-15) + * @author imarom (29-Feb-16) */ class TrexDpPortEvent { -public: - - enum event_e { - EVENT_STOP = 1, - EVENT_MAX - }; - - /** - * status of the event for the port - */ - enum event_status_e { - EVENT_DISABLE, - EVENT_PENDING, - EVENT_TIMED_OUT, - EVENT_OCCURED - }; - - /** - * init for the event - * - */ - void create(event_e type, TrexStatelessPort *port); - - /** - * create a new pending event - * - */ - void wait_for_event(int event_id, int timeout_ms = -1); - - /** - * mark event as not allowed to happen - * - */ - void disable(); - - /** - * get the event status - * - */ - event_status_e status(); - - /** - * event occured - * - */ - void handle_event(int thread_id, int event_id); + friend TrexDpPortEvents; - /** - * returns true if event is active - * - */ - bool is_active(); - - /** - * has timeout already expired ? - * - */ - bool has_timeout_expired(); - - /** - * generate error - * - */ - void err(int thread_id, int event_id, const std::string &err_msg); +public: + TrexDpPortEvent(); + virtual ~TrexDpPortEvent() {} +protected: /** - * event to name + * what to do when an event has been completed (all cores + * reported in * + * @author imarom (29-Feb-16) */ - static const char * event_name(event_e type); + virtual void on_event() = 0; + TrexStatelessPort *get_port() { + return m_port; + } private: + void init(TrexStatelessPort *port, int event_id, int timeout_ms); + bool on_core_reporting_in(int thread_id); - event_e m_event_type; std::unordered_map<int, bool> m_signal; int m_pending_cnt; TrexStatelessPort *m_port; int m_event_id; int m_expire_limit_ms; - }; /** @@ -124,44 +72,39 @@ class TrexDpPortEvents { public: friend class TrexDpPortEvent; - void create(TrexStatelessPort *port); + static const int INVALID_ID = -1; - /** - * generate a new event ID to be used with wait_for_event - * - */ - int generate_event_id(); + TrexDpPortEvents(TrexStatelessPort *port); /** * wait a new DP event on the port * returns a key which will be used to identify * the event happened * - * @author imarom (18-Nov-15) - * - * @param ev - type of event - * @param event_id - a unique identifier for the event - * @param timeout_ms - does it has a timeout ? - * */ - void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1); + int create_event(TrexDpPortEvent *event, int timeout_ms = -1); /** - * disable an event (don't care) + * destroy an event * */ - void disable(TrexDpPortEvent::event_e ev); + void destroy_event(int event_id); /** - * event has occured - * + * return when all DP cores have responsed on a barrier + */ + void barrier(); + + /** + * a core has reached the event */ - void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id); + void on_core_reporting_in(int event_id, int thread_id); private: - static const int EVENT_ID_INVALID = -1; + TrexDpPortEvent *lookup(int event_id); - TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX]; + static const int EVENT_ID_INVALID = -1; + std::unordered_map<int, TrexDpPortEvent *> m_events; int m_event_id_counter; TrexStatelessPort *m_port; diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index a4522837..9e24802b 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -40,7 +40,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { /* create RPC servers */ /* set both servers to mutex each other */ - m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock); + m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_global_lock); m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); /* configure ports */ diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index cc47da6b..6e5e0c44 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -92,18 +92,19 @@ public: TrexStatelessCfg() { m_port_count = 0; m_rpc_req_resp_cfg = NULL; - m_rpc_async_cfg = NULL; m_rpc_server_verbose = false; m_platform_api = NULL; m_publisher = NULL; + m_global_lock = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; - const TrexRpcServerConfig *m_rpc_async_cfg; const TrexPlatformApi *m_platform_api; bool m_rpc_server_verbose; uint8_t m_port_count; TrexPublisher *m_publisher; + std::mutex *m_global_lock; + }; /** @@ -186,7 +187,6 @@ protected: TrexPublisher *m_publisher; - std::mutex m_global_cp_lock; }; /** diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index c60b0e85..7302e05d 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -48,11 +48,35 @@ port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &core using namespace std; + + +/*************************** + * trex DP events handlers + * + **************************/ +class AsyncStopEvent : public TrexDpPortEvent { + +protected: + /** + * when an async event occurs (all cores have reported in) + * + * @author imarom (29-Feb-16) + */ + virtual void on_event() { + get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS); + + get_port()->common_port_stop_actions(true); + + assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID); + get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } +}; + /*************************** * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) { std::vector<std::pair<uint8_t, uint8_t>> core_pair_list; m_port_id = port_id; @@ -73,16 +97,20 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api m_cores_id_list.push_back(core_pair.first); } - /* init the events DP DB */ - m_dp_events.create(this); - m_graph_obj = NULL; + + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; } TrexStatelessPort::~TrexStatelessPort() { if (m_graph_obj) { delete m_graph_obj; } + + if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) { + m_dp_events.destroy_event(m_pending_async_stop_event); + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } } /** @@ -170,16 +198,14 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, } /* generate a message to all the relevant DP cores to start transmitting */ - int event_id = m_dp_events.generate_event_id(); - - /* mark that DP event of stoppped is possible */ - m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id); - + assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID); + m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent()); /* update object status */ m_factor = factor; m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues(); m_last_duration = duration; + change_state(PORT_STATE_TX); /* update the DP - messages will be freed by the DP */ @@ -188,17 +214,23 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, /* was the core assigned a compiled object ? */ if (compiled_objs[index]) { - TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration); + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, + m_pending_async_stop_event, + compiled_objs[index], + duration); send_message_to_dp(core_id, start_msg); } else { /* mimic an end event */ - m_dp_events.handle_event(TrexDpPortEvent::EVENT_STOP, core_id, event_id); + m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id); } index++; } + /* for debug - this can be turn on */ + //m_dp_events.barrier(); + /* update subscribers */ Json::Value data; data["port_id"] = m_port_id; @@ -225,17 +257,23 @@ TrexStatelessPort::stop_traffic(void) { /* delete any previous graphs */ delete_streams_graph(); - /* mask out the DP stop event */ - m_dp_events.disable(TrexDpPortEvent::EVENT_STOP); + /* to avoid race, first destroy any previous stop/pause events */ + if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) { + m_dp_events.destroy_event(m_pending_async_stop_event); + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } + /* generate a message to all the relevant DP cores to start transmitting */ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); - send_message_to_all_dp(stop_msg); - /* continue to general actions */ + /* a barrier - make sure all the DP cores stopped */ + m_dp_events.barrier(); + + change_state(PORT_STATE_STREAMS); + common_port_stop_actions(false); - } /** @@ -243,14 +281,12 @@ TrexStatelessPort::stop_traffic(void) { * */ void -TrexStatelessPort::common_port_stop_actions(bool event_triggered) { +TrexStatelessPort::common_port_stop_actions(bool async) { - change_state(PORT_STATE_STREAMS); - Json::Value data; data["port_id"] = m_port_id; - if (event_triggered) { + if (async) { get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data); } else { get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data); @@ -274,10 +310,14 @@ TrexStatelessPort::pause_traffic(void) { throw TrexException(" pause is supported when duration is not enable is start command "); } + /* send a pause message */ TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id); - send_message_to_all_dp(pause_msg); + /* make sure all DP cores paused */ + m_dp_events.barrier(); + + /* change state */ change_state(PORT_STATE_PAUSE); Json::Value data; @@ -285,6 +325,7 @@ TrexStatelessPort::pause_traffic(void) { get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data); } + void TrexStatelessPort::resume_traffic(void) { @@ -294,7 +335,6 @@ TrexStatelessPort::resume_traffic(void) { TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id); send_message_to_all_dp(resume_msg); - change_state(PORT_STATE_TX); @@ -335,7 +375,6 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) { } TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor); - send_message_to_all_dp(update_msg); m_factor *= factor; @@ -439,25 +478,6 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas ring->Enqueue((CGenNode *)msg); } -/** - * when a DP (async) event occurs - handle it - * - */ -void -TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { - Json::Value data; - - switch (event_type) { - - case TrexDpPortEvent::EVENT_STOP: - common_port_stop_actions(true); - break; - - default: - assert(0); - - } -} uint64_t TrexStatelessPort::get_port_speed_bps() const { diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 192d0d19..d3c4dcb9 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -102,13 +102,17 @@ private: }; +class AsyncStopEvent; + /** * describes a stateless port * * @author imarom (31-Aug-15) */ class TrexStatelessPort { - friend class TrexDpPortEvent; + friend TrexDpPortEvents; + friend TrexDpPortEvent; + friend AsyncStopEvent; public: @@ -363,18 +367,12 @@ private: */ void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg); - /** - * triggered when event occurs - * - */ - void on_dp_event_occured(TrexDpPortEvent::event_e event_type); - /** * when a port stops, perform various actions * */ - void common_port_stop_actions(bool event_triggered); + void common_port_stop_actions(bool async); /** * calculate effective M per core @@ -421,6 +419,8 @@ private: /* owner information */ TrexPortOwner m_owner; + + int m_pending_async_stop_event; }; diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index 161e9592..1abf0c04 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -514,7 +514,7 @@ public: bool m_seq_enabled; bool m_latency; uint32_t m_user_id; - + uint16_t m_hw_id; } m_rx_check; uint32_t m_burst_total_pkts; /* valid in case of burst stSINGLE_BURST,stMULTI_BURST*/ diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index 7c91754c..be5002da 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -476,7 +476,8 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, } TrexStream *fixed_rx_flow_stat_stream = stream->clone(true); - get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream); + + get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); //???? check for errors /* can this stream be split to many cores ? */ if (!stream->is_splitable(dp_core_count)) { |