From 3408c03067a85789b2128352fdc2343ab707ae32 Mon Sep 17 00:00:00 2001 From: Hanoh Haim Date: Sun, 22 Nov 2015 13:02:08 +0200 Subject: fix stop on duration per port --- .gitignore | 2 + scripts/exp/stl_bb_start_stop_delay1-0-ex.erf | Bin 0 -> 440 bytes scripts/exp/stl_bb_start_stop_delay2-0-ex.erf | Bin 0 -> 1496 bytes src/bp_sim.cpp | 5 +- src/bp_sim.h | 1 + src/gtest/trex_stateless_gtest.cpp | 253 +++++++++++++++++++++ src/stateless/dp/trex_stateless_dp_core.cpp | 56 +++-- src/stateless/dp/trex_stateless_dp_core.h | 13 +- src/stateless/dp/trex_stream_node.h | 1 + .../messaging/trex_stateless_messaging.cpp | 25 +- src/stateless/messaging/trex_stateless_messaging.h | 41 ++++ 11 files changed, 368 insertions(+), 29 deletions(-) create mode 100644 scripts/exp/stl_bb_start_stop_delay1-0-ex.erf create mode 100644 scripts/exp/stl_bb_start_stop_delay2-0-ex.erf diff --git a/.gitignore b/.gitignore index 39bea09a..839008be 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,8 @@ scripts/exp/stl_simple_prog4-0.erf scripts/exp/stl_bb_start_stop-0.erf scripts/exp/stl_bb_start_stop2-0.erf scripts/exp/stl_bb_start_stop3-0.erf +scripts/exp/stl_bb_start_stop_delay1-0.erf +scripts/exp/stl_bb_start_stop_delay2-0.erf diff --git a/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf b/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf new file mode 100644 index 00000000..08afdf4b Binary files /dev/null and b/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf differ diff --git a/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf b/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf new file mode 100644 index 00000000..01a77466 Binary files /dev/null and b/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf differ diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index f8dd20a1..a61fbb8f 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -3186,6 +3186,7 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, uint32_t max_threads){ + m_non_active_nodes = 0; m_terminated_by_master=false; m_flow_list =flow_list; m_core_id= core_id; @@ -3655,8 +3656,8 @@ CNodeGenerator::handle_slow_messages(uint8_t type, thread->check_msgs(); /* check messages */ m_v_if->flush_tx_queue(); /* flush pkt each timeout */ - /* on always (clean queue path) and queue empty - exit */ - if ( always && (m_p_queue.empty()) ) { + /* exit in case this is the last node*/ + if ( m_p_queue.size() == m_parent->m_non_active_nodes ) { thread->free_node(node); exit_scheduler = true; } else { diff --git a/src/bp_sim.h b/src/bp_sim.h index fcca2428..0da7fb99 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -3541,6 +3541,7 @@ public: CNodeGenerator m_node_gen; public: uint32_t m_cur_template; + uint32_t m_non_active_nodes; /* the number of non active nodes -> nodes that try to stop somthing */ uint64_t m_cur_flow_id; double m_cur_time_sec; double m_stop_time_sec; diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index 83721f0d..5bb7fca1 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -77,10 +77,32 @@ public: void add_msg(TrexStatelessCpToDpMsgBase * msg){ m_msgs.push_back(msg); } + void add_command(CBasicStlDelayCommand & command){ m_commands.push_back(command); } + /* only if both port are idle we can exit */ + void add_command(CFlowGenListPerThread * core, + TrexStatelessCpToDpMsgBase * msg, + double time){ + + CGenNodeCommand *node = (CGenNodeCommand *)core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + node->m_cmd = msg; + + /* make sure it will be scheduled after the current node */ + node->m_time = time ; + + CBasicStlDelayCommand command; + command.m_node =node; + + add_command(command); + } + + void clear(){ m_msgs.clear(); m_commands.clear(); @@ -94,6 +116,20 @@ protected: }; + +class CBasicStlSink { + +public: + CBasicStlSink(){ + m_core=0; + } + virtual void call_after_init(CBasicStl * m_obj)=0; + virtual void call_after_run(CBasicStl * m_obj)=0; + + CFlowGenListPerThread * m_core; +}; + + /** * handler for DP to CP messages * @@ -115,6 +151,7 @@ public: m_dump_json=false; m_dp_to_cp_handler = NULL; m_msg = NULL; + m_sink = NULL; } @@ -161,6 +198,10 @@ public: lpt=fl.m_threads_info[0]; + if ( m_sink ){ + m_sink->m_core =lpt; + } + char buf[100]; char buf_ex[100]; sprintf(buf,"%s-%d.erf",CGlobalInfo::m_options.out_file.c_str(),0); @@ -178,6 +219,10 @@ public: } } + if (m_sink) { + m_sink->call_after_init(this); + } + /* add the commands */ if (m_msg_queue.m_commands.size()>0) { for (auto cmd : m_msg_queue.m_commands) { @@ -203,6 +248,10 @@ public: printf(" %s \n",s.c_str()); } + if (m_sink) { + m_sink->call_after_run(this); + } + flush_dp_to_cp_messages(); m_msg_queue.clear(); @@ -216,6 +265,7 @@ public: double m_time_diff; bool m_dump_json; DpToCpHandler *m_dp_to_cp_handler; + CBasicStlSink * m_sink; TrexStatelessCpToDpMsgBase * m_msg; CNodeRing *m_ring_from_cp; @@ -329,6 +379,209 @@ TEST_F(basic_stl, load_pcap_file) { + + + + + + + +class CBBStartStopDelay2: public CBasicStlSink { +public: + + virtual void call_after_init(CBasicStl * m_obj); + virtual void call_after_run(CBasicStl * m_obj){ + }; + uint8_t m_port_id; +}; + + + +void CBBStartStopDelay2::call_after_init(CBasicStl * m_obj){ + + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id); + TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id); + + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000002); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + + /* start with different event id */ + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(m_port_id, 1, comp_obj.clone(), 10.0 /*sec */ ); + + + m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStartCmd, 7.5);/* command in delay of 7 sec */ + + delete stream1 ; + + +} + + + +/* start/stop/stop back to back */ +TEST_F(basic_stl, single_pkt_bb_start_stop_delay2) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop_delay2"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + t1.m_msg_queue.add_msg(lpStartCmd); + + + CBBStartStopDelay2 sink; + sink.m_port_id = port_id; + t1.m_sink = &sink; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + + + + + +class CBBStartStopDelay1: public CBasicStlSink { +public: + + virtual void call_after_init(CBasicStl * m_obj); + virtual void call_after_run(CBasicStl * m_obj){ + }; + uint8_t m_port_id; +}; + + + +void CBBStartStopDelay1::call_after_init(CBasicStl * m_obj){ + + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id); + TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id); + + m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */ +} + + + +/* start/stop/stop back to back */ +TEST_F(basic_stl, single_pkt_bb_start_stop_delay1) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop_delay1"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + t1.m_msg_queue.add_msg(lpStartCmd); + + + CBBStartStopDelay1 sink; + sink.m_port_id = port_id; + t1.m_sink = &sink; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + /* start/stop/stop back to back */ TEST_F(basic_stl, single_pkt_bb_start_stop3) { diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 6430e520..f8afb3bb 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -78,9 +78,10 @@ void CGenNodeStateless::refresh(){ } - void CGenNodeCommand::free_command(){ + assert(m_cmd); + m_cmd->on_node_remove(); delete m_cmd; } @@ -124,14 +125,23 @@ bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){ } -bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){ +bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id){ - /* there could be race of stop after stop */ if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) { assert(m_active_streams==0); return false; } + /* there could be race of stop after stop */ + if ( stop_on_id ) { + if (event_id != m_event_id){ + /* we can't stop it is an old message */ + return false; + } + } + for (auto dp_stream : m_active_nodes) { CGenNodeStateless * node =dp_stream.m_node; assert(node->get_port_id() == port_id); @@ -215,7 +225,7 @@ bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node, if ( to_stop_port ) { /* call stop port explictly to move the state */ - stop_traffic(cur_node->m_port_id); + stop_traffic(cur_node->m_port_id,false,0); } return ( schedule ); @@ -330,7 +340,8 @@ TrexStatelessDpCore::add_global_duration(double duration){ /* add per port exit */ void TrexStatelessDpCore::add_port_duration(double duration, - uint8_t port_id){ + uint8_t port_id, + int event_id){ if (duration > 0.0) { CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; @@ -339,7 +350,16 @@ TrexStatelessDpCore::add_port_duration(double duration, /* make sure it will be scheduled after the current node */ node->m_time = m_core->m_cur_time_sec + duration ; - node->m_cmd = new TrexStatelessDpStop(port_id); + TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id); + + + /* test this */ + m_core->m_non_active_nodes++; + cmd->set_core_ptr(m_core); + cmd->set_event_id(event_id); + cmd->set_wait_for_event_id(true); + + node->m_cmd = cmd; m_core->m_node_gen.add_node((CGenNode *)node); } @@ -450,15 +470,14 @@ TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port, void TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, - double duration) { + double duration, + int event_id) { -#if 0 - /* TBD to remove ! */ - obj->Dump(stdout); -#endif TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id()); lp_port->m_active_streams = 0; + lp_port->set_event_id(event_id); + /* no nodes in the list */ assert(lp_port->m_active_nodes.size()==0); @@ -491,7 +510,7 @@ TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, if ( duration > 0.0 ){ - add_port_duration( duration ,obj->get_port_id() ); + add_port_duration( duration ,obj->get_port_id(),event_id ); } } @@ -511,23 +530,26 @@ bool TrexStatelessDpCore::are_all_ports_idle(){ void -TrexStatelessDpCore::stop_traffic(uint8_t port_id) { +TrexStatelessDpCore::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id) { /* we cannot remove nodes not from the top of the queue so for every active node - make sure next time the scheduler invokes it, it will be free */ TrexStatelessDpPerPort * lp_port = get_port_db(port_id); - if ( lp_port->stop_traffic(port_id) == false){ + if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){ /* nothing to do ! already stopped */ + //printf(" skip .. %f\n",m_core->m_cur_time_sec); return; } - +#if 0 if ( are_all_ports_idle() ) { - - schedule_exit(); + /* just a place holder if we will need to do somthing in that case */ } +#endif /* inform the control plane we stopped - this might be a async stop (streams ended) diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 326bbe30..187c40d8 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -62,7 +62,9 @@ public: void create(CFlowGenListPerThread * core); - bool stop_traffic(uint8_t port_id); + bool stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id); bool update_number_of_active_streams(uint32_t d); @@ -137,13 +139,15 @@ public: * @param pkt * @param pkt_len */ - void start_traffic(TrexStreamsCompiledObj *obj, double duration = -1); + void start_traffic(TrexStreamsCompiledObj *obj, + double duration, + int m_event_id); /** * stop all traffic for this core * */ - void stop_traffic(uint8_t port_id); + void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id); /* return if all ports are idel */ @@ -225,7 +229,8 @@ private: void add_port_duration(double duration, - uint8_t port_id); + uint8_t port_id, + int event_id); void add_global_duration(double duration); diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index 1e53887b..20e32b78 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -29,6 +29,7 @@ class TrexStatelessDpCore; #include class TrexStatelessCpToDpMsgBase; +class CFlowGenListPerThread; struct CGenNodeCommand : public CGenNodeBase { diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index c861d0fa..bbd4b68c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -23,6 +23,7 @@ limitations under the License. #include #include #include +#include #include @@ -60,11 +61,8 @@ TrexStatelessDpStart::~TrexStatelessDpStart() { bool TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { - /* mark the event id for DP response */ - dp_core->get_port_db(m_port_id)->set_event_id(m_event_id); - /* staet traffic */ - dp_core->start_traffic(m_obj, m_duration); + dp_core->start_traffic(m_obj, m_duration,m_event_id); return true; } @@ -75,17 +73,32 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { bool TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { - dp_core->stop_traffic(m_port_id); + + dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id); return true; } + +void TrexStatelessDpStop::on_node_remove(){ + if ( m_core ) { + assert(m_core->m_non_active_nodes>0); + m_core->m_non_active_nodes--; + } +} + + /** * clone for DP stop message * */ TrexStatelessCpToDpMsgBase * TrexStatelessDpStop::clone() { - TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStop(m_port_id); + TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id); + + new_msg->set_event_id(m_event_id); + new_msg->set_wait_for_event_id(m_stop_only_for_event_id); + /* set back pointer to master */ + new_msg->set_core_ptr(m_core); return new_msg; } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 2fb5a024..afa5953a 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -27,6 +27,7 @@ limitations under the License. class TrexStatelessDpCore; class TrexStreamsCompiledObj; +class CFlowGenListPerThread; /** * defines the base class for CP to DP messages @@ -61,6 +62,10 @@ public: return ( m_quit_scheduler); } + /* this node is called from scheduler in case the node is free */ + virtual void on_node_remove(){ + } + /* no copy constructor */ TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete; @@ -103,14 +108,50 @@ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase { public: TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) { + m_stop_only_for_event_id=false; + m_event_id=0; + m_core = NULL; } virtual TrexStatelessCpToDpMsgBase * clone(); + virtual bool handle(TrexStatelessDpCore *dp_core); + void set_core_ptr(CFlowGenListPerThread * core){ + m_core = core; + } + + CFlowGenListPerThread * get_core_ptr(){ + return ( m_core); + } + + + void set_event_id(int event_id){ + m_event_id = event_id; + } + + void set_wait_for_event_id(bool wait){ + m_stop_only_for_event_id = wait; + } + + virtual void on_node_remove(); + + + bool get_is_stop_by_event_id(){ + return (m_stop_only_for_event_id); + } + + int get_event_id(){ + return (m_event_id); + } + private: uint8_t m_port_id; + bool m_stop_only_for_event_id; + int m_event_id; + CFlowGenListPerThread * m_core ; + }; /** -- cgit 1.2.3-korg