summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp217
-rw-r--r--src/stateless/cp/trex_dp_port_events.h121
-rw-r--r--src/stateless/cp/trex_stateless.cpp2
-rw-r--r--src/stateless/cp/trex_stateless.h6
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp108
-rw-r--r--src/stateless/cp/trex_stateless_port.h16
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp36
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h3
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp20
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h30
10 files changed, 254 insertions, 305 deletions
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index ba327e59..8e098adf 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -20,6 +20,7 @@ limitations under the License.
*/
#include <trex_dp_port_events.h>
+#include <trex_stateless_messaging.h>
#include <sstream>
#include <os_time.h>
#include <trex_stateless.h>
@@ -27,24 +28,20 @@ limitations under the License.
/**
* port events
*/
-void
-TrexDpPortEvents::create(TrexStatelessPort *port) {
+TrexDpPortEvents::TrexDpPortEvents(TrexStatelessPort *port) {
m_port = port;
-
- for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) {
- m_events[i].create((TrexDpPortEvent::event_e) i, port);
- }
-
m_event_id_counter = EVENT_ID_INVALID;
}
-/**
- * generate a new event ID
- *
- */
-int
-TrexDpPortEvents::generate_event_id() {
- return (++m_event_id_counter);
+TrexDpPortEvent *
+TrexDpPortEvents::lookup(int event_id) {
+ auto search = m_events.find(event_id);
+
+ if (search != m_events.end()) {
+ return search->second;
+ } else {
+ return NULL;
+ }
}
/**
@@ -52,21 +49,49 @@ TrexDpPortEvents::generate_event_id() {
* all other events will be disabled
*
*/
-void
-TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) {
+int
+TrexDpPortEvents::create_event(TrexDpPortEvent *event, int timeout_ms) {
+ /* allocate ID for event */
+ int event_id = ++m_event_id_counter;
- /* first disable all events */
- for (TrexDpPortEvent & e : m_events) {
- e.disable();
- }
+ /* init and add */
+ event->init(m_port, event_id, timeout_ms);
+ m_events[event_id] = event;
- /* mark this event as allowed */
- m_events[ev].wait_for_event(event_id, timeout_ms);
+ return event_id;
}
void
-TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
- m_events[ev].disable();
+TrexDpPortEvents::destroy_event(int event_id) {
+ TrexDpPortEvent *event = lookup(event_id);
+ if (!event) {
+ /* cannot find event */
+ throw TrexException("internal error - cannot find event");
+ }
+
+ m_events.erase(event_id);
+ delete event;
+}
+
+class DPBarrier : public TrexDpPortEvent {
+protected:
+ virtual void on_event() {
+ /* do nothing */
+ }
+};
+
+void
+TrexDpPortEvents::barrier() {
+ int barrier_id = create_event(new DPBarrier());
+
+ TrexStatelessCpToDpMsgBase *barrier_msg = new TrexStatelessDpBarrier(m_port->m_port_id, barrier_id);
+ m_port->send_message_to_all_dp(barrier_msg);
+
+ get_stateless_obj()->get_platform_api()->flush_dp_messages();
+ while (lookup(barrier_id) != NULL) {
+ delay(1);
+ get_stateless_obj()->get_platform_api()->flush_dp_messages();
+ }
}
/**
@@ -74,39 +99,33 @@ TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
*
*/
void
-TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) {
- m_events[ev].handle_event(thread_id, event_id);
-}
-
-/***********
- * single event object
- *
- */
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+ TrexDpPortEvent *event = lookup(event_id);
+ /* event might have been deleted */
+ if (!event) {
+ return;
+ }
-void
-TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) {
- m_event_type = type;
- m_port = port;
+ bool done = event->on_core_reporting_in(thread_id);
- /* add the core ids to the hash */
- m_signal.clear();
- for (int core_id : m_port->get_core_id_list()) {
- m_signal[core_id] = false;
+ if (done) {
+ destroy_event(event_id);
}
-
- /* event is disabled */
- disable();
}
-/**
- * wait the event using event id and timeout
+/***************************
+ * event
*
- */
-void
-TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
+ **************************/
+TrexDpPortEvent::TrexDpPortEvent() {
+ m_port = NULL;
+ m_event_id = -1;
+}
- /* set a new event id */
+void
+TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
+ m_port = port;
m_event_id = event_id;
/* do we have a timeout ? */
@@ -118,103 +137,33 @@ TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
/* prepare the signal array */
m_pending_cnt = 0;
- for (auto & core_pair : m_signal) {
- core_pair.second = false;
+ for (int core_id : m_port->get_core_id_list()) {
+ m_signal[core_id] = false;
m_pending_cnt++;
}
}
-void
-TrexDpPortEvent::disable() {
- m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
-}
-
-/**
- * get the event status
- *
- */
-
-TrexDpPortEvent::event_status_e
-TrexDpPortEvent::status() {
-
- /* is it even active ? */
- if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) {
- return (EVENT_DISABLE);
- }
-
- /* did it occured ? */
- if (m_pending_cnt == 0) {
- return (EVENT_OCCURED);
- }
-
- /* so we are enabled and the event did not occur - maybe we timed out ? */
- if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) {
- return (EVENT_TIMED_OUT);
- }
-
- /* so we are still waiting... */
- return (EVENT_PENDING);
-
-}
-
-void
-TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) {
- std::stringstream err;
- err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - ";
-}
-
-/**
- * event occured
- *
- */
-void
-TrexDpPortEvent::handle_event(int thread_id, int event_id) {
-
- /* if the event is disabled - we don't care */
- if (!is_active()) {
- return;
- }
-
- /* check the event id is matching the required event - if not maybe its an old signal */
- if (event_id != m_event_id) {
- return;
- }
-
+bool
+TrexDpPortEvent::on_core_reporting_in(int thread_id) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
- err(thread_id, event_id, "double signal");
+ std::stringstream err;
+ err << "double signal detected on event id: " << m_event_id;
+ throw TrexException(err.str());
- } else {
- /* mark */
- m_signal.at(thread_id) = true;
- m_pending_cnt--;
}
+ /* mark */
+ m_signal.at(thread_id) = true;
+ m_pending_cnt--;
+
/* event occured */
if (m_pending_cnt == 0) {
- m_port->on_dp_event_occured(m_event_type);
- m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+ on_event();
+ return true;
+ } else {
+ return false;
}
}
-bool
-TrexDpPortEvent::is_active() {
- return (status() != EVENT_DISABLE);
-}
-
-bool
-TrexDpPortEvent::has_timeout_expired() {
- return (status() == EVENT_TIMED_OUT);
-}
-
-const char *
-TrexDpPortEvent::event_name(event_e type) {
- switch (type) {
- case EVENT_STOP:
- return "DP STOP";
- default:
- throw TrexException("unknown event type");
- }
-
-}
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 557e590b..3b8c8633 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -25,95 +25,43 @@ limitations under the License.
#include <string>
class TrexStatelessPort;
+class TrexDpPortEvents;
/**
- * describes a single DP event related to port
+ * interface class for DP events
*
- * @author imarom (18-Nov-15)
+ * @author imarom (29-Feb-16)
*/
class TrexDpPortEvent {
-public:
-
- enum event_e {
- EVENT_STOP = 1,
- EVENT_MAX
- };
-
- /**
- * status of the event for the port
- */
- enum event_status_e {
- EVENT_DISABLE,
- EVENT_PENDING,
- EVENT_TIMED_OUT,
- EVENT_OCCURED
- };
-
- /**
- * init for the event
- *
- */
- void create(event_e type, TrexStatelessPort *port);
-
- /**
- * create a new pending event
- *
- */
- void wait_for_event(int event_id, int timeout_ms = -1);
-
- /**
- * mark event as not allowed to happen
- *
- */
- void disable();
-
- /**
- * get the event status
- *
- */
- event_status_e status();
-
- /**
- * event occured
- *
- */
- void handle_event(int thread_id, int event_id);
+ friend TrexDpPortEvents;
- /**
- * returns true if event is active
- *
- */
- bool is_active();
-
- /**
- * has timeout already expired ?
- *
- */
- bool has_timeout_expired();
-
- /**
- * generate error
- *
- */
- void err(int thread_id, int event_id, const std::string &err_msg);
+public:
+ TrexDpPortEvent();
+ virtual ~TrexDpPortEvent() {}
+protected:
/**
- * event to name
+ * what to do when an event has been completed (all cores
+ * reported in
*
+ * @author imarom (29-Feb-16)
*/
- static const char * event_name(event_e type);
+ virtual void on_event() = 0;
+ TrexStatelessPort *get_port() {
+ return m_port;
+ }
private:
+ void init(TrexStatelessPort *port, int event_id, int timeout_ms);
+ bool on_core_reporting_in(int thread_id);
- event_e m_event_type;
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
TrexStatelessPort *m_port;
int m_event_id;
int m_expire_limit_ms;
-
};
/**
@@ -124,44 +72,39 @@ class TrexDpPortEvents {
public:
friend class TrexDpPortEvent;
- void create(TrexStatelessPort *port);
+ static const int INVALID_ID = -1;
- /**
- * generate a new event ID to be used with wait_for_event
- *
- */
- int generate_event_id();
+ TrexDpPortEvents(TrexStatelessPort *port);
/**
* wait a new DP event on the port
* returns a key which will be used to identify
* the event happened
*
- * @author imarom (18-Nov-15)
- *
- * @param ev - type of event
- * @param event_id - a unique identifier for the event
- * @param timeout_ms - does it has a timeout ?
- *
*/
- void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1);
+ int create_event(TrexDpPortEvent *event, int timeout_ms = -1);
/**
- * disable an event (don't care)
+ * destroy an event
*
*/
- void disable(TrexDpPortEvent::event_e ev);
+ void destroy_event(int event_id);
/**
- * event has occured
- *
+ * return when all DP cores have responsed on a barrier
+ */
+ void barrier();
+
+ /**
+ * a core has reached the event
*/
- void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id);
+ void on_core_reporting_in(int event_id, int thread_id);
private:
- static const int EVENT_ID_INVALID = -1;
+ TrexDpPortEvent *lookup(int event_id);
- TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX];
+ static const int EVENT_ID_INVALID = -1;
+ std::unordered_map<int, TrexDpPortEvent *> m_events;
int m_event_id_counter;
TrexStatelessPort *m_port;
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index a4522837..9e24802b 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -40,7 +40,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
/* create RPC servers */
/* set both servers to mutex each other */
- m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock);
+ m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_global_lock);
m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
/* configure ports */
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index cc47da6b..6e5e0c44 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -92,18 +92,19 @@ public:
TrexStatelessCfg() {
m_port_count = 0;
m_rpc_req_resp_cfg = NULL;
- m_rpc_async_cfg = NULL;
m_rpc_server_verbose = false;
m_platform_api = NULL;
m_publisher = NULL;
+ m_global_lock = NULL;
}
const TrexRpcServerConfig *m_rpc_req_resp_cfg;
- const TrexRpcServerConfig *m_rpc_async_cfg;
const TrexPlatformApi *m_platform_api;
bool m_rpc_server_verbose;
uint8_t m_port_count;
TrexPublisher *m_publisher;
+ std::mutex *m_global_lock;
+
};
/**
@@ -186,7 +187,6 @@ protected:
TrexPublisher *m_publisher;
- std::mutex m_global_cp_lock;
};
/**
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index c60b0e85..257af546 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -48,11 +48,35 @@ port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &core
using namespace std;
+
+
+/***************************
+ * trex DP events handlers
+ *
+ **************************/
+class AsyncStopEvent : public TrexDpPortEvent {
+
+protected:
+ /**
+ * when an async event occurs (all cores have reported in)
+ *
+ * @author imarom (29-Feb-16)
+ */
+ virtual void on_event() {
+ get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
+
+ get_port()->common_port_stop_actions(true);
+
+ assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
+ get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
+};
+
/***************************
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
m_port_id = port_id;
@@ -73,16 +97,20 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
m_cores_id_list.push_back(core_pair.first);
}
- /* init the events DP DB */
- m_dp_events.create(this);
-
m_graph_obj = NULL;
+
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
TrexStatelessPort::~TrexStatelessPort() {
if (m_graph_obj) {
delete m_graph_obj;
}
+
+ if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+ m_dp_events.destroy_event(m_pending_async_stop_event);
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
}
/**
@@ -170,16 +198,14 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
}
/* generate a message to all the relevant DP cores to start transmitting */
- int event_id = m_dp_events.generate_event_id();
-
- /* mark that DP event of stoppped is possible */
- m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
-
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
/* update object status */
m_factor = factor;
m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
m_last_duration = duration;
+
change_state(PORT_STATE_TX);
/* update the DP - messages will be freed by the DP */
@@ -188,17 +214,23 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
/* was the core assigned a compiled object ? */
if (compiled_objs[index]) {
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
+ m_pending_async_stop_event,
+ compiled_objs[index],
+ duration);
send_message_to_dp(core_id, start_msg);
} else {
/* mimic an end event */
- m_dp_events.handle_event(TrexDpPortEvent::EVENT_STOP, core_id, event_id);
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
}
index++;
}
+ /* for debug - this can be turn on */
+ //m_dp_events.barrier();
+
/* update subscribers */
Json::Value data;
data["port_id"] = m_port_id;
@@ -225,17 +257,23 @@ TrexStatelessPort::stop_traffic(void) {
/* delete any previous graphs */
delete_streams_graph();
- /* mask out the DP stop event */
- m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+ /* to avoid race, first destroy any previous stop/pause events */
+ if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+ m_dp_events.destroy_event(m_pending_async_stop_event);
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
+
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
-
send_message_to_all_dp(stop_msg);
- /* continue to general actions */
+ /* a barrier - make sure all the DP cores stopped */
+ m_dp_events.barrier();
+
+ change_state(PORT_STATE_STREAMS);
+
common_port_stop_actions(false);
-
}
/**
@@ -243,14 +281,12 @@ TrexStatelessPort::stop_traffic(void) {
*
*/
void
-TrexStatelessPort::common_port_stop_actions(bool event_triggered) {
+TrexStatelessPort::common_port_stop_actions(bool async) {
- change_state(PORT_STATE_STREAMS);
-
Json::Value data;
data["port_id"] = m_port_id;
- if (event_triggered) {
+ if (async) {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
} else {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
@@ -274,17 +310,18 @@ TrexStatelessPort::pause_traffic(void) {
throw TrexException(" pause is supported when duration is not enable is start command ");
}
+ /* send a pause message */
TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
-
send_message_to_all_dp(pause_msg);
- change_state(PORT_STATE_PAUSE);
+ /* make sure all DP cores paused */
+ m_dp_events.barrier();
- Json::Value data;
- data["port_id"] = m_port_id;
- get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
+ /* change state */
+ change_state(PORT_STATE_PAUSE);
}
+
void
TrexStatelessPort::resume_traffic(void) {
@@ -294,7 +331,6 @@ TrexStatelessPort::resume_traffic(void) {
TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
send_message_to_all_dp(resume_msg);
-
change_state(PORT_STATE_TX);
@@ -335,7 +371,6 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
-
send_message_to_all_dp(update_msg);
m_factor *= factor;
@@ -439,25 +474,6 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas
ring->Enqueue((CGenNode *)msg);
}
-/**
- * when a DP (async) event occurs - handle it
- *
- */
-void
-TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
- Json::Value data;
-
- switch (event_type) {
-
- case TrexDpPortEvent::EVENT_STOP:
- common_port_stop_actions(true);
- break;
-
- default:
- assert(0);
-
- }
-}
uint64_t
TrexStatelessPort::get_port_speed_bps() const {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 192d0d19..d3c4dcb9 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -102,13 +102,17 @@ private:
};
+class AsyncStopEvent;
+
/**
* describes a stateless port
*
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
- friend class TrexDpPortEvent;
+ friend TrexDpPortEvents;
+ friend TrexDpPortEvent;
+ friend AsyncStopEvent;
public:
@@ -363,18 +367,12 @@ private:
*/
void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
- /**
- * triggered when event occurs
- *
- */
- void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
-
/**
* when a port stops, perform various actions
*
*/
- void common_port_stop_actions(bool event_triggered);
+ void common_port_stop_actions(bool async);
/**
* calculate effective M per core
@@ -421,6 +419,8 @@ private:
/* owner information */
TrexPortOwner m_owner;
+
+ int m_pending_async_stop_event;
};
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 0f578b99..19eface1 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -265,9 +265,9 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
}
-bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
- bool stop_on_id,
- int event_id){
+bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id){
if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
@@ -829,9 +829,9 @@ TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
void
-TrexStatelessDpCore::stop_traffic(uint8_t port_id,
- bool stop_on_id,
- int event_id) {
+TrexStatelessDpCore::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id) {
/* we cannot remove nodes not from the top of the queue so
for every active node - make sure next time
the scheduler invokes it, it will be free */
@@ -843,20 +843,19 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id,
//printf(" skip .. %f\n",m_core->m_cur_time_sec);
return;
}
-
-#if 0
- if ( are_all_ports_idle() ) {
- /* just a place holder if we will need to do somthing in that case */
- }
-#endif
/* inform the control plane we stopped - this might be a async stop
(streams ended)
- */
+ */
+ #if 0
+ if ( are_all_ports_idle() ) {
+ /* just a place holder if we will need to do somthing in that case */
+ }
+ #endif
+
CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
port_id,
- TrexDpPortEvent::EVENT_STOP,
lp_port->get_event_id());
ring->Enqueue((CGenNode *)event_msg);
@@ -872,3 +871,12 @@ TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
delete msg;
}
+void
+TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ event_id);
+ ring->Enqueue((CGenNode *)event_msg);
+}
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 3d214655..cb102b8d 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -235,7 +235,8 @@ public:
return (&m_ports[local_port_id]);
}
-
+ /* simply sends a message back (acts as a barrier for previous messages) */
+ void barrier(uint8_t port_id, int event_id);
private:
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 257de168..333aec88 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -180,11 +180,29 @@ TrexStatelessDpUpdate::clone() {
return new_msg;
}
+/*************************
+ barrier message
+ ************************/
+
+bool
+TrexStatelessDpBarrier::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->barrier(m_port_id, m_event_id);
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpBarrier::clone() {
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpBarrier(m_port_id, m_event_id);
+
+ return new_msg;
+}
+
/************************* messages from DP to CP **********************/
bool
TrexDpPortEventMsg::handle() {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
- port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id);
+ port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
return (true);
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index d56596bf..dda086b7 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -145,7 +145,7 @@ public:
TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
m_stop_only_for_event_id=false;
- m_event_id=0;
+ m_event_id = 0;
m_core = NULL;
}
@@ -245,6 +245,26 @@ private:
double m_factor;
};
+/**
+ * barrier message for DP core
+ *
+ */
+class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpBarrier(uint8_t port_id, int event_id) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ uint8_t m_port_id;
+ int m_event_id;
+};
/************************* messages from DP to CP **********************/
@@ -282,10 +302,9 @@ public:
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
- TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) {
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
m_thread_id = thread_id;
m_port_id = port_id;
- m_event_type = type;
m_event_id = event_id;
}
@@ -299,10 +318,6 @@ public:
return m_port_id;
}
- TrexDpPortEvent::event_e get_event_type() {
- return m_event_type;
- }
-
int get_event_id() {
return m_event_id;
}
@@ -310,7 +325,6 @@ public:
private:
int m_thread_id;
uint8_t m_port_id;
- TrexDpPortEvent::event_e m_event_type;
int m_event_id;
};