diff options
author | imarom <imarom@cisco.com> | 2016-03-02 11:05:51 +0200 |
---|---|---|
committer | imarom <imarom@cisco.com> | 2016-03-02 13:35:09 +0200 |
commit | 59a3b58d240661a2bc06c6ede473d2eda4eb5e55 (patch) | |
tree | 37dd8d43c4bc6a0421d5964d7d1c57be3cca51a2 /src/stateless/cp/trex_stateless_port.cpp | |
parent | 70cfb9f88b00016f1413754e5625b5b05acc2063 (diff) |
TX barrier
Diffstat (limited to 'src/stateless/cp/trex_stateless_port.cpp')
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 108 |
1 files changed, 62 insertions, 46 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index c60b0e85..257af546 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -48,11 +48,35 @@ port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &core using namespace std; + + +/*************************** + * trex DP events handlers + * + **************************/ +class AsyncStopEvent : public TrexDpPortEvent { + +protected: + /** + * when an async event occurs (all cores have reported in) + * + * @author imarom (29-Feb-16) + */ + virtual void on_event() { + get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS); + + get_port()->common_port_stop_actions(true); + + assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID); + get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } +}; + /*************************** * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) { std::vector<std::pair<uint8_t, uint8_t>> core_pair_list; m_port_id = port_id; @@ -73,16 +97,20 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api m_cores_id_list.push_back(core_pair.first); } - /* init the events DP DB */ - m_dp_events.create(this); - m_graph_obj = NULL; + + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; } TrexStatelessPort::~TrexStatelessPort() { if (m_graph_obj) { delete m_graph_obj; } + + if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) { + m_dp_events.destroy_event(m_pending_async_stop_event); + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } } /** @@ -170,16 +198,14 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, } /* generate a message to all the relevant DP cores to start transmitting */ - int event_id = m_dp_events.generate_event_id(); - - /* mark that DP event of stoppped is possible */ - m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id); - + assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID); + m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent()); /* update object status */ m_factor = factor; m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues(); m_last_duration = duration; + change_state(PORT_STATE_TX); /* update the DP - messages will be freed by the DP */ @@ -188,17 +214,23 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration, /* was the core assigned a compiled object ? */ if (compiled_objs[index]) { - TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration); + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, + m_pending_async_stop_event, + compiled_objs[index], + duration); send_message_to_dp(core_id, start_msg); } else { /* mimic an end event */ - m_dp_events.handle_event(TrexDpPortEvent::EVENT_STOP, core_id, event_id); + m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id); } index++; } + /* for debug - this can be turn on */ + //m_dp_events.barrier(); + /* update subscribers */ Json::Value data; data["port_id"] = m_port_id; @@ -225,17 +257,23 @@ TrexStatelessPort::stop_traffic(void) { /* delete any previous graphs */ delete_streams_graph(); - /* mask out the DP stop event */ - m_dp_events.disable(TrexDpPortEvent::EVENT_STOP); + /* to avoid race, first destroy any previous stop/pause events */ + if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) { + m_dp_events.destroy_event(m_pending_async_stop_event); + m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID; + } + /* generate a message to all the relevant DP cores to start transmitting */ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); - send_message_to_all_dp(stop_msg); - /* continue to general actions */ + /* a barrier - make sure all the DP cores stopped */ + m_dp_events.barrier(); + + change_state(PORT_STATE_STREAMS); + common_port_stop_actions(false); - } /** @@ -243,14 +281,12 @@ TrexStatelessPort::stop_traffic(void) { * */ void -TrexStatelessPort::common_port_stop_actions(bool event_triggered) { +TrexStatelessPort::common_port_stop_actions(bool async) { - change_state(PORT_STATE_STREAMS); - Json::Value data; data["port_id"] = m_port_id; - if (event_triggered) { + if (async) { get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data); } else { get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data); @@ -274,17 +310,18 @@ TrexStatelessPort::pause_traffic(void) { throw TrexException(" pause is supported when duration is not enable is start command "); } + /* send a pause message */ TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id); - send_message_to_all_dp(pause_msg); - change_state(PORT_STATE_PAUSE); + /* make sure all DP cores paused */ + m_dp_events.barrier(); - Json::Value data; - data["port_id"] = m_port_id; - get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data); + /* change state */ + change_state(PORT_STATE_PAUSE); } + void TrexStatelessPort::resume_traffic(void) { @@ -294,7 +331,6 @@ TrexStatelessPort::resume_traffic(void) { TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id); send_message_to_all_dp(resume_msg); - change_state(PORT_STATE_TX); @@ -335,7 +371,6 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) { } TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor); - send_message_to_all_dp(update_msg); m_factor *= factor; @@ -439,25 +474,6 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas ring->Enqueue((CGenNode *)msg); } -/** - * when a DP (async) event occurs - handle it - * - */ -void -TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { - Json::Value data; - - switch (event_type) { - - case TrexDpPortEvent::EVENT_STOP: - common_port_stop_actions(true); - break; - - default: - assert(0); - - } -} uint64_t TrexStatelessPort::get_port_speed_bps() const { |