summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-11-24 11:54:58 +0200
committerimarom <imarom@cisco.com>2015-11-24 11:54:58 +0200
commitac6f8e7c7c4e957a31c19c1fe4a0997009c6e733 (patch)
tree721101231fb692fed6028b21628cb861f3c64019 /src
parent161a85c57c3d2a165e4fa94140d67db05714a7d3 (diff)
parent1e1c11059b7f7fcb5c160dffe2df832a8adf719d (diff)
Merge branch 'master' of csi-sceasr-b45:/auto/proj-pcube-b/apps/PL-b/tools/repo//trex-core
Diffstat (limited to 'src')
-rwxr-xr-xsrc/bp_sim.cpp5
-rwxr-xr-xsrc/bp_sim.h1
-rw-r--r--src/gtest/trex_stateless_gtest.cpp326
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp25
-rw-r--r--src/stateless/cp/trex_stateless_port.h5
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp7
-rw-r--r--src/stateless/cp/trex_streams_compiler.h5
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp109
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h31
-rw-r--r--src/stateless/dp/trex_stream_node.h20
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp50
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h77
12 files changed, 618 insertions, 43 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index f8dd20a1..a61fbb8f 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -3186,6 +3186,7 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
uint32_t max_threads){
+ m_non_active_nodes = 0;
m_terminated_by_master=false;
m_flow_list =flow_list;
m_core_id= core_id;
@@ -3655,8 +3656,8 @@ CNodeGenerator::handle_slow_messages(uint8_t type,
thread->check_msgs(); /* check messages */
m_v_if->flush_tx_queue(); /* flush pkt each timeout */
- /* on always (clean queue path) and queue empty - exit */
- if ( always && (m_p_queue.empty()) ) {
+ /* exit in case this is the last node*/
+ if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
thread->free_node(node);
exit_scheduler = true;
} else {
diff --git a/src/bp_sim.h b/src/bp_sim.h
index fcca2428..0da7fb99 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -3541,6 +3541,7 @@ public:
CNodeGenerator m_node_gen;
public:
uint32_t m_cur_template;
+ uint32_t m_non_active_nodes; /* the number of non active nodes -> nodes that try to stop somthing */
uint64_t m_cur_flow_id;
double m_cur_time_sec;
double m_stop_time_sec;
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index 72a99e93..5b298023 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -77,10 +77,32 @@ public:
void add_msg(TrexStatelessCpToDpMsgBase * msg){
m_msgs.push_back(msg);
}
+
void add_command(CBasicStlDelayCommand & command){
m_commands.push_back(command);
}
+ /* only if both port are idle we can exit */
+ void add_command(CFlowGenListPerThread * core,
+ TrexStatelessCpToDpMsgBase * msg,
+ double time){
+
+ CGenNodeCommand *node = (CGenNodeCommand *)core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ node->m_cmd = msg;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = time ;
+
+ CBasicStlDelayCommand command;
+ command.m_node =node;
+
+ add_command(command);
+ }
+
+
void clear(){
m_msgs.clear();
m_commands.clear();
@@ -94,6 +116,20 @@ protected:
};
+
+class CBasicStlSink {
+
+public:
+ CBasicStlSink(){
+ m_core=0;
+ }
+ virtual void call_after_init(CBasicStl * m_obj)=0;
+ virtual void call_after_run(CBasicStl * m_obj)=0;
+
+ CFlowGenListPerThread * m_core;
+};
+
+
/**
* handler for DP to CP messages
*
@@ -115,6 +151,7 @@ public:
m_dump_json=false;
m_dp_to_cp_handler = NULL;
m_msg = NULL;
+ m_sink = NULL;
}
@@ -161,6 +198,10 @@ public:
lpt=fl.m_threads_info[0];
+ if ( m_sink ){
+ m_sink->m_core =lpt;
+ }
+
char buf[100];
char buf_ex[100];
sprintf(buf,"%s-%d.erf",CGlobalInfo::m_options.out_file.c_str(),0);
@@ -178,6 +219,10 @@ public:
}
}
+ if (m_sink) {
+ m_sink->call_after_init(this);
+ }
+
/* add the commands */
if (m_msg_queue.m_commands.size()>0) {
for (auto cmd : m_msg_queue.m_commands) {
@@ -203,6 +248,10 @@ public:
printf(" %s \n",s.c_str());
}
+ if (m_sink) {
+ m_sink->call_after_run(this);
+ }
+
flush_dp_to_cp_messages();
m_msg_queue.clear();
@@ -216,6 +265,7 @@ public:
double m_time_diff;
bool m_dump_json;
DpToCpHandler *m_dp_to_cp_handler;
+ CBasicStlSink * m_sink;
TrexStatelessCpToDpMsgBase * m_msg;
CNodeRing *m_ring_from_cp;
@@ -329,6 +379,282 @@ TEST_F(basic_stl, load_pcap_file) {
+class CBBStartPause0: public CBasicStlSink {
+public:
+
+ virtual void call_after_init(CBasicStl * m_obj);
+ virtual void call_after_run(CBasicStl * m_obj){
+ };
+ uint8_t m_port_id;
+};
+
+
+
+void CBBStartPause0::call_after_init(CBasicStl * m_obj){
+
+ TrexStatelessDpPause * lpPauseCmd = new TrexStatelessDpPause(m_port_id);
+ TrexStatelessDpResume * lpResumeCmd1 = new TrexStatelessDpResume(m_port_id);
+
+ m_obj->m_msg_queue.add_command(m_core,lpPauseCmd, 5.0); /* command in delay of 5 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpResumeCmd1, 7.0);/* command in delay of 7 sec */
+
+}
+
+
+
+/* start/stop/stop back to back */
+TEST_F(basic_stl, basic_pause_resume0) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_basic_pause_resume0";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+
+
+ CBBStartPause0 sink;
+ sink.m_port_id = port_id;
+ t1.m_sink = &sink;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+//////////////////////////////////////////////////////////////
+
+
+class CBBStartStopDelay2: public CBasicStlSink {
+public:
+
+ virtual void call_after_init(CBasicStl * m_obj);
+ virtual void call_after_run(CBasicStl * m_obj){
+ };
+ uint8_t m_port_id;
+};
+
+
+
+void CBBStartStopDelay2::call_after_init(CBasicStl * m_obj){
+
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id);
+
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+
+ /* start with different event id */
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(m_port_id, 1, comp_obj.clone(), 10.0 /*sec */ );
+
+
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStartCmd, 7.5);/* command in delay of 7 sec */
+
+ delete stream1 ;
+
+
+}
+
+
+
+/* start/stop/stop back to back */
+TEST_F(basic_stl, single_pkt_bb_start_stop_delay2) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop_delay2";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+
+
+ CBBStartStopDelay2 sink;
+ sink.m_port_id = port_id;
+ t1.m_sink = &sink;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+
+
+
+
+class CBBStartStopDelay1: public CBasicStlSink {
+public:
+
+ virtual void call_after_init(CBasicStl * m_obj);
+ virtual void call_after_run(CBasicStl * m_obj){
+ };
+ uint8_t m_port_id;
+};
+
+
+
+void CBBStartStopDelay1::call_after_init(CBasicStl * m_obj){
+
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id);
+
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */
+}
+
+
+
+/* start/stop/stop back to back */
+TEST_F(basic_stl, single_pkt_bb_start_stop_delay1) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop_delay1";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+
+
+ CBBStartStopDelay1 sink;
+ sink.m_port_id = port_id;
+ t1.m_sink = &sink;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
/* start/stop/stop back to back */
TEST_F(basic_stl, single_pkt_bb_start_stop3) {
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 7dc217a3..be1bea12 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -130,6 +130,9 @@ TrexStatelessPort::start_traffic(double mul, double duration) {
TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration);
+ m_last_all_streams_continues = compiled_obj->get_all_streams_continues();
+ m_last_duration =duration;
+
change_state(PORT_STATE_TX);
send_message_to_dp(start_msg);
@@ -174,7 +177,8 @@ TrexStatelessPort::start_traffic_max_pps(double max_pps, double duration) {
void
TrexStatelessPort::stop_traffic(void) {
- if (m_port_state != PORT_STATE_TX) {
+ if (!( (m_port_state == PORT_STATE_TX)
+ || (m_port_state ==PORT_STATE_PAUSE) )) {
return;
}
@@ -195,14 +199,18 @@ TrexStatelessPort::pause_traffic(void) {
verify_state(PORT_STATE_TX);
- #if 0
- /* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
+ if (m_last_all_streams_continues == false) {
+ throw TrexRpcException(" pause is supported when all streams are in continues mode ");
+ }
+
+ if ( m_last_duration>0.0 ) {
+ throw TrexRpcException(" pause is supported when duration is not enable is start command ");
+ }
+
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id);
send_message_to_dp(stop_msg);
- m_port_state = PORT_STATE_UP_IDLE;
- #endif
change_state(PORT_STATE_PAUSE);
}
@@ -211,14 +219,11 @@ TrexStatelessPort::resume_traffic(void) {
verify_state(PORT_STATE_PAUSE);
- #if 0
/* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id);
send_message_to_dp(stop_msg);
- m_port_state = PORT_STATE_UP_IDLE;
- #endif
change_state(PORT_STATE_TX);
}
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 9b74741c..6adb5fef 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -209,6 +209,8 @@ public:
return m_dp_events;
}
+
+
private:
@@ -262,6 +264,9 @@ private:
/* holds the DP cores associated with this port */
std::vector<int> m_cores_id_list;
+ bool m_last_all_streams_continues;
+ double m_last_duration;
+
TrexDpPortEvents m_dp_events;
};
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index f97b15b9..b28989be 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -143,6 +143,7 @@ private:
* stream compiled object
*************************************/
TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id, double mul) : m_port_id(port_id), m_mul(mul) {
+ m_all_continues=false;
}
TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
@@ -408,6 +409,7 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
}
+ bool all_continues=true;
/* for now we do something trivial, */
for (auto stream : streams) {
@@ -415,6 +417,9 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
if (!stream->m_enabled) {
continue;
}
+ if (stream->get_type() != TrexStream::stCONTINUOUS ) {
+ all_continues=false;
+ }
int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id;
assert(new_id>=0);
@@ -431,7 +436,7 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
);
}
-
+ obj.m_all_continues =all_continues;
return true;
}
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
index 9f0c1f8e..70a31c5e 100644
--- a/src/stateless/cp/trex_streams_compiler.h
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -66,6 +66,10 @@ public:
return (m_mul);
}
+ bool get_all_streams_continues(){
+ return (m_all_continues);
+ }
+
void Dump(FILE *fd);
private:
@@ -75,6 +79,7 @@ private:
std::vector<obj_st> m_objs;
+ bool m_all_continues;
uint8_t m_port_id;
double m_mul;
};
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 6430e520..9b4a6ad9 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -78,9 +78,10 @@ void CGenNodeStateless::refresh(){
}
-
void CGenNodeCommand::free_command(){
+
assert(m_cmd);
+ m_cmd->on_node_remove();
delete m_cmd;
}
@@ -123,15 +124,56 @@ bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
return (false);
}
+bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){
+
+ /* we are working with continues streams so we must be in transmit mode */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ assert(node->is_pause() == true);
+ node->set_pause(false);
+ }
+ m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ return (true);
+}
+
+
+bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){
+
+ /* we are working with continues streams so we must be in transmit mode */
+ assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ assert(node->is_pause() == false);
+ node->set_pause(true);
+ }
+ m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE;
+ return (true);
+}
+
+
+bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id){
-bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){
- /* there could be race of stop after stop */
if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
assert(m_active_streams==0);
return false;
}
+ /* there could be race of stop after stop */
+ if ( stop_on_id ) {
+ if (event_id != m_event_id){
+ /* we can't stop it is an old message */
+ return false;
+ }
+ }
+
for (auto dp_stream : m_active_nodes) {
CGenNodeStateless * node =dp_stream.m_node;
assert(node->get_port_id() == port_id);
@@ -215,7 +257,7 @@ bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
if ( to_stop_port ) {
/* call stop port explictly to move the state */
- stop_traffic(cur_node->m_port_id);
+ stop_traffic(cur_node->m_port_id,false,0);
}
return ( schedule );
@@ -284,6 +326,8 @@ TrexStatelessDpCore::run_once(){
}
+
+
void
TrexStatelessDpCore::start() {
@@ -330,7 +374,8 @@ TrexStatelessDpCore::add_global_duration(double duration){
/* add per port exit */
void
TrexStatelessDpCore::add_port_duration(double duration,
- uint8_t port_id){
+ uint8_t port_id,
+ int event_id){
if (duration > 0.0) {
CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
@@ -339,7 +384,16 @@ TrexStatelessDpCore::add_port_duration(double duration,
/* make sure it will be scheduled after the current node */
node->m_time = m_core->m_cur_time_sec + duration ;
- node->m_cmd = new TrexStatelessDpStop(port_id);
+ TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
+
+
+ /* test this */
+ m_core->m_non_active_nodes++;
+ cmd->set_core_ptr(m_core);
+ cmd->set_event_id(event_id);
+ cmd->set_wait_for_event_id(true);
+
+ node->m_cmd = cmd;
m_core->m_node_gen.add_node((CGenNode *)node);
}
@@ -382,6 +436,7 @@ TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
uint16_t pkt_size = stream->m_pkt.len;
const uint8_t *stream_pkt = stream->m_pkt.binary;
+ node->m_pause =0;
node->m_stream_type = stream->m_type;
node->m_next_time_offset = 1.0 / (stream->get_pps() * comp->get_multiplier()) ;
@@ -450,15 +505,14 @@ TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
void
TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
- double duration) {
+ double duration,
+ int event_id) {
-#if 0
- /* TBD to remove ! */
- obj->Dump(stdout);
-#endif
TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
lp_port->m_active_streams = 0;
+ lp_port->set_event_id(event_id);
+
/* no nodes in the list */
assert(lp_port->m_active_nodes.size()==0);
@@ -491,7 +545,7 @@ TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
if ( duration > 0.0 ){
- add_port_duration( duration ,obj->get_port_id() );
+ add_port_duration( duration ,obj->get_port_id(),event_id );
}
}
@@ -510,24 +564,45 @@ bool TrexStatelessDpCore::are_all_ports_idle(){
}
+void
+TrexStatelessDpCore::resume_traffic(uint8_t port_id){
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->resume_traffic(port_id);
+}
+
+
+void
+TrexStatelessDpCore::pause_traffic(uint8_t port_id){
+
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
+
+ lp_port->pause_traffic(port_id);
+}
+
+
void
-TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
+TrexStatelessDpCore::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id) {
/* we cannot remove nodes not from the top of the queue so
for every active node - make sure next time
the scheduler invokes it, it will be free */
TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
- if ( lp_port->stop_traffic(port_id) == false){
+ if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
/* nothing to do ! already stopped */
+ //printf(" skip .. %f\n",m_core->m_cur_time_sec);
return;
}
-
+#if 0
if ( are_all_ports_idle() ) {
-
- schedule_exit();
+ /* just a place holder if we will need to do somthing in that case */
}
+#endif
/* inform the control plane we stopped - this might be a async stop
(streams ended)
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 326bbe30..eda1ae59 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -53,7 +53,9 @@ public:
/* states */
enum state_e {
ppSTATE_IDLE,
- ppSTATE_TRANSMITTING
+ ppSTATE_TRANSMITTING,
+ ppSTATE_PAUSE
+
};
public:
@@ -62,7 +64,13 @@ public:
void create(CFlowGenListPerThread * core);
- bool stop_traffic(uint8_t port_id);
+ bool pause_traffic(uint8_t port_id);
+
+ bool resume_traffic(uint8_t port_id);
+
+ bool stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id);
bool update_number_of_active_streams(uint32_t d);
@@ -137,13 +145,25 @@ public:
* @param pkt
* @param pkt_len
*/
- void start_traffic(TrexStreamsCompiledObj *obj, double duration = -1);
+ void start_traffic(TrexStreamsCompiledObj *obj,
+ double duration,
+ int m_event_id);
+
+
+ /* pause the streams, work only if all are continues */
+ void pause_traffic(uint8_t port_id);
+
+
+
+ void resume_traffic(uint8_t port_id);
+
/**
+ *
* stop all traffic for this core
*
*/
- void stop_traffic(uint8_t port_id);
+ void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id);
/* return if all ports are idel */
@@ -225,7 +245,8 @@ private:
void add_port_duration(double duration,
- uint8_t port_id);
+ uint8_t port_id,
+ int event_id);
void add_global_duration(double duration);
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index 1e53887b..ccf99eaa 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -29,6 +29,7 @@ class TrexStatelessDpCore;
#include <trex_stream.h>
class TrexStatelessCpToDpMsgBase;
+class CFlowGenListPerThread;
struct CGenNodeCommand : public CGenNodeBase {
@@ -47,6 +48,7 @@ public:
static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" );
+
/* this is a event for stateless */
struct CGenNodeStateless : public CGenNodeBase {
friend class TrexStatelessDpCore;
@@ -73,7 +75,7 @@ private:
stream_state_t m_state;
uint8_t m_port_id;
uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */
- uint8_t m_pad;
+ uint8_t m_pause;
uint32_t m_single_burst; /* the number of bursts in case of burst */
uint32_t m_single_burst_refill;
@@ -111,7 +113,18 @@ public:
/* only to be safe */
m_ref_stream_info= NULL;
m_next_stream= NULL;
+ }
+ bool is_pause(){
+ return (m_pause==1?true:false);
+ }
+
+ void set_pause(bool enable){
+ if ( enable ){
+ m_pause=1;
+ }else{
+ m_pause=0;
+ }
}
inline uint8_t get_stream_type(){
@@ -142,7 +155,10 @@ public:
void refresh();
inline void handle_continues(CFlowGenListPerThread *thread) {
- thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+
+ if (unlikely (is_pause()==false)) {
+ thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
+ }
/* in case of continues */
m_time += m_next_time_offset;
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index c861d0fa..ec8b7839 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -23,6 +23,7 @@ limitations under the License.
#include <trex_stateless_dp_core.h>
#include <trex_streams_compiler.h>
#include <trex_stateless.h>
+#include <bp_sim.h>
#include <string.h>
@@ -60,11 +61,8 @@ TrexStatelessDpStart::~TrexStatelessDpStart() {
bool
TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
- /* mark the event id for DP response */
- dp_core->get_port_db(m_port_id)->set_event_id(m_event_id);
-
/* staet traffic */
- dp_core->start_traffic(m_obj, m_duration);
+ dp_core->start_traffic(m_obj, m_duration,m_event_id);
return true;
}
@@ -75,17 +73,57 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
bool
TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
- dp_core->stop_traffic(m_port_id);
+
+ dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id);
return true;
}
+
+void TrexStatelessDpStop::on_node_remove(){
+ if ( m_core ) {
+ assert(m_core->m_non_active_nodes>0);
+ m_core->m_non_active_nodes--;
+ }
+}
+
+
+TrexStatelessCpToDpMsgBase * TrexStatelessDpPause::clone(){
+
+ TrexStatelessDpPause *new_msg = new TrexStatelessDpPause(m_port_id);
+ return new_msg;
+}
+
+
+bool TrexStatelessDpPause::handle(TrexStatelessDpCore *dp_core){
+ dp_core->pause_traffic(m_port_id);
+ return (true);
+}
+
+
+
+TrexStatelessCpToDpMsgBase * TrexStatelessDpResume::clone(){
+ TrexStatelessDpResume *new_msg = new TrexStatelessDpResume(m_port_id);
+ return new_msg;
+}
+
+bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){
+ dp_core->resume_traffic(m_port_id);
+ return (true);
+}
+
+
/**
* clone for DP stop message
*
*/
TrexStatelessCpToDpMsgBase *
TrexStatelessDpStop::clone() {
- TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id);
+
+ new_msg->set_event_id(m_event_id);
+ new_msg->set_wait_for_event_id(m_stop_only_for_event_id);
+ /* set back pointer to master */
+ new_msg->set_core_ptr(m_core);
return new_msg;
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 2fb5a024..6bd0dbe3 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -27,6 +27,7 @@ limitations under the License.
class TrexStatelessDpCore;
class TrexStreamsCompiledObj;
+class CFlowGenListPerThread;
/**
* defines the base class for CP to DP messages
@@ -61,6 +62,10 @@ public:
return ( m_quit_scheduler);
}
+ /* this node is called from scheduler in case the node is free */
+ virtual void on_node_remove(){
+ }
+
/* no copy constructor */
TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete;
@@ -94,6 +99,42 @@ private:
};
+class TrexStatelessDpPause : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpPause(uint8_t port_id) : m_port_id(port_id) {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+
+private:
+ uint8_t m_port_id;
+};
+
+
+class TrexStatelessDpResume : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpResume(uint8_t port_id) : m_port_id(port_id) {
+ }
+
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+
+private:
+ uint8_t m_port_id;
+};
+
+
/**
* a message to stop traffic
*
@@ -103,14 +144,50 @@ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
public:
TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
+ m_stop_only_for_event_id=false;
+ m_event_id=0;
+ m_core = NULL;
}
virtual TrexStatelessCpToDpMsgBase * clone();
+
virtual bool handle(TrexStatelessDpCore *dp_core);
+ void set_core_ptr(CFlowGenListPerThread * core){
+ m_core = core;
+ }
+
+ CFlowGenListPerThread * get_core_ptr(){
+ return ( m_core);
+ }
+
+
+ void set_event_id(int event_id){
+ m_event_id = event_id;
+ }
+
+ void set_wait_for_event_id(bool wait){
+ m_stop_only_for_event_id = wait;
+ }
+
+ virtual void on_node_remove();
+
+
+ bool get_is_stop_by_event_id(){
+ return (m_stop_only_for_event_id);
+ }
+
+ int get_event_id(){
+ return (m_event_id);
+ }
+
private:
uint8_t m_port_id;
+ bool m_stop_only_for_event_id;
+ int m_event_id;
+ CFlowGenListPerThread * m_core ;
+
};
/**