summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore2
-rw-r--r--scripts/exp/stl_bb_start_stop_delay1-0-ex.erfbin0 -> 440 bytes
-rw-r--r--scripts/exp/stl_bb_start_stop_delay2-0-ex.erfbin0 -> 1496 bytes
-rwxr-xr-xsrc/bp_sim.cpp5
-rwxr-xr-xsrc/bp_sim.h1
-rw-r--r--src/gtest/trex_stateless_gtest.cpp253
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp56
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h13
-rw-r--r--src/stateless/dp/trex_stream_node.h1
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp25
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h41
11 files changed, 368 insertions, 29 deletions
diff --git a/.gitignore b/.gitignore
index 39bea09a..839008be 100644
--- a/.gitignore
+++ b/.gitignore
@@ -28,6 +28,8 @@ scripts/exp/stl_simple_prog4-0.erf
scripts/exp/stl_bb_start_stop-0.erf
scripts/exp/stl_bb_start_stop2-0.erf
scripts/exp/stl_bb_start_stop3-0.erf
+scripts/exp/stl_bb_start_stop_delay1-0.erf
+scripts/exp/stl_bb_start_stop_delay2-0.erf
diff --git a/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf b/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf
new file mode 100644
index 00000000..08afdf4b
--- /dev/null
+++ b/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf
Binary files differ
diff --git a/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf b/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf
new file mode 100644
index 00000000..01a77466
--- /dev/null
+++ b/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf
Binary files differ
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index f8dd20a1..a61fbb8f 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -3186,6 +3186,7 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
uint32_t max_threads){
+ m_non_active_nodes = 0;
m_terminated_by_master=false;
m_flow_list =flow_list;
m_core_id= core_id;
@@ -3655,8 +3656,8 @@ CNodeGenerator::handle_slow_messages(uint8_t type,
thread->check_msgs(); /* check messages */
m_v_if->flush_tx_queue(); /* flush pkt each timeout */
- /* on always (clean queue path) and queue empty - exit */
- if ( always && (m_p_queue.empty()) ) {
+ /* exit in case this is the last node*/
+ if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
thread->free_node(node);
exit_scheduler = true;
} else {
diff --git a/src/bp_sim.h b/src/bp_sim.h
index fcca2428..0da7fb99 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -3541,6 +3541,7 @@ public:
CNodeGenerator m_node_gen;
public:
uint32_t m_cur_template;
+ uint32_t m_non_active_nodes; /* the number of non active nodes -> nodes that try to stop somthing */
uint64_t m_cur_flow_id;
double m_cur_time_sec;
double m_stop_time_sec;
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index 83721f0d..5bb7fca1 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -77,10 +77,32 @@ public:
void add_msg(TrexStatelessCpToDpMsgBase * msg){
m_msgs.push_back(msg);
}
+
void add_command(CBasicStlDelayCommand & command){
m_commands.push_back(command);
}
+ /* only if both port are idle we can exit */
+ void add_command(CFlowGenListPerThread * core,
+ TrexStatelessCpToDpMsgBase * msg,
+ double time){
+
+ CGenNodeCommand *node = (CGenNodeCommand *)core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ node->m_cmd = msg;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = time ;
+
+ CBasicStlDelayCommand command;
+ command.m_node =node;
+
+ add_command(command);
+ }
+
+
void clear(){
m_msgs.clear();
m_commands.clear();
@@ -94,6 +116,20 @@ protected:
};
+
+class CBasicStlSink {
+
+public:
+ CBasicStlSink(){
+ m_core=0;
+ }
+ virtual void call_after_init(CBasicStl * m_obj)=0;
+ virtual void call_after_run(CBasicStl * m_obj)=0;
+
+ CFlowGenListPerThread * m_core;
+};
+
+
/**
* handler for DP to CP messages
*
@@ -115,6 +151,7 @@ public:
m_dump_json=false;
m_dp_to_cp_handler = NULL;
m_msg = NULL;
+ m_sink = NULL;
}
@@ -161,6 +198,10 @@ public:
lpt=fl.m_threads_info[0];
+ if ( m_sink ){
+ m_sink->m_core =lpt;
+ }
+
char buf[100];
char buf_ex[100];
sprintf(buf,"%s-%d.erf",CGlobalInfo::m_options.out_file.c_str(),0);
@@ -178,6 +219,10 @@ public:
}
}
+ if (m_sink) {
+ m_sink->call_after_init(this);
+ }
+
/* add the commands */
if (m_msg_queue.m_commands.size()>0) {
for (auto cmd : m_msg_queue.m_commands) {
@@ -203,6 +248,10 @@ public:
printf(" %s \n",s.c_str());
}
+ if (m_sink) {
+ m_sink->call_after_run(this);
+ }
+
flush_dp_to_cp_messages();
m_msg_queue.clear();
@@ -216,6 +265,7 @@ public:
double m_time_diff;
bool m_dump_json;
DpToCpHandler *m_dp_to_cp_handler;
+ CBasicStlSink * m_sink;
TrexStatelessCpToDpMsgBase * m_msg;
CNodeRing *m_ring_from_cp;
@@ -329,6 +379,209 @@ TEST_F(basic_stl, load_pcap_file) {
+
+
+
+
+
+
+
+class CBBStartStopDelay2: public CBasicStlSink {
+public:
+
+ virtual void call_after_init(CBasicStl * m_obj);
+ virtual void call_after_run(CBasicStl * m_obj){
+ };
+ uint8_t m_port_id;
+};
+
+
+
+void CBBStartStopDelay2::call_after_init(CBasicStl * m_obj){
+
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id);
+
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+
+ /* start with different event id */
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(m_port_id, 1, comp_obj.clone(), 10.0 /*sec */ );
+
+
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStartCmd, 7.5);/* command in delay of 7 sec */
+
+ delete stream1 ;
+
+
+}
+
+
+
+/* start/stop/stop back to back */
+TEST_F(basic_stl, single_pkt_bb_start_stop_delay2) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop_delay2";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+
+
+ CBBStartStopDelay2 sink;
+ sink.m_port_id = port_id;
+ t1.m_sink = &sink;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+
+
+
+
+class CBBStartStopDelay1: public CBasicStlSink {
+public:
+
+ virtual void call_after_init(CBasicStl * m_obj);
+ virtual void call_after_run(CBasicStl * m_obj){
+ };
+ uint8_t m_port_id;
+};
+
+
+
+void CBBStartStopDelay1::call_after_init(CBasicStl * m_obj){
+
+ TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id);
+
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */
+ m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */
+}
+
+
+
+/* start/stop/stop back to back */
+TEST_F(basic_stl, single_pkt_bb_start_stop_delay1) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_bb_start_stop_delay1";
+
+ TrexStreamsCompiler compile;
+
+ uint8_t port_id=0;
+
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ stream1->set_pps(1.0);
+
+
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
+
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+ // stream - clean
+
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
+
+ assert(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ );
+
+ t1.m_msg_queue.add_msg(lpStartCmd);
+
+
+ CBBStartStopDelay1 sink;
+ sink.m_port_id = port_id;
+ t1.m_sink = &sink;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
/* start/stop/stop back to back */
TEST_F(basic_stl, single_pkt_bb_start_stop3) {
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 6430e520..f8afb3bb 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -78,9 +78,10 @@ void CGenNodeStateless::refresh(){
}
-
void CGenNodeCommand::free_command(){
+
assert(m_cmd);
+ m_cmd->on_node_remove();
delete m_cmd;
}
@@ -124,14 +125,23 @@ bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
}
-bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){
+bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id){
- /* there could be race of stop after stop */
if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) {
assert(m_active_streams==0);
return false;
}
+ /* there could be race of stop after stop */
+ if ( stop_on_id ) {
+ if (event_id != m_event_id){
+ /* we can't stop it is an old message */
+ return false;
+ }
+ }
+
for (auto dp_stream : m_active_nodes) {
CGenNodeStateless * node =dp_stream.m_node;
assert(node->get_port_id() == port_id);
@@ -215,7 +225,7 @@ bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
if ( to_stop_port ) {
/* call stop port explictly to move the state */
- stop_traffic(cur_node->m_port_id);
+ stop_traffic(cur_node->m_port_id,false,0);
}
return ( schedule );
@@ -330,7 +340,8 @@ TrexStatelessDpCore::add_global_duration(double duration){
/* add per port exit */
void
TrexStatelessDpCore::add_port_duration(double duration,
- uint8_t port_id){
+ uint8_t port_id,
+ int event_id){
if (duration > 0.0) {
CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
@@ -339,7 +350,16 @@ TrexStatelessDpCore::add_port_duration(double duration,
/* make sure it will be scheduled after the current node */
node->m_time = m_core->m_cur_time_sec + duration ;
- node->m_cmd = new TrexStatelessDpStop(port_id);
+ TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id);
+
+
+ /* test this */
+ m_core->m_non_active_nodes++;
+ cmd->set_core_ptr(m_core);
+ cmd->set_event_id(event_id);
+ cmd->set_wait_for_event_id(true);
+
+ node->m_cmd = cmd;
m_core->m_node_gen.add_node((CGenNode *)node);
}
@@ -450,15 +470,14 @@ TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
void
TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
- double duration) {
+ double duration,
+ int event_id) {
-#if 0
- /* TBD to remove ! */
- obj->Dump(stdout);
-#endif
TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
lp_port->m_active_streams = 0;
+ lp_port->set_event_id(event_id);
+
/* no nodes in the list */
assert(lp_port->m_active_nodes.size()==0);
@@ -491,7 +510,7 @@ TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
if ( duration > 0.0 ){
- add_port_duration( duration ,obj->get_port_id() );
+ add_port_duration( duration ,obj->get_port_id(),event_id );
}
}
@@ -511,23 +530,26 @@ bool TrexStatelessDpCore::are_all_ports_idle(){
void
-TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
+TrexStatelessDpCore::stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id) {
/* we cannot remove nodes not from the top of the queue so
for every active node - make sure next time
the scheduler invokes it, it will be free */
TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
- if ( lp_port->stop_traffic(port_id) == false){
+ if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){
/* nothing to do ! already stopped */
+ //printf(" skip .. %f\n",m_core->m_cur_time_sec);
return;
}
-
+#if 0
if ( are_all_ports_idle() ) {
-
- schedule_exit();
+ /* just a place holder if we will need to do somthing in that case */
}
+#endif
/* inform the control plane we stopped - this might be a async stop
(streams ended)
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 326bbe30..187c40d8 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -62,7 +62,9 @@ public:
void create(CFlowGenListPerThread * core);
- bool stop_traffic(uint8_t port_id);
+ bool stop_traffic(uint8_t port_id,
+ bool stop_on_id,
+ int event_id);
bool update_number_of_active_streams(uint32_t d);
@@ -137,13 +139,15 @@ public:
* @param pkt
* @param pkt_len
*/
- void start_traffic(TrexStreamsCompiledObj *obj, double duration = -1);
+ void start_traffic(TrexStreamsCompiledObj *obj,
+ double duration,
+ int m_event_id);
/**
* stop all traffic for this core
*
*/
- void stop_traffic(uint8_t port_id);
+ void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id);
/* return if all ports are idel */
@@ -225,7 +229,8 @@ private:
void add_port_duration(double duration,
- uint8_t port_id);
+ uint8_t port_id,
+ int event_id);
void add_global_duration(double duration);
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index 1e53887b..20e32b78 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -29,6 +29,7 @@ class TrexStatelessDpCore;
#include <trex_stream.h>
class TrexStatelessCpToDpMsgBase;
+class CFlowGenListPerThread;
struct CGenNodeCommand : public CGenNodeBase {
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index c861d0fa..bbd4b68c 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -23,6 +23,7 @@ limitations under the License.
#include <trex_stateless_dp_core.h>
#include <trex_streams_compiler.h>
#include <trex_stateless.h>
+#include <bp_sim.h>
#include <string.h>
@@ -60,11 +61,8 @@ TrexStatelessDpStart::~TrexStatelessDpStart() {
bool
TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
- /* mark the event id for DP response */
- dp_core->get_port_db(m_port_id)->set_event_id(m_event_id);
-
/* staet traffic */
- dp_core->start_traffic(m_obj, m_duration);
+ dp_core->start_traffic(m_obj, m_duration,m_event_id);
return true;
}
@@ -75,17 +73,32 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
bool
TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
- dp_core->stop_traffic(m_port_id);
+
+ dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id);
return true;
}
+
+void TrexStatelessDpStop::on_node_remove(){
+ if ( m_core ) {
+ assert(m_core->m_non_active_nodes>0);
+ m_core->m_non_active_nodes--;
+ }
+}
+
+
/**
* clone for DP stop message
*
*/
TrexStatelessCpToDpMsgBase *
TrexStatelessDpStop::clone() {
- TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStop(m_port_id);
+ TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id);
+
+ new_msg->set_event_id(m_event_id);
+ new_msg->set_wait_for_event_id(m_stop_only_for_event_id);
+ /* set back pointer to master */
+ new_msg->set_core_ptr(m_core);
return new_msg;
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 2fb5a024..afa5953a 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -27,6 +27,7 @@ limitations under the License.
class TrexStatelessDpCore;
class TrexStreamsCompiledObj;
+class CFlowGenListPerThread;
/**
* defines the base class for CP to DP messages
@@ -61,6 +62,10 @@ public:
return ( m_quit_scheduler);
}
+ /* this node is called from scheduler in case the node is free */
+ virtual void on_node_remove(){
+ }
+
/* no copy constructor */
TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete;
@@ -103,14 +108,50 @@ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
public:
TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) {
+ m_stop_only_for_event_id=false;
+ m_event_id=0;
+ m_core = NULL;
}
virtual TrexStatelessCpToDpMsgBase * clone();
+
virtual bool handle(TrexStatelessDpCore *dp_core);
+ void set_core_ptr(CFlowGenListPerThread * core){
+ m_core = core;
+ }
+
+ CFlowGenListPerThread * get_core_ptr(){
+ return ( m_core);
+ }
+
+
+ void set_event_id(int event_id){
+ m_event_id = event_id;
+ }
+
+ void set_wait_for_event_id(bool wait){
+ m_stop_only_for_event_id = wait;
+ }
+
+ virtual void on_node_remove();
+
+
+ bool get_is_stop_by_event_id(){
+ return (m_stop_only_for_event_id);
+ }
+
+ int get_event_id(){
+ return (m_event_id);
+ }
+
private:
uint8_t m_port_id;
+ bool m_stop_only_for_event_id;
+ int m_event_id;
+ CFlowGenListPerThread * m_core ;
+
};
/**