diff options
author | 2015-11-24 11:54:58 +0200 | |
---|---|---|
committer | 2015-11-24 11:54:58 +0200 | |
commit | ac6f8e7c7c4e957a31c19c1fe4a0997009c6e733 (patch) | |
tree | 721101231fb692fed6028b21628cb861f3c64019 | |
parent | 161a85c57c3d2a165e4fa94140d67db05714a7d3 (diff) | |
parent | 1e1c11059b7f7fcb5c160dffe2df832a8adf719d (diff) |
Merge branch 'master' of csi-sceasr-b45:/auto/proj-pcube-b/apps/PL-b/tools/repo//trex-core
20 files changed, 797 insertions, 43 deletions
@@ -28,6 +28,9 @@ scripts/exp/stl_simple_prog4-0.erf scripts/exp/stl_bb_start_stop-0.erf scripts/exp/stl_bb_start_stop2-0.erf scripts/exp/stl_bb_start_stop3-0.erf +scripts/exp/stl_bb_start_stop_delay1-0.erf +scripts/exp/stl_bb_start_stop_delay2-0.erf +scripts/exp/stl_basic_pause_resume0-0.erf diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 164cdb90..7bcbf2c7 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -331,6 +331,40 @@ class Port(object): return self.ok() + def pause (self): + + if (self.state != self.STATE_TX) : + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("pause_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_PAUSE + + return self.ok() + + def resume (self): + + if (self.state != self.STATE_PAUSE) : + return self.err("port is not in pause mode") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("resume_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_TX + + return self.ok() + ################# events handler ###################### def async_event_port_stopped (self): self.state = self.STATE_STREAMS @@ -615,6 +649,25 @@ class CTRexStatelessClient(object): return rc + def resume_traffic (self, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].resume()) + + return rc + + def pause_traffic (self, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].pause()) + + return rc def stop_traffic (self, port_id_list = None, force = False): @@ -707,6 +760,71 @@ class CTRexStatelessClient(object): return RC_OK() + # pause cmd + def cmd_pause (self, port_id_list): + + # find the relveant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on porvided ports" + print format_text(msg, 'bold') + return RC_ERR(msg) + + rc = self.pause_traffic(active_ports) + rc.annotate("Pausing traffic on port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc + + return RC_OK() + + def cmd_pause_line (self, line): + '''Pause active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "pause", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + return self.cmd_pause(opts.ports) + + + # resume cmd + def cmd_resume (self, port_id_list): + + # find the relveant ports + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + + if not active_ports: + msg = "No active traffic on porvided ports" + print format_text(msg, 'bold') + return RC_ERR(msg) + + rc = self.resume_traffic(active_ports) + rc.annotate("Resume traffic on port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc + + return RC_OK() + + + def cmd_resume_line (self, line): + '''Resume active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "resume", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + return self.cmd_resume(opts.ports) + + # start cmd def cmd_start (self, port_id_list, stream_list, mult, force, duration): diff --git a/scripts/automation/trex_control_plane/console/old_console.py b/scripts/automation/trex_control_plane/console/old_console.py index 93c7e3f4..9d61a3a6 100644 --- a/scripts/automation/trex_control_plane/console/old_console.py +++ b/scripts/automation/trex_control_plane/console/old_console.py @@ -636,6 +636,18 @@ class TRexConsole1(cmd.Cmd): res_ok = self.stop_traffic(port_list) return + def do_pause(self, line): + '''Pause active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser("stop", self.do_stop.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) + opts = parser.parse_args(line.split()) + if opts is None: + # avoid further processing in this command + return + port_list = self.extract_port_list(opts) + res_ok = self.stop_traffic(port_list) + return + def help_stop(self): self.do_stop("-h") diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 995965fd..2a6fd2eb 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -263,6 +263,16 @@ class TRexConsole(TRexGeneralCmd): '''stops port(s) transmitting traffic\n''' self.stateless_client.cmd_stop_line(line) + ############# pause + def do_pause(self, line): + '''pause port(s) transmitting traffic\n''' + self.stateless_client.cmd_pause_line(line) + + ############# resume + def do_resume(self, line): + '''resume port(s) transmitting traffic\n''' + self.stateless_client.cmd_resume_line(line) + def help_stop(self): diff --git a/scripts/exp/stl_basic_pause_resume0-0-ex.erf b/scripts/exp/stl_basic_pause_resume0-0-ex.erf Binary files differnew file mode 100644 index 00000000..7f833920 --- /dev/null +++ b/scripts/exp/stl_basic_pause_resume0-0-ex.erf diff --git a/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf b/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf Binary files differnew file mode 100644 index 00000000..08afdf4b --- /dev/null +++ b/scripts/exp/stl_bb_start_stop_delay1-0-ex.erf diff --git a/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf b/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf Binary files differnew file mode 100644 index 00000000..01a77466 --- /dev/null +++ b/scripts/exp/stl_bb_start_stop_delay2-0-ex.erf diff --git a/scripts/stl/burst_1000_pkt.yaml b/scripts/stl/burst_1000_pkt.yaml new file mode 100644 index 00000000..a2c3a908 --- /dev/null +++ b/scripts/stl/burst_1000_pkt.yaml @@ -0,0 +1,36 @@ +### Single stream UDP packet, 64B ### +##################################### +- name: stream0 + stream: + self_start: True + next_stream_id: stream1 + packet: + binary: cap2/udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 10000 + rx_stats: [] + +- name: stream1 + stream: + self_start: False + next_stream_id: stream2 + packet: + binary: cap2/udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 10000 + rx_stats: [] + +- name: stream2 + stream: + self_start: False + packet: + binary: cap2/udp_64B.pcap + mode: + type: single_burst + pps: 100 + total_pkts : 10000 + rx_stats: []
\ No newline at end of file diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index f8dd20a1..a61fbb8f 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -3186,6 +3186,7 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, uint32_t max_threads){ + m_non_active_nodes = 0; m_terminated_by_master=false; m_flow_list =flow_list; m_core_id= core_id; @@ -3655,8 +3656,8 @@ CNodeGenerator::handle_slow_messages(uint8_t type, thread->check_msgs(); /* check messages */ m_v_if->flush_tx_queue(); /* flush pkt each timeout */ - /* on always (clean queue path) and queue empty - exit */ - if ( always && (m_p_queue.empty()) ) { + /* exit in case this is the last node*/ + if ( m_p_queue.size() == m_parent->m_non_active_nodes ) { thread->free_node(node); exit_scheduler = true; } else { diff --git a/src/bp_sim.h b/src/bp_sim.h index fcca2428..0da7fb99 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -3541,6 +3541,7 @@ public: CNodeGenerator m_node_gen; public: uint32_t m_cur_template; + uint32_t m_non_active_nodes; /* the number of non active nodes -> nodes that try to stop somthing */ uint64_t m_cur_flow_id; double m_cur_time_sec; double m_stop_time_sec; diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index 72a99e93..5b298023 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -77,10 +77,32 @@ public: void add_msg(TrexStatelessCpToDpMsgBase * msg){ m_msgs.push_back(msg); } + void add_command(CBasicStlDelayCommand & command){ m_commands.push_back(command); } + /* only if both port are idle we can exit */ + void add_command(CFlowGenListPerThread * core, + TrexStatelessCpToDpMsgBase * msg, + double time){ + + CGenNodeCommand *node = (CGenNodeCommand *)core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + node->m_cmd = msg; + + /* make sure it will be scheduled after the current node */ + node->m_time = time ; + + CBasicStlDelayCommand command; + command.m_node =node; + + add_command(command); + } + + void clear(){ m_msgs.clear(); m_commands.clear(); @@ -94,6 +116,20 @@ protected: }; + +class CBasicStlSink { + +public: + CBasicStlSink(){ + m_core=0; + } + virtual void call_after_init(CBasicStl * m_obj)=0; + virtual void call_after_run(CBasicStl * m_obj)=0; + + CFlowGenListPerThread * m_core; +}; + + /** * handler for DP to CP messages * @@ -115,6 +151,7 @@ public: m_dump_json=false; m_dp_to_cp_handler = NULL; m_msg = NULL; + m_sink = NULL; } @@ -161,6 +198,10 @@ public: lpt=fl.m_threads_info[0]; + if ( m_sink ){ + m_sink->m_core =lpt; + } + char buf[100]; char buf_ex[100]; sprintf(buf,"%s-%d.erf",CGlobalInfo::m_options.out_file.c_str(),0); @@ -178,6 +219,10 @@ public: } } + if (m_sink) { + m_sink->call_after_init(this); + } + /* add the commands */ if (m_msg_queue.m_commands.size()>0) { for (auto cmd : m_msg_queue.m_commands) { @@ -203,6 +248,10 @@ public: printf(" %s \n",s.c_str()); } + if (m_sink) { + m_sink->call_after_run(this); + } + flush_dp_to_cp_messages(); m_msg_queue.clear(); @@ -216,6 +265,7 @@ public: double m_time_diff; bool m_dump_json; DpToCpHandler *m_dp_to_cp_handler; + CBasicStlSink * m_sink; TrexStatelessCpToDpMsgBase * m_msg; CNodeRing *m_ring_from_cp; @@ -329,6 +379,282 @@ TEST_F(basic_stl, load_pcap_file) { +class CBBStartPause0: public CBasicStlSink { +public: + + virtual void call_after_init(CBasicStl * m_obj); + virtual void call_after_run(CBasicStl * m_obj){ + }; + uint8_t m_port_id; +}; + + + +void CBBStartPause0::call_after_init(CBasicStl * m_obj){ + + TrexStatelessDpPause * lpPauseCmd = new TrexStatelessDpPause(m_port_id); + TrexStatelessDpResume * lpResumeCmd1 = new TrexStatelessDpResume(m_port_id); + + m_obj->m_msg_queue.add_command(m_core,lpPauseCmd, 5.0); /* command in delay of 5 sec */ + m_obj->m_msg_queue.add_command(m_core,lpResumeCmd1, 7.0);/* command in delay of 7 sec */ + +} + + + +/* start/stop/stop back to back */ +TEST_F(basic_stl, basic_pause_resume0) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_basic_pause_resume0"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + t1.m_msg_queue.add_msg(lpStartCmd); + + + CBBStartPause0 sink; + sink.m_port_id = port_id; + t1.m_sink = &sink; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + +////////////////////////////////////////////////////////////// + + +class CBBStartStopDelay2: public CBasicStlSink { +public: + + virtual void call_after_init(CBasicStl * m_obj); + virtual void call_after_run(CBasicStl * m_obj){ + }; + uint8_t m_port_id; +}; + + + +void CBBStartStopDelay2::call_after_init(CBasicStl * m_obj){ + + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id); + TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id); + + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000002); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + + /* start with different event id */ + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(m_port_id, 1, comp_obj.clone(), 10.0 /*sec */ ); + + + m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStartCmd, 7.5);/* command in delay of 7 sec */ + + delete stream1 ; + + +} + + + +/* start/stop/stop back to back */ +TEST_F(basic_stl, single_pkt_bb_start_stop_delay2) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop_delay2"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + t1.m_msg_queue.add_msg(lpStartCmd); + + + CBBStartStopDelay2 sink; + sink.m_port_id = port_id; + t1.m_sink = &sink; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + + + + + +class CBBStartStopDelay1: public CBasicStlSink { +public: + + virtual void call_after_init(CBasicStl * m_obj); + virtual void call_after_run(CBasicStl * m_obj){ + }; + uint8_t m_port_id; +}; + + + +void CBBStartStopDelay1::call_after_init(CBasicStl * m_obj){ + + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id); + TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id); + + m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */ +} + + + +/* start/stop/stop back to back */ +TEST_F(basic_stl, single_pkt_bb_start_stop_delay1) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop_delay1"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + t1.m_msg_queue.add_msg(lpStartCmd); + + + CBBStartStopDelay1 sink; + sink.m_port_id = port_id; + t1.m_sink = &sink; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + /* start/stop/stop back to back */ TEST_F(basic_stl, single_pkt_bb_start_stop3) { diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 7dc217a3..be1bea12 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -130,6 +130,9 @@ TrexStatelessPort::start_traffic(double mul, double duration) { TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration); + m_last_all_streams_continues = compiled_obj->get_all_streams_continues(); + m_last_duration =duration; + change_state(PORT_STATE_TX); send_message_to_dp(start_msg); @@ -174,7 +177,8 @@ TrexStatelessPort::start_traffic_max_pps(double max_pps, double duration) { void TrexStatelessPort::stop_traffic(void) { - if (m_port_state != PORT_STATE_TX) { + if (!( (m_port_state == PORT_STATE_TX) + || (m_port_state ==PORT_STATE_PAUSE) )) { return; } @@ -195,14 +199,18 @@ TrexStatelessPort::pause_traffic(void) { verify_state(PORT_STATE_TX); - #if 0 - /* generate a message to all the relevant DP cores to start transmitting */ - TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); + if (m_last_all_streams_continues == false) { + throw TrexRpcException(" pause is supported when all streams are in continues mode "); + } + + if ( m_last_duration>0.0 ) { + throw TrexRpcException(" pause is supported when duration is not enable is start command "); + } + + TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id); send_message_to_dp(stop_msg); - m_port_state = PORT_STATE_UP_IDLE; - #endif change_state(PORT_STATE_PAUSE); } @@ -211,14 +219,11 @@ TrexStatelessPort::resume_traffic(void) { verify_state(PORT_STATE_PAUSE); - #if 0 /* generate a message to all the relevant DP cores to start transmitting */ - TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); + TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id); send_message_to_dp(stop_msg); - m_port_state = PORT_STATE_UP_IDLE; - #endif change_state(PORT_STATE_TX); } diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 9b74741c..6adb5fef 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -209,6 +209,8 @@ public: return m_dp_events; } + + private: @@ -262,6 +264,9 @@ private: /* holds the DP cores associated with this port */ std::vector<int> m_cores_id_list; + bool m_last_all_streams_continues; + double m_last_duration; + TrexDpPortEvents m_dp_events; }; diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index f97b15b9..b28989be 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -143,6 +143,7 @@ private: * stream compiled object *************************************/ TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id, double mul) : m_port_id(port_id), m_mul(mul) { + m_all_continues=false; } TrexStreamsCompiledObj::~TrexStreamsCompiledObj() { @@ -408,6 +409,7 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, } + bool all_continues=true; /* for now we do something trivial, */ for (auto stream : streams) { @@ -415,6 +417,9 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, if (!stream->m_enabled) { continue; } + if (stream->get_type() != TrexStream::stCONTINUOUS ) { + all_continues=false; + } int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id; assert(new_id>=0); @@ -431,7 +436,7 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, ); } - + obj.m_all_continues =all_continues; return true; } diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h index 9f0c1f8e..70a31c5e 100644 --- a/src/stateless/cp/trex_streams_compiler.h +++ b/src/stateless/cp/trex_streams_compiler.h @@ -66,6 +66,10 @@ public: return (m_mul); } + bool get_all_streams_continues(){ + return (m_all_continues); + } + void Dump(FILE *fd); private: @@ -75,6 +79,7 @@ private: std::vector<obj_st> m_objs; + bool m_all_continues; uint8_t m_port_id; double m_mul; }; diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 6430e520..9b4a6ad9 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -78,9 +78,10 @@ void CGenNodeStateless::refresh(){ } - void CGenNodeCommand::free_command(){ + assert(m_cmd); + m_cmd->on_node_remove(); delete m_cmd; } @@ -123,15 +124,56 @@ bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){ return (false); } +bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){ + + /* we are working with continues streams so we must be in transmit mode */ + assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + assert(node->is_pause() == true); + node->set_pause(false); + } + m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + return (true); +} + + +bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){ + + /* we are working with continues streams so we must be in transmit mode */ + assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + assert(node->is_pause() == false); + node->set_pause(true); + } + m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE; + return (true); +} + + +bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id){ -bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){ - /* there could be race of stop after stop */ if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) { assert(m_active_streams==0); return false; } + /* there could be race of stop after stop */ + if ( stop_on_id ) { + if (event_id != m_event_id){ + /* we can't stop it is an old message */ + return false; + } + } + for (auto dp_stream : m_active_nodes) { CGenNodeStateless * node =dp_stream.m_node; assert(node->get_port_id() == port_id); @@ -215,7 +257,7 @@ bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node, if ( to_stop_port ) { /* call stop port explictly to move the state */ - stop_traffic(cur_node->m_port_id); + stop_traffic(cur_node->m_port_id,false,0); } return ( schedule ); @@ -284,6 +326,8 @@ TrexStatelessDpCore::run_once(){ } + + void TrexStatelessDpCore::start() { @@ -330,7 +374,8 @@ TrexStatelessDpCore::add_global_duration(double duration){ /* add per port exit */ void TrexStatelessDpCore::add_port_duration(double duration, - uint8_t port_id){ + uint8_t port_id, + int event_id){ if (duration > 0.0) { CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; @@ -339,7 +384,16 @@ TrexStatelessDpCore::add_port_duration(double duration, /* make sure it will be scheduled after the current node */ node->m_time = m_core->m_cur_time_sec + duration ; - node->m_cmd = new TrexStatelessDpStop(port_id); + TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id); + + + /* test this */ + m_core->m_non_active_nodes++; + cmd->set_core_ptr(m_core); + cmd->set_event_id(event_id); + cmd->set_wait_for_event_id(true); + + node->m_cmd = cmd; m_core->m_node_gen.add_node((CGenNode *)node); } @@ -382,6 +436,7 @@ TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port, uint16_t pkt_size = stream->m_pkt.len; const uint8_t *stream_pkt = stream->m_pkt.binary; + node->m_pause =0; node->m_stream_type = stream->m_type; node->m_next_time_offset = 1.0 / (stream->get_pps() * comp->get_multiplier()) ; @@ -450,15 +505,14 @@ TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port, void TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, - double duration) { + double duration, + int event_id) { -#if 0 - /* TBD to remove ! */ - obj->Dump(stdout); -#endif TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id()); lp_port->m_active_streams = 0; + lp_port->set_event_id(event_id); + /* no nodes in the list */ assert(lp_port->m_active_nodes.size()==0); @@ -491,7 +545,7 @@ TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, if ( duration > 0.0 ){ - add_port_duration( duration ,obj->get_port_id() ); + add_port_duration( duration ,obj->get_port_id(),event_id ); } } @@ -510,24 +564,45 @@ bool TrexStatelessDpCore::are_all_ports_idle(){ } +void +TrexStatelessDpCore::resume_traffic(uint8_t port_id){ + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + lp_port->resume_traffic(port_id); +} + + +void +TrexStatelessDpCore::pause_traffic(uint8_t port_id){ + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + lp_port->pause_traffic(port_id); +} + + void -TrexStatelessDpCore::stop_traffic(uint8_t port_id) { +TrexStatelessDpCore::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id) { /* we cannot remove nodes not from the top of the queue so for every active node - make sure next time the scheduler invokes it, it will be free */ TrexStatelessDpPerPort * lp_port = get_port_db(port_id); - if ( lp_port->stop_traffic(port_id) == false){ + if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){ /* nothing to do ! already stopped */ + //printf(" skip .. %f\n",m_core->m_cur_time_sec); return; } - +#if 0 if ( are_all_ports_idle() ) { - - schedule_exit(); + /* just a place holder if we will need to do somthing in that case */ } +#endif /* inform the control plane we stopped - this might be a async stop (streams ended) diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 326bbe30..eda1ae59 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -53,7 +53,9 @@ public: /* states */ enum state_e { ppSTATE_IDLE, - ppSTATE_TRANSMITTING + ppSTATE_TRANSMITTING, + ppSTATE_PAUSE + }; public: @@ -62,7 +64,13 @@ public: void create(CFlowGenListPerThread * core); - bool stop_traffic(uint8_t port_id); + bool pause_traffic(uint8_t port_id); + + bool resume_traffic(uint8_t port_id); + + bool stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id); bool update_number_of_active_streams(uint32_t d); @@ -137,13 +145,25 @@ public: * @param pkt * @param pkt_len */ - void start_traffic(TrexStreamsCompiledObj *obj, double duration = -1); + void start_traffic(TrexStreamsCompiledObj *obj, + double duration, + int m_event_id); + + + /* pause the streams, work only if all are continues */ + void pause_traffic(uint8_t port_id); + + + + void resume_traffic(uint8_t port_id); + /** + * * stop all traffic for this core * */ - void stop_traffic(uint8_t port_id); + void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id); /* return if all ports are idel */ @@ -225,7 +245,8 @@ private: void add_port_duration(double duration, - uint8_t port_id); + uint8_t port_id, + int event_id); void add_global_duration(double duration); diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index 1e53887b..ccf99eaa 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -29,6 +29,7 @@ class TrexStatelessDpCore; #include <trex_stream.h> class TrexStatelessCpToDpMsgBase; +class CFlowGenListPerThread; struct CGenNodeCommand : public CGenNodeBase { @@ -47,6 +48,7 @@ public: static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" ); + /* this is a event for stateless */ struct CGenNodeStateless : public CGenNodeBase { friend class TrexStatelessDpCore; @@ -73,7 +75,7 @@ private: stream_state_t m_state; uint8_t m_port_id; uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */ - uint8_t m_pad; + uint8_t m_pause; uint32_t m_single_burst; /* the number of bursts in case of burst */ uint32_t m_single_burst_refill; @@ -111,7 +113,18 @@ public: /* only to be safe */ m_ref_stream_info= NULL; m_next_stream= NULL; + } + bool is_pause(){ + return (m_pause==1?true:false); + } + + void set_pause(bool enable){ + if ( enable ){ + m_pause=1; + }else{ + m_pause=0; + } } inline uint8_t get_stream_type(){ @@ -142,7 +155,10 @@ public: void refresh(); inline void handle_continues(CFlowGenListPerThread *thread) { - thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + + if (unlikely (is_pause()==false)) { + thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + } /* in case of continues */ m_time += m_next_time_offset; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index c861d0fa..ec8b7839 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -23,6 +23,7 @@ limitations under the License. #include <trex_stateless_dp_core.h> #include <trex_streams_compiler.h> #include <trex_stateless.h> +#include <bp_sim.h> #include <string.h> @@ -60,11 +61,8 @@ TrexStatelessDpStart::~TrexStatelessDpStart() { bool TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { - /* mark the event id for DP response */ - dp_core->get_port_db(m_port_id)->set_event_id(m_event_id); - /* staet traffic */ - dp_core->start_traffic(m_obj, m_duration); + dp_core->start_traffic(m_obj, m_duration,m_event_id); return true; } @@ -75,17 +73,57 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { bool TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { - dp_core->stop_traffic(m_port_id); + + dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id); return true; } + +void TrexStatelessDpStop::on_node_remove(){ + if ( m_core ) { + assert(m_core->m_non_active_nodes>0); + m_core->m_non_active_nodes--; + } +} + + +TrexStatelessCpToDpMsgBase * TrexStatelessDpPause::clone(){ + + TrexStatelessDpPause *new_msg = new TrexStatelessDpPause(m_port_id); + return new_msg; +} + + +bool TrexStatelessDpPause::handle(TrexStatelessDpCore *dp_core){ + dp_core->pause_traffic(m_port_id); + return (true); +} + + + +TrexStatelessCpToDpMsgBase * TrexStatelessDpResume::clone(){ + TrexStatelessDpResume *new_msg = new TrexStatelessDpResume(m_port_id); + return new_msg; +} + +bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){ + dp_core->resume_traffic(m_port_id); + return (true); +} + + /** * clone for DP stop message * */ TrexStatelessCpToDpMsgBase * TrexStatelessDpStop::clone() { - TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStop(m_port_id); + TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id); + + new_msg->set_event_id(m_event_id); + new_msg->set_wait_for_event_id(m_stop_only_for_event_id); + /* set back pointer to master */ + new_msg->set_core_ptr(m_core); return new_msg; } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 2fb5a024..6bd0dbe3 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -27,6 +27,7 @@ limitations under the License. class TrexStatelessDpCore; class TrexStreamsCompiledObj; +class CFlowGenListPerThread; /** * defines the base class for CP to DP messages @@ -61,6 +62,10 @@ public: return ( m_quit_scheduler); } + /* this node is called from scheduler in case the node is free */ + virtual void on_node_remove(){ + } + /* no copy constructor */ TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete; @@ -94,6 +99,42 @@ private: }; +class TrexStatelessDpPause : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpPause(uint8_t port_id) : m_port_id(port_id) { + } + + + virtual TrexStatelessCpToDpMsgBase * clone(); + + + virtual bool handle(TrexStatelessDpCore *dp_core); + + +private: + uint8_t m_port_id; +}; + + +class TrexStatelessDpResume : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpResume(uint8_t port_id) : m_port_id(port_id) { + } + + + virtual TrexStatelessCpToDpMsgBase * clone(); + + + virtual bool handle(TrexStatelessDpCore *dp_core); + + +private: + uint8_t m_port_id; +}; + + /** * a message to stop traffic * @@ -103,14 +144,50 @@ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase { public: TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) { + m_stop_only_for_event_id=false; + m_event_id=0; + m_core = NULL; } virtual TrexStatelessCpToDpMsgBase * clone(); + virtual bool handle(TrexStatelessDpCore *dp_core); + void set_core_ptr(CFlowGenListPerThread * core){ + m_core = core; + } + + CFlowGenListPerThread * get_core_ptr(){ + return ( m_core); + } + + + void set_event_id(int event_id){ + m_event_id = event_id; + } + + void set_wait_for_event_id(bool wait){ + m_stop_only_for_event_id = wait; + } + + virtual void on_node_remove(); + + + bool get_is_stop_by_event_id(){ + return (m_stop_only_for_event_id); + } + + int get_event_id(){ + return (m_event_id); + } + private: uint8_t m_port_id; + bool m_stop_only_for_event_id; + int m_event_id; + CFlowGenListPerThread * m_core ; + }; /** |