summaryrefslogtreecommitdiffstats
path: root/src/stateless/cp/trex_stateless_port.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless/cp/trex_stateless_port.cpp')
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp64
1 files changed, 55 insertions, 9 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index cbc5a328..fbc5f7c7 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -52,9 +52,25 @@ using namespace std;
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+ std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
+
+ m_port_id = port_id;
+
m_port_state = PORT_STATE_IDLE;
clear_owner();
+
+ /* get the DP cores belonging to this port */
+ api->port_id_to_cores(m_port_id, core_pair_list);
+
+ for (auto core_pair : core_pair_list) {
+
+ /* send the core id */
+ m_cores_id_list.push_back(core_pair.first);
+ }
+
+ /* init the events DP DB */
+ m_dp_events.create(this);
}
@@ -105,11 +121,16 @@ TrexStatelessPort::start_traffic(double mul, double duration) {
}
/* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj, duration);
+ int event_id = m_dp_events.generate_event_id();
+ /* mark that DP event of stoppped is possible */
+ m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
- send_message_to_dp(start_msg);
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration);
change_state(PORT_STATE_TX);
+
+ send_message_to_dp(start_msg);
+
}
/**
@@ -126,12 +147,16 @@ TrexStatelessPort::stop_traffic(void) {
return;
}
+ /* mask out the DP stop event */
+ m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
send_message_to_dp(stop_msg);
change_state(PORT_STATE_STREAMS);
+
}
void
@@ -279,15 +304,36 @@ TrexStatelessPort::encode_stats(Json::Value &port) {
void
TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) {
- std::vector<std::pair<uint8_t, uint8_t>> cores_id_list;
-
- get_stateless_obj()->get_platform_api()->port_id_to_cores(m_port_id, cores_id_list);
-
- for (auto core_pair : cores_id_list) {
+ for (auto core_id : m_cores_id_list) {
/* send the message to the core */
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_pair.first);
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
ring->Enqueue((CGenNode *)msg->clone());
}
}
+
+/**
+ * when a DP (async) event occurs - handle it
+ *
+ */
+void
+TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
+ Json::Value data;
+
+ switch (event_type) {
+
+ case TrexDpPortEvent::EVENT_STOP:
+ /* set a stop event */
+ change_state(PORT_STATE_STREAMS);
+ /* send a ZMQ event */
+
+ data["port_id"] = m_port_id;
+ get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
+ break;
+
+ default:
+ assert(0);
+
+ }
+}