summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-03-02 11:05:51 +0200
committerimarom <imarom@cisco.com>2016-03-02 13:35:09 +0200
commit59a3b58d240661a2bc06c6ede473d2eda4eb5e55 (patch)
tree37dd8d43c4bc6a0421d5964d7d1c57be3cca51a2
parent70cfb9f88b00016f1413754e5625b5b05acc2063 (diff)
TX barrier
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py9
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py28
-rw-r--r--src/gtest/trex_stateless_gtest.cpp1
-rw-r--r--src/internal_api/trex_platform_api.h5
-rw-r--r--src/main_dpdk.cpp37
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.cpp35
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.h2
-rw-r--r--src/rpc-server/trex_rpc_server.cpp32
-rw-r--r--src/rpc-server/trex_rpc_server_api.h2
-rw-r--r--src/sim/trex_sim.h1
-rw-r--r--src/sim/trex_sim_stateless.cpp1
-rw-r--r--src/stateless/cp/trex_dp_port_events.cpp217
-rw-r--r--src/stateless/cp/trex_dp_port_events.h121
-rw-r--r--src/stateless/cp/trex_stateless.cpp2
-rw-r--r--src/stateless/cp/trex_stateless.h6
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp108
-rw-r--r--src/stateless/cp/trex_stateless_port.h16
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp36
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h3
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp20
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h30
21 files changed, 339 insertions, 373 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 50566853..04dd77ec 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -227,7 +227,7 @@ class AsyncEventHandler(object):
ev = "Port {0} job done".format(port_id)
# call the handler
- self.__async_event_port_stopped(port_id)
+ self.__async_event_port_job_done(port_id)
show_event = True
# port was stolen...
@@ -264,6 +264,9 @@ class AsyncEventHandler(object):
# private functions
+ def __async_event_port_job_done (self, port_id):
+ self.client.ports[port_id].async_event_port_job_done()
+
def __async_event_port_stopped (self, port_id):
self.client.ports[port_id].async_event_port_stopped()
@@ -1332,7 +1335,6 @@ class STLClient(object):
:parameters:
ports : list
ports to execute the command
-
:raises:
+ :exc:`STLError`
@@ -1354,7 +1356,6 @@ class STLClient(object):
if not rc:
raise STLError(rc)
-
"""
update traffic on port(s)
@@ -1433,8 +1434,6 @@ class STLClient(object):
if not rc:
raise STLError(rc)
-
-
"""
resume traffic on port(s)
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
index 6aa18847..4529efa9 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py
@@ -334,7 +334,7 @@ class Port(object):
"mul": mul,
"duration": duration,
"force": force}
-
+
rc = self.transmit("start_traffic", params)
if rc.bad():
return self.err(rc.err())
@@ -363,7 +363,6 @@ class Port(object):
if rc.bad():
return self.err(rc.err())
- # only valid state after stop
self.state = self.STATE_STREAMS
return self.ok()
@@ -383,7 +382,6 @@ class Port(object):
if rc.bad():
return self.err(rc.err())
- # only valid state after stop
self.state = self.STATE_PAUSE
return self.ok()
@@ -400,11 +398,12 @@ class Port(object):
params = {"handler": self.handler,
"port_id": self.port_id}
+ # only valid state after stop
+
rc = self.transmit("resume_traffic", params)
if rc.bad():
return self.err(rc.err())
- # only valid state after stop
self.state = self.STATE_TX
return self.ok()
@@ -591,21 +590,26 @@ class Port(object):
- ################# events handler ######################
- def async_event_port_stopped (self):
+ ################# events handler ######################
+ def async_event_port_job_done (self):
self.state = self.STATE_STREAMS
-
- def async_event_port_started (self):
- self.state = self.STATE_TX
-
+ # rest of the events are used for TUI / read only sessions
+ def async_event_port_stopped (self):
+ if not self.is_acquired():
+ self.state = self.STATE_STREAMS
def async_event_port_paused (self):
- self.state = self.STATE_PAUSE
+ if not self.is_acquired():
+ self.state = self.STATE_PAUSE
+ def async_event_port_started (self):
+ if not self.is_acquired():
+ self.state = self.STATE_TX
def async_event_port_resumed (self):
- self.state = self.STATE_TX
+ if not self.is_acquired():
+ self.state = self.STATE_TX
def async_event_forced_acquired (self):
self.handler = None
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index 4cc40cdb..c3dfcb95 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -3206,7 +3206,6 @@ public:
/* first the message must be an event */
TrexDpPortEventMsg *event = dynamic_cast<TrexDpPortEventMsg *>(msg);
EXPECT_TRUE(event != NULL);
- EXPECT_TRUE(event->get_event_type() == TrexDpPortEvent::EVENT_STOP);
EXPECT_TRUE(event->get_event_id() == m_event_id);
EXPECT_TRUE(event->get_port_id() == 0);
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index 7f7ca218..f6d7278e 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -148,6 +148,7 @@ public:
virtual int del_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const = 0;
virtual void set_promiscuous(uint8_t port_id, bool enabled) const = 0;
virtual bool get_promiscuous(uint8_t port_id) const = 0;
+ virtual void flush_dp_messages() const = 0;
virtual ~TrexPlatformApi() {}
};
@@ -176,6 +177,7 @@ public:
int del_rx_flow_stat_rule(uint8_t port_id, uint8_t type, uint16_t proto, uint16_t id) const;
void set_promiscuous(uint8_t port_id, bool enabled) const;
bool get_promiscuous(uint8_t port_id) const;
+ void flush_dp_messages() const;
};
@@ -234,6 +236,9 @@ public:
return false;
}
+ void flush_dp_messages() const {
+ }
+
private:
int m_dp_core_count;
};
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index af6efe1d..3404d6be 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -2599,9 +2599,9 @@ private:
/* send message to all dp cores */
int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg);
void check_for_dp_message_from_core(int thread_id);
- void check_for_dp_messages();
-
+
public:
+ void check_for_dp_messages();
int start_master_statefull();
int start_master_stateless();
int run_in_core(virtual_thread_id_t virt_core_id);
@@ -2640,7 +2640,7 @@ private:
public:
- void publish_async_data();
+ void publish_async_data(bool sync_now);
void publish_async_barrier(uint32_t key);
void dump_stats(FILE *fd,
@@ -2686,9 +2686,11 @@ private:
CLatencyPktInfo m_latency_pkt;
TrexPublisher m_zmq_publisher;
CGlobalStats m_stats;
+ std::mutex m_cp_lock;
public:
TrexStateless *m_trex_stateless;
+
};
int CGlobalTRex::reset_counters(){
@@ -2738,6 +2740,7 @@ CGlobalTRex::check_for_dp_message_from_core(int thread_id) {
*/
void
CGlobalTRex::check_for_dp_messages() {
+
/* for all the cores - check for a new message */
for (int i = 0; i < get_cores_tx(); i++) {
check_for_dp_message_from_core(i);
@@ -3070,10 +3073,10 @@ bool CGlobalTRex::Create(){
cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd;
cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
- cfg.m_rpc_async_cfg = NULL;
cfg.m_rpc_server_verbose = false;
cfg.m_platform_api = new TrexDpdkPlatformApi();
cfg.m_publisher = &m_zmq_publisher;
+ cfg.m_global_lock = &m_cp_lock;
m_trex_stateless = new TrexStateless(cfg);
}
@@ -3531,9 +3534,15 @@ void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){
void
-CGlobalTRex::publish_async_data() {
+CGlobalTRex::publish_async_data(bool sync_now) {
std::string json;
+ /* refactor to update, dump, and etc. */
+ if (sync_now) {
+ update_stats();
+ get_stats(m_stats);
+ }
+
m_stats.dump_json(json);
m_zmq_publisher.publish_json(json);
@@ -3572,7 +3581,7 @@ CGlobalTRex::publish_async_barrier(uint32_t key) {
}
int CGlobalTRex::run_in_master() {
-
+
bool was_stopped=false;
@@ -3580,6 +3589,9 @@ int CGlobalTRex::run_in_master() {
m_trex_stateless->launch_control_plane();
}
+ /* exception and scope safe */
+ std::unique_lock<std::mutex> cp_lock(m_cp_lock);
+
while ( true ) {
if ( CGlobalInfo::m_options.preview.get_no_keyboard() ==false ){
@@ -3669,18 +3681,23 @@ int CGlobalTRex::run_in_master() {
}
/* publish data */
- publish_async_data();
+ publish_async_data(false);
/* check from messages from DP */
check_for_dp_messages();
+ cp_lock.unlock();
delay(500);
+ cp_lock.lock();
if ( is_all_cores_finished() ) {
break;
}
}
+ /* on exit release the lock */
+ cp_lock.unlock();
+
if (!is_all_cores_finished()) {
/* probably CLTR-C */
try_stop_all_dp();
@@ -5177,7 +5194,7 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info
void
TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const {
- g_trex.publish_async_data();
+ g_trex.publish_async_data(true);
g_trex.publish_async_barrier(key);
}
@@ -5213,4 +5230,6 @@ bool TrexDpdkPlatformApi::get_promiscuous(uint8_t port_id) const {
return g_trex.m_ports[port_id].get_promiscuous();
}
-
+void TrexDpdkPlatformApi::flush_dp_messages() const {
+ g_trex.check_for_dp_messages();
+}
diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp
index da7e8c55..5c587e0f 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.cpp
+++ b/src/rpc-server/trex_rpc_req_resp_server.cpp
@@ -173,10 +173,8 @@ void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::s
int index = 0;
- /* if lock was provided, take it */
- if (m_lock) {
- m_lock->lock();
- }
+ /* expcetion safe */
+ std::unique_lock<std::mutex> lock(*m_lock);
/* for every command parsed - launch it */
for (auto command : commands) {
@@ -190,9 +188,7 @@ void TrexRpcServerReqRes::process_request_raw(const std::string &request, std::s
}
/* done with the lock */
- if (m_lock) {
- m_lock->unlock();
- }
+ lock.unlock();
/* write the JSON to string and sever on ZMQ */
@@ -254,28 +250,3 @@ TrexRpcServerReqRes::test_inject_request(const std::string &req) {
return response;
}
-
-/**
- * MOCK req resp server
- */
-TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) {
-}
-
-/**
- * override start
- *
- */
-void
-TrexRpcServerReqResMock::start() {
-
-}
-
-
-/**
- * override stop
- */
-void
-TrexRpcServerReqResMock::stop() {
-
-}
-
diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h
index 979bf9af..26b3248f 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.h
+++ b/src/rpc-server/trex_rpc_req_resp_server.h
@@ -55,7 +55,6 @@ protected:
void *m_socket;
};
-
/**
* a mock req resp server (for tests)
*
@@ -73,5 +72,6 @@ public:
};
+
#endif /* __TREX_RPC_REQ_RESP_API_H__ */
diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp
index 1dfc4494..7d2e31a5 100644
--- a/src/rpc-server/trex_rpc_server.cpp
+++ b/src/rpc-server/trex_rpc_server.cpp
@@ -33,6 +33,9 @@ limitations under the License.
TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *lock) : m_cfg(cfg), m_name(name), m_lock(lock) {
m_is_running = false;
m_is_verbose = false;
+ if (m_lock == NULL) {
+ m_lock = &m_dummy_lock;
+ }
}
TrexRpcServerInterface::~TrexRpcServerInterface() {
@@ -117,7 +120,6 @@ get_current_date_time() {
const std::string TrexRpcServer::s_server_uptime = get_current_date_time();
TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
- const TrexRpcServerConfig *async_cfg,
std::mutex *lock) {
m_req_resp = NULL;
@@ -134,10 +136,6 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
m_servers.push_back(m_req_resp);
}
- /* add async publisher */
- if (async_cfg) {
- m_servers.push_back(new TrexRpcServerAsync(*async_cfg, lock));
- }
}
TrexRpcServer::~TrexRpcServer() {
@@ -187,3 +185,27 @@ TrexRpcServer::test_inject_request(const std::string &req_str) {
return "";
}
}
+
+/**
+ * MOCK req resp server
+ */
+TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) {
+}
+
+/**
+ * override start
+ *
+ */
+void
+TrexRpcServerReqResMock::start() {
+
+}
+
+
+/**
+ * override stop
+ */
+void
+TrexRpcServerReqResMock::stop() {
+
+}
diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h
index 1ab5dce9..a02b2cc0 100644
--- a/src/rpc-server/trex_rpc_server_api.h
+++ b/src/rpc-server/trex_rpc_server_api.h
@@ -133,6 +133,7 @@ protected:
std::thread *m_thread;
std::string m_name;
std::mutex *m_lock;
+ std::mutex m_dummy_lock;
};
/**
@@ -147,7 +148,6 @@ public:
/* creates the collection of servers using configurations */
TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
- const TrexRpcServerConfig *async_cfg,
std::mutex *m_lock = NULL);
~TrexRpcServer();
diff --git a/src/sim/trex_sim.h b/src/sim/trex_sim.h
index 3a3a62ea..59184b75 100644
--- a/src/sim/trex_sim.h
+++ b/src/sim/trex_sim.h
@@ -77,7 +77,6 @@ public:
cfg.m_port_count = 2;
cfg.m_rpc_req_resp_cfg = NULL;
- cfg.m_rpc_async_cfg = NULL;
cfg.m_rpc_server_verbose = false;
cfg.m_platform_api = new SimPlatformApi(1);
cfg.m_publisher = NULL;
diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp
index 30d60b15..87c61ae2 100644
--- a/src/sim/trex_sim_stateless.cpp
+++ b/src/sim/trex_sim_stateless.cpp
@@ -186,7 +186,6 @@ SimStateless::prepare_control_plane() {
cfg.m_port_count = m_port_count;
cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
- cfg.m_rpc_async_cfg = NULL;
cfg.m_rpc_server_verbose = false;
cfg.m_platform_api = new SimPlatformApi(m_dp_core_count);
cfg.m_publisher = m_publisher;
diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp
index ba327e59..8e098adf 100644
--- a/src/stateless/cp/trex_dp_port_events.cpp
+++ b/src/stateless/cp/trex_dp_port_events.cpp
@@ -20,6 +20,7 @@ limitations under the License.
*/
#include <trex_dp_port_events.h>
+#include <trex_stateless_messaging.h>
#include <sstream>
#include <os_time.h>
#include <trex_stateless.h>
@@ -27,24 +28,20 @@ limitations under the License.
/**
* port events
*/
-void
-TrexDpPortEvents::create(TrexStatelessPort *port) {
+TrexDpPortEvents::TrexDpPortEvents(TrexStatelessPort *port) {
m_port = port;
-
- for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) {
- m_events[i].create((TrexDpPortEvent::event_e) i, port);
- }
-
m_event_id_counter = EVENT_ID_INVALID;
}
-/**
- * generate a new event ID
- *
- */
-int
-TrexDpPortEvents::generate_event_id() {
- return (++m_event_id_counter);
+TrexDpPortEvent *
+TrexDpPortEvents::lookup(int event_id) {
+ auto search = m_events.find(event_id);
+
+ if (search != m_events.end()) {
+ return search->second;
+ } else {
+ return NULL;
+ }
}
/**
@@ -52,21 +49,49 @@ TrexDpPortEvents::generate_event_id() {
* all other events will be disabled
*
*/
-void
-TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) {
+int
+TrexDpPortEvents::create_event(TrexDpPortEvent *event, int timeout_ms) {
+ /* allocate ID for event */
+ int event_id = ++m_event_id_counter;
- /* first disable all events */
- for (TrexDpPortEvent & e : m_events) {
- e.disable();
- }
+ /* init and add */
+ event->init(m_port, event_id, timeout_ms);
+ m_events[event_id] = event;
- /* mark this event as allowed */
- m_events[ev].wait_for_event(event_id, timeout_ms);
+ return event_id;
}
void
-TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
- m_events[ev].disable();
+TrexDpPortEvents::destroy_event(int event_id) {
+ TrexDpPortEvent *event = lookup(event_id);
+ if (!event) {
+ /* cannot find event */
+ throw TrexException("internal error - cannot find event");
+ }
+
+ m_events.erase(event_id);
+ delete event;
+}
+
+class DPBarrier : public TrexDpPortEvent {
+protected:
+ virtual void on_event() {
+ /* do nothing */
+ }
+};
+
+void
+TrexDpPortEvents::barrier() {
+ int barrier_id = create_event(new DPBarrier());
+
+ TrexStatelessCpToDpMsgBase *barrier_msg = new TrexStatelessDpBarrier(m_port->m_port_id, barrier_id);
+ m_port->send_message_to_all_dp(barrier_msg);
+
+ get_stateless_obj()->get_platform_api()->flush_dp_messages();
+ while (lookup(barrier_id) != NULL) {
+ delay(1);
+ get_stateless_obj()->get_platform_api()->flush_dp_messages();
+ }
}
/**
@@ -74,39 +99,33 @@ TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) {
*
*/
void
-TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) {
- m_events[ev].handle_event(thread_id, event_id);
-}
-
-/***********
- * single event object
- *
- */
+TrexDpPortEvents::on_core_reporting_in(int event_id, int thread_id) {
+ TrexDpPortEvent *event = lookup(event_id);
+ /* event might have been deleted */
+ if (!event) {
+ return;
+ }
-void
-TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) {
- m_event_type = type;
- m_port = port;
+ bool done = event->on_core_reporting_in(thread_id);
- /* add the core ids to the hash */
- m_signal.clear();
- for (int core_id : m_port->get_core_id_list()) {
- m_signal[core_id] = false;
+ if (done) {
+ destroy_event(event_id);
}
-
- /* event is disabled */
- disable();
}
-/**
- * wait the event using event id and timeout
+/***************************
+ * event
*
- */
-void
-TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
+ **************************/
+TrexDpPortEvent::TrexDpPortEvent() {
+ m_port = NULL;
+ m_event_id = -1;
+}
- /* set a new event id */
+void
+TrexDpPortEvent::init(TrexStatelessPort *port, int event_id, int timeout_ms) {
+ m_port = port;
m_event_id = event_id;
/* do we have a timeout ? */
@@ -118,103 +137,33 @@ TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) {
/* prepare the signal array */
m_pending_cnt = 0;
- for (auto & core_pair : m_signal) {
- core_pair.second = false;
+ for (int core_id : m_port->get_core_id_list()) {
+ m_signal[core_id] = false;
m_pending_cnt++;
}
}
-void
-TrexDpPortEvent::disable() {
- m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
-}
-
-/**
- * get the event status
- *
- */
-
-TrexDpPortEvent::event_status_e
-TrexDpPortEvent::status() {
-
- /* is it even active ? */
- if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) {
- return (EVENT_DISABLE);
- }
-
- /* did it occured ? */
- if (m_pending_cnt == 0) {
- return (EVENT_OCCURED);
- }
-
- /* so we are enabled and the event did not occur - maybe we timed out ? */
- if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) {
- return (EVENT_TIMED_OUT);
- }
-
- /* so we are still waiting... */
- return (EVENT_PENDING);
-
-}
-
-void
-TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) {
- std::stringstream err;
- err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - ";
-}
-
-/**
- * event occured
- *
- */
-void
-TrexDpPortEvent::handle_event(int thread_id, int event_id) {
-
- /* if the event is disabled - we don't care */
- if (!is_active()) {
- return;
- }
-
- /* check the event id is matching the required event - if not maybe its an old signal */
- if (event_id != m_event_id) {
- return;
- }
-
+bool
+TrexDpPortEvent::on_core_reporting_in(int thread_id) {
/* mark sure no double signal */
if (m_signal.at(thread_id)) {
- err(thread_id, event_id, "double signal");
+ std::stringstream err;
+ err << "double signal detected on event id: " << m_event_id;
+ throw TrexException(err.str());
- } else {
- /* mark */
- m_signal.at(thread_id) = true;
- m_pending_cnt--;
}
+ /* mark */
+ m_signal.at(thread_id) = true;
+ m_pending_cnt--;
+
/* event occured */
if (m_pending_cnt == 0) {
- m_port->on_dp_event_occured(m_event_type);
- m_event_id = TrexDpPortEvents::EVENT_ID_INVALID;
+ on_event();
+ return true;
+ } else {
+ return false;
}
}
-bool
-TrexDpPortEvent::is_active() {
- return (status() != EVENT_DISABLE);
-}
-
-bool
-TrexDpPortEvent::has_timeout_expired() {
- return (status() == EVENT_TIMED_OUT);
-}
-
-const char *
-TrexDpPortEvent::event_name(event_e type) {
- switch (type) {
- case EVENT_STOP:
- return "DP STOP";
- default:
- throw TrexException("unknown event type");
- }
-
-}
diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h
index 557e590b..3b8c8633 100644
--- a/src/stateless/cp/trex_dp_port_events.h
+++ b/src/stateless/cp/trex_dp_port_events.h
@@ -25,95 +25,43 @@ limitations under the License.
#include <string>
class TrexStatelessPort;
+class TrexDpPortEvents;
/**
- * describes a single DP event related to port
+ * interface class for DP events
*
- * @author imarom (18-Nov-15)
+ * @author imarom (29-Feb-16)
*/
class TrexDpPortEvent {
-public:
-
- enum event_e {
- EVENT_STOP = 1,
- EVENT_MAX
- };
-
- /**
- * status of the event for the port
- */
- enum event_status_e {
- EVENT_DISABLE,
- EVENT_PENDING,
- EVENT_TIMED_OUT,
- EVENT_OCCURED
- };
-
- /**
- * init for the event
- *
- */
- void create(event_e type, TrexStatelessPort *port);
-
- /**
- * create a new pending event
- *
- */
- void wait_for_event(int event_id, int timeout_ms = -1);
-
- /**
- * mark event as not allowed to happen
- *
- */
- void disable();
-
- /**
- * get the event status
- *
- */
- event_status_e status();
-
- /**
- * event occured
- *
- */
- void handle_event(int thread_id, int event_id);
+ friend TrexDpPortEvents;
- /**
- * returns true if event is active
- *
- */
- bool is_active();
-
- /**
- * has timeout already expired ?
- *
- */
- bool has_timeout_expired();
-
- /**
- * generate error
- *
- */
- void err(int thread_id, int event_id, const std::string &err_msg);
+public:
+ TrexDpPortEvent();
+ virtual ~TrexDpPortEvent() {}
+protected:
/**
- * event to name
+ * what to do when an event has been completed (all cores
+ * reported in
*
+ * @author imarom (29-Feb-16)
*/
- static const char * event_name(event_e type);
+ virtual void on_event() = 0;
+ TrexStatelessPort *get_port() {
+ return m_port;
+ }
private:
+ void init(TrexStatelessPort *port, int event_id, int timeout_ms);
+ bool on_core_reporting_in(int thread_id);
- event_e m_event_type;
std::unordered_map<int, bool> m_signal;
int m_pending_cnt;
TrexStatelessPort *m_port;
int m_event_id;
int m_expire_limit_ms;
-
};
/**
@@ -124,44 +72,39 @@ class TrexDpPortEvents {
public:
friend class TrexDpPortEvent;
- void create(TrexStatelessPort *port);
+ static const int INVALID_ID = -1;
- /**
- * generate a new event ID to be used with wait_for_event
- *
- */
- int generate_event_id();
+ TrexDpPortEvents(TrexStatelessPort *port);
/**
* wait a new DP event on the port
* returns a key which will be used to identify
* the event happened
*
- * @author imarom (18-Nov-15)
- *
- * @param ev - type of event
- * @param event_id - a unique identifier for the event
- * @param timeout_ms - does it has a timeout ?
- *
*/
- void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1);
+ int create_event(TrexDpPortEvent *event, int timeout_ms = -1);
/**
- * disable an event (don't care)
+ * destroy an event
*
*/
- void disable(TrexDpPortEvent::event_e ev);
+ void destroy_event(int event_id);
/**
- * event has occured
- *
+ * return when all DP cores have responsed on a barrier
+ */
+ void barrier();
+
+ /**
+ * a core has reached the event
*/
- void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id);
+ void on_core_reporting_in(int event_id, int thread_id);
private:
- static const int EVENT_ID_INVALID = -1;
+ TrexDpPortEvent *lookup(int event_id);
- TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX];
+ static const int EVENT_ID_INVALID = -1;
+ std::unordered_map<int, TrexDpPortEvent *> m_events;
int m_event_id_counter;
TrexStatelessPort *m_port;
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index a4522837..9e24802b 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -40,7 +40,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) {
/* create RPC servers */
/* set both servers to mutex each other */
- m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_rpc_async_cfg, &m_global_cp_lock);
+ m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_global_lock);
m_rpc_server->set_verbose(cfg.m_rpc_server_verbose);
/* configure ports */
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index cc47da6b..6e5e0c44 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -92,18 +92,19 @@ public:
TrexStatelessCfg() {
m_port_count = 0;
m_rpc_req_resp_cfg = NULL;
- m_rpc_async_cfg = NULL;
m_rpc_server_verbose = false;
m_platform_api = NULL;
m_publisher = NULL;
+ m_global_lock = NULL;
}
const TrexRpcServerConfig *m_rpc_req_resp_cfg;
- const TrexRpcServerConfig *m_rpc_async_cfg;
const TrexPlatformApi *m_platform_api;
bool m_rpc_server_verbose;
uint8_t m_port_count;
TrexPublisher *m_publisher;
+ std::mutex *m_global_lock;
+
};
/**
@@ -186,7 +187,6 @@ protected:
TrexPublisher *m_publisher;
- std::mutex m_global_cp_lock;
};
/**
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index c60b0e85..257af546 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -48,11 +48,35 @@ port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &core
using namespace std;
+
+
+/***************************
+ * trex DP events handlers
+ *
+ **************************/
+class AsyncStopEvent : public TrexDpPortEvent {
+
+protected:
+ /**
+ * when an async event occurs (all cores have reported in)
+ *
+ * @author imarom (29-Feb-16)
+ */
+ virtual void on_event() {
+ get_port()->change_state(TrexStatelessPort::PORT_STATE_STREAMS);
+
+ get_port()->common_port_stop_actions(true);
+
+ assert(get_port()->m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID);
+ get_port()->m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
+};
+
/***************************
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
m_port_id = port_id;
@@ -73,16 +97,20 @@ TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api
m_cores_id_list.push_back(core_pair.first);
}
- /* init the events DP DB */
- m_dp_events.create(this);
-
m_graph_obj = NULL;
+
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
}
TrexStatelessPort::~TrexStatelessPort() {
if (m_graph_obj) {
delete m_graph_obj;
}
+
+ if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+ m_dp_events.destroy_event(m_pending_async_stop_event);
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
}
/**
@@ -170,16 +198,14 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
}
/* generate a message to all the relevant DP cores to start transmitting */
- int event_id = m_dp_events.generate_event_id();
-
- /* mark that DP event of stoppped is possible */
- m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
-
+ assert(m_pending_async_stop_event == TrexDpPortEvents::INVALID_ID);
+ m_pending_async_stop_event = m_dp_events.create_event(new AsyncStopEvent());
/* update object status */
m_factor = factor;
m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
m_last_duration = duration;
+
change_state(PORT_STATE_TX);
/* update the DP - messages will be freed by the DP */
@@ -188,17 +214,23 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration,
/* was the core assigned a compiled object ? */
if (compiled_objs[index]) {
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id,
+ m_pending_async_stop_event,
+ compiled_objs[index],
+ duration);
send_message_to_dp(core_id, start_msg);
} else {
/* mimic an end event */
- m_dp_events.handle_event(TrexDpPortEvent::EVENT_STOP, core_id, event_id);
+ m_dp_events.on_core_reporting_in(m_pending_async_stop_event, core_id);
}
index++;
}
+ /* for debug - this can be turn on */
+ //m_dp_events.barrier();
+
/* update subscribers */
Json::Value data;
data["port_id"] = m_port_id;
@@ -225,17 +257,23 @@ TrexStatelessPort::stop_traffic(void) {
/* delete any previous graphs */
delete_streams_graph();
- /* mask out the DP stop event */
- m_dp_events.disable(TrexDpPortEvent::EVENT_STOP);
+ /* to avoid race, first destroy any previous stop/pause events */
+ if (m_pending_async_stop_event != TrexDpPortEvents::INVALID_ID) {
+ m_dp_events.destroy_event(m_pending_async_stop_event);
+ m_pending_async_stop_event = TrexDpPortEvents::INVALID_ID;
+ }
+
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
-
send_message_to_all_dp(stop_msg);
- /* continue to general actions */
+ /* a barrier - make sure all the DP cores stopped */
+ m_dp_events.barrier();
+
+ change_state(PORT_STATE_STREAMS);
+
common_port_stop_actions(false);
-
}
/**
@@ -243,14 +281,12 @@ TrexStatelessPort::stop_traffic(void) {
*
*/
void
-TrexStatelessPort::common_port_stop_actions(bool event_triggered) {
+TrexStatelessPort::common_port_stop_actions(bool async) {
- change_state(PORT_STATE_STREAMS);
-
Json::Value data;
data["port_id"] = m_port_id;
- if (event_triggered) {
+ if (async) {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_FINISHED_TX, data);
} else {
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data);
@@ -274,17 +310,18 @@ TrexStatelessPort::pause_traffic(void) {
throw TrexException(" pause is supported when duration is not enable is start command ");
}
+ /* send a pause message */
TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
-
send_message_to_all_dp(pause_msg);
- change_state(PORT_STATE_PAUSE);
+ /* make sure all DP cores paused */
+ m_dp_events.barrier();
- Json::Value data;
- data["port_id"] = m_port_id;
- get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_PAUSED, data);
+ /* change state */
+ change_state(PORT_STATE_PAUSE);
}
+
void
TrexStatelessPort::resume_traffic(void) {
@@ -294,7 +331,6 @@ TrexStatelessPort::resume_traffic(void) {
TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
send_message_to_all_dp(resume_msg);
-
change_state(PORT_STATE_TX);
@@ -335,7 +371,6 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul, bool force) {
}
TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
-
send_message_to_all_dp(update_msg);
m_factor *= factor;
@@ -439,25 +474,6 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas
ring->Enqueue((CGenNode *)msg);
}
-/**
- * when a DP (async) event occurs - handle it
- *
- */
-void
-TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
- Json::Value data;
-
- switch (event_type) {
-
- case TrexDpPortEvent::EVENT_STOP:
- common_port_stop_actions(true);
- break;
-
- default:
- assert(0);
-
- }
-}
uint64_t
TrexStatelessPort::get_port_speed_bps() const {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 192d0d19..d3c4dcb9 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -102,13 +102,17 @@ private:
};
+class AsyncStopEvent;
+
/**
* describes a stateless port
*
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
- friend class TrexDpPortEvent;
+ friend TrexDpPortEvents;
+ friend TrexDpPortEvent;
+ friend AsyncStopEvent;
public:
@@ -363,18 +367,12 @@ private:
*/
void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
- /**
- * triggered when event occurs
- *
- */
- void on_dp_event_occured(TrexDpPortEvent::event_e event_type);
-
/**
* when a port stops, perform various actions
*
*/
- void common_port_stop_actions(bool event_triggered);
+ void common_port_stop_actions(bool async);
/**
* calculate effective M per core
@@ -421,6 +419,8 @@ private:
/* owner information */
TrexPortOwner m_owner;
+
+ int m_pending_async_stop_event;
};
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 0f578b99..19eface1 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -265,9 +265,9 @@ bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
}
-bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
- bool stop_on_id,
- int event_id){
+bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id){
if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
@@ -829,9 +829,9 @@ TrexStatelessDpCore::update_traffic(uint8_t port_id, double factor) {
void
-TrexStatelessDpCore::stop_traffic(uint8_t port_id,
- bool stop_on_id,
- int event_id) {
+TrexStatelessDpCore::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id) {
/* we cannot remove nodes not from the top of the queue so
for every active node - make sure next time
the scheduler invokes it, it will be free */
@@ -843,20 +843,19 @@ TrexStatelessDpCore::stop_traffic(uint8_t port_id,
//printf(" skip .. %f\n",m_core->m_cur_time_sec);
return;
}
-
-#if 0
- if ( are_all_ports_idle() ) {
- /* just a place holder if we will need to do somthing in that case */
- }
-#endif
/* inform the control plane we stopped - this might be a async stop
(streams ended)
- */
+ */
+ #if 0
+ if ( are_all_ports_idle() ) {
+ /* just a place holder if we will need to do somthing in that case */
+ }
+ #endif
+
CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
port_id,
- TrexDpPortEvent::EVENT_STOP,
lp_port->get_event_id());
ring->Enqueue((CGenNode *)event_msg);
@@ -872,3 +871,12 @@ TrexStatelessDpCore::handle_cp_msg(TrexStatelessCpToDpMsgBase *msg) {
delete msg;
}
+void
+TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) {
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id);
+ TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id,
+ port_id,
+ event_id);
+ ring->Enqueue((CGenNode *)event_msg);
+}
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 3d214655..cb102b8d 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -235,7 +235,8 @@ public:
return (&m_ports[local_port_id]);
}
-
+ /* simply sends a message back (acts as a barrier for previous messages) */
+ void barrier(uint8_t port_id, int event_id);
private:
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 257de168..333aec88 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -180,11 +180,29 @@ TrexStatelessDpUpdate::clone() {
return new_msg;
}
+/*************************
+ barrier message
+ ************************/
+
+bool
+TrexStatelessDpBarrier::handle(TrexStatelessDpCore *dp_core) {
+ dp_core->barrier(m_port_id, m_event_id);
+ return true;
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpBarrier::clone() {
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpBarrier(m_port_id, m_event_id);
+
+ return new_msg;
+}
+
/************************* messages from DP to CP **********************/
bool
TrexDpPortEventMsg::handle() {
TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id);
- port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id);
+ port->get_dp_events().on_core_reporting_in(m_event_id, m_thread_id);
return (true);
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index d56596bf..dda086b7 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -145,7 +145,7 @@ public:
TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
m_stop_only_for_event_id=false;
- m_event_id=0;
+ m_event_id = 0;
m_core = NULL;
}
@@ -245,6 +245,26 @@ private:
double m_factor;
};
+/**
+ * barrier message for DP core
+ *
+ */
+class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpBarrier(uint8_t port_id, int event_id) {
+ m_port_id = port_id;
+ m_event_id = event_id;
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+private:
+ uint8_t m_port_id;
+ int m_event_id;
+};
/************************* messages from DP to CP **********************/
@@ -282,10 +302,9 @@ public:
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
- TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) {
+ TrexDpPortEventMsg(int thread_id, uint8_t port_id, int event_id) {
m_thread_id = thread_id;
m_port_id = port_id;
- m_event_type = type;
m_event_id = event_id;
}
@@ -299,10 +318,6 @@ public:
return m_port_id;
}
- TrexDpPortEvent::event_e get_event_type() {
- return m_event_type;
- }
-
int get_event_id() {
return m_event_id;
}
@@ -310,7 +325,6 @@ public:
private:
int m_thread_id;
uint8_t m_port_id;
- TrexDpPortEvent::event_e m_event_type;
int m_event_id;
};