summaryrefslogtreecommitdiffstats
path: root/src/stateless/cp/trex_stateless_port.cpp
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-03-02 11:05:51 +0200
committerimarom <imarom@cisco.com>2016-03-02 13:35:09 +0200
commit59a3b58d240661a2bc06c6ede473d2eda4eb5e55 (patch)
tree37dd8d43c4bc6a0421d5964d7d1c57be3cca51a2 /src/stateless/cp/trex_stateless_port.cpp
parent70cfb9f88b00016f1413754e5625b5b05acc2063 (diff)
TX barrier
Diffstat (limited to 'src/stateless/cp/trex_stateless_port.cpp')
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp108
1 files changed, 62 insertions, 46 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index c60b0e85..257af546 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -48,11 +48,35 @@ port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &core
using namespace std;
+
+
+/***************************
+ * trex DP events handlers
+ *
+ **************************/
+class AsyncStopEvent : public TrexDpPortEvent {
+
+protected:
+ /**
+ * when an async event occurs (all cores have reported in)
+ *
+ * @author imarom (29-Feb-16)
+ */
+ virtual void on_event() {
+ get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
+
+ get_port()->common_port_stop_actions(true);
+
+ assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
+ get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
+};
+
/***************************
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
m_port_id = port_id;
@@ -73,16 +97,20 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
m_cores_id_list.push_back(core_pair.first);
}
- /* init the events DP DB */
- m_dp_events.create(this);
-
m_graph_obj = NULL;
+
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
TrexStatelessPort::~TrexStatelessPort() {
if (m_graph_obj) {
delete m_graph_obj;
}
+
+ if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+ m_dp_events.destroy_event(m_pending_async_stop_event);
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
}
/**
@@ -170,16 +198,14 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
}
/* generate a message to all the relevant DP cores to start transmitting */
- int event_id = m_dp_events.generate_event_id();
-
- /* mark that DP event of stoppped is possible */
- m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
-
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
/* update object status */
m_factor = factor;
m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
m_last_duration = duration;
+
change_state(PORT_STATE_TX);
/* update the DP - messages will be freed by the DP */
@@ -188,17 +214,23 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
/* was the core assigned a compiled object ? */
if (compiled_objs[index]) {
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
+ m_pending_async_stop_event,
+ compiled_objs[index],
+ duration);
send_message_to_dp(core_id, start_msg);
} else {
/* mimic an end event */
- m_dp_events.handle_event(TrexDpPortEvent::EVENT_STOP, core_id, event_id);
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
}
index++;
}
+ /* for debug - this can be turn on */
+ //m_dp_events.barrier();
+
/* update subscribers */
Json::Value data;
data["port_id"] = m_port_id;
@@ -225,17 +257,23 @@ TrexStatelessPort::stop_traffic(void) {
/* delete any previous graphs */
delete_streams_graph();
- /* mask out the DP stop event */
- m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+ /* to avoid race, first destroy any previous stop/pause events */
+ if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+ m_dp_events.destroy_event(m_pending_async_stop_event);
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
+
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
-
send_message_to_all_dp(stop_msg);
- /* continue to general actions */
+ /* a barrier - make sure all the DP cores stopped */
+ m_dp_events.barrier();
+
+ change_state(PORT_STATE_STREAMS);
+
common_port_stop_actions(false);
-
}
/**
@@ -243,14 +281,12 @@ TrexStatelessPort::stop_traffic(void) {
*
*/
void
-TrexStatelessPort::common_port_stop_actions(bool event_triggered) {
+TrexStatelessPort::common_port_stop_actions(bool async) {
- change_state(PORT_STATE_STREAMS);
-
Json::Value data;
data["port_id"] = m_port_id;
- if (event_triggered) {
+ if (async) {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
} else {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
@@ -274,17 +310,18 @@ TrexStatelessPort::pause_traffic(void) {
throw TrexException(" pause is supported when duration is not enable is start command ");
}
+ /* send a pause message */
TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
-
send_message_to_all_dp(pause_msg);
- change_state(PORT_STATE_PAUSE);
+ /* make sure all DP cores paused */
+ m_dp_events.barrier();
- Json::Value data;
- data["port_id"] = m_port_id;
- get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
+ /* change state */
+ change_state(PORT_STATE_PAUSE);
}
+
void
TrexStatelessPort::resume_traffic(void) {
@@ -294,7 +331,6 @@ TrexStatelessPort::resume_traffic(void) {
TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
send_message_to_all_dp(resume_msg);
-
change_state(PORT_STATE_TX);
@@ -335,7 +371,6 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
-
send_message_to_all_dp(update_msg);
m_factor *= factor;
@@ -439,25 +474,6 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas
ring->Enqueue((CGenNode *)msg);
}
-/**
- * when a DP (async) event occurs - handle it
- *
- */
-void
-TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
- Json::Value data;
-
- switch (event_type) {
-
- case TrexDpPortEvent::EVENT_STOP:
- common_port_stop_actions(true);
- break;
-
- default:
- assert(0);
-
- }
-}
uint64_t
TrexStatelessPort::get_port_speed_bps() const {