summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore4
-rw-r--r--scripts/exp/stl_simple_prog1-0-ex.erfbin0 -> 880 bytes
-rw-r--r--scripts/exp/stl_simple_prog2-0-ex.erfbin0 -> 792 bytes
-rw-r--r--scripts/exp/stl_simple_prog3-0-ex.erfbin0 -> 4312 bytes
-rw-r--r--scripts/stl/burst_1pkt.yaml36
-rw-r--r--scripts/stl/burst_1pkt_1burst.yaml13
-rwxr-xr-xsrc/bp_sim.cpp60
-rwxr-xr-xsrc/bp_sim.h23
-rw-r--r--src/gtest/trex_stateless_gtest.cpp454
-rwxr-xr-xsrc/main.cpp1
-rwxr-xr-xsrc/main_dpdk.cpp98
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_stream.cpp6
-rw-r--r--src/stateless/cp/trex_stream.cpp65
-rw-r--r--src/stateless/cp/trex_stream.h29
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp353
-rw-r--r--src/stateless/cp/trex_streams_compiler.h36
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp341
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h95
-rw-r--r--src/stateless/dp/trex_stream_node.h101
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp39
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h47
21 files changed, 1680 insertions, 121 deletions
diff --git a/.gitignore b/.gitignore
index e35015d6..183484fb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,6 +21,10 @@ scripts/exp/stl_multi_pkt2-0.erf
scripts/exp/stl_single_pkt_burst1-0.erf
scripts/exp/stl_single_sctp_pkt-0.erf
scripts/exp/stl_single_stream-0.erf
+scripts/exp/stl_simple_prog1-0.erf
+scripts/exp/stl_simple_prog2-0.erf
+scripts/exp/stl_simple_prog3-0.erf
+
*.pyc
diff --git a/scripts/exp/stl_simple_prog1-0-ex.erf b/scripts/exp/stl_simple_prog1-0-ex.erf
new file mode 100644
index 00000000..374256c2
--- /dev/null
+++ b/scripts/exp/stl_simple_prog1-0-ex.erf
Binary files differ
diff --git a/scripts/exp/stl_simple_prog2-0-ex.erf b/scripts/exp/stl_simple_prog2-0-ex.erf
new file mode 100644
index 00000000..3ec46e2f
--- /dev/null
+++ b/scripts/exp/stl_simple_prog2-0-ex.erf
Binary files differ
diff --git a/scripts/exp/stl_simple_prog3-0-ex.erf b/scripts/exp/stl_simple_prog3-0-ex.erf
new file mode 100644
index 00000000..4da3438c
--- /dev/null
+++ b/scripts/exp/stl_simple_prog3-0-ex.erf
Binary files differ
diff --git a/scripts/stl/burst_1pkt.yaml b/scripts/stl/burst_1pkt.yaml
new file mode 100644
index 00000000..9ddc077a
--- /dev/null
+++ b/scripts/stl/burst_1pkt.yaml
@@ -0,0 +1,36 @@
+### Single stream UDP packet, 64B ###
+#####################################
+- name: stream0
+ stream:
+ self_start: True
+ next_stream_id: stream1
+ packet:
+ binary: cap2/udp_64B.pcap
+ mode:
+ type: single_burst
+ pps: 100
+ total_pkts : 100
+ rx_stats: []
+
+- name: stream1
+ stream:
+ self_start: False
+ next_stream_id: stream2
+ packet:
+ binary: cap2/udp_64B.pcap
+ mode:
+ type: single_burst
+ pps: 100
+ total_pkts : 200
+ rx_stats: []
+
+- name: stream2
+ stream:
+ self_start: False
+ packet:
+ binary: cap2/udp_64B.pcap
+ mode:
+ type: single_burst
+ pps: 100
+ total_pkts : 300
+ rx_stats: [] \ No newline at end of file
diff --git a/scripts/stl/burst_1pkt_1burst.yaml b/scripts/stl/burst_1pkt_1burst.yaml
new file mode 100644
index 00000000..3bf6a064
--- /dev/null
+++ b/scripts/stl/burst_1pkt_1burst.yaml
@@ -0,0 +1,13 @@
+### Single stream UDP packet, 64B ###
+#####################################
+- name: stream0
+ stream:
+ self_start: True
+ packet:
+ binary: cap2/udp_64B.pcap
+ mode:
+ type: single_burst
+ pps: 100
+ total_pkts : 100
+ rx_stats: []
+
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 39d46d16..d35ae68a 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -26,6 +26,7 @@ limitations under the License.
#include <common/basic_utils.h>
#include <trex_stream_node.h>
+#include <trex_stateless_messaging.h>
#undef VALG
@@ -218,7 +219,7 @@ bool CPlatformSocketInfoConfig::init(){
m_max_threads_per_dual_if = num_threads;
}else{
if (lp->m_threads.size() != num_threads) {
- printf("ERROR number of threads per dual ports should be the same for all dual ports\n");
+ printf("ERROR, the number of threads per dual ports should be the same for all dual ports\n");
exit(1);
}
}
@@ -230,7 +231,7 @@ bool CPlatformSocketInfoConfig::init(){
uint8_t phy_thread = lp->m_threads[j];
if (phy_thread>MAX_THREADS_SUPPORTED) {
- printf("ERROR physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED);
+ printf("ERROR, physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED);
exit(1);
}
@@ -478,8 +479,8 @@ void CPlatformSocketInfo::dump(FILE *fd){
void CRteMemPool::dump_in_case_of_error(FILE *fd){
- fprintf(fd," ERROR ERROR there is no enough memory in socket %d \n",m_pool_id);
- fprintf(fd," Try to enlarge the memory values in the configuration file /etc/trex_cfg.yaml \n");
+ fprintf(fd," ERROR,there is no enough memory in socket %d \n",m_pool_id);
+ fprintf(fd," Try to enlarge the memory values in the configuration file -/etc/trex_cfg.yaml ,see manual for more detail \n");
dump(fd);
}
@@ -3129,6 +3130,13 @@ void CNodeGenerator::remove_all(CFlowGenListPerThread * thread){
while (!m_p_queue.empty()) {
node = m_p_queue.top();
m_p_queue.pop();
+ /* sanity check */
+ if (node->m_type == CGenNode::STATELESS_PKT) {
+ CGenNodeStateless * p=(CGenNodeStateless *)node;
+ /* need to be changed in Pause support */
+ assert(p->is_mask_for_free());
+ }
+
thread->free_node( node);
}
}
@@ -3144,6 +3152,7 @@ int CNodeGenerator::open_file(std::string file_name,
return (0);
}
+
int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
remove_all(thread);
BP_ASSERT(m_v_if);
@@ -3153,7 +3162,7 @@ int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){
if ( m_preview_mode.getVMode() >2 ){
- fprintf(stdout," %llu ,", (unsigned long long)m_cnt);
+ fprintf(stdout," %4lu ,", (ulong)m_cnt);
node_sl->Dump(stdout);
m_cnt++;
}
@@ -3177,6 +3186,7 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
uint32_t max_threads){
+ m_terminated_by_master=false;
m_flow_list =flow_list;
m_core_id= core_id;
m_tcp_dpc= 0;
@@ -3515,7 +3525,7 @@ int CNodeGenerator::flush_file(dsec_t max_time,
#endif
/* if the stream has been deactivated - end */
- if (unlikely(!node_sl->is_active())) {
+ if ( unlikely( node_sl->is_mask_for_free() ) ) {
thread->free_node(node);
} else {
node_sl->handle(thread);
@@ -3573,6 +3583,9 @@ int CNodeGenerator::flush_file(dsec_t max_time,
}
}
+ if ( thread->is_terminated_by_master() ) {
+ return (0);
+ }
if (!always) {
old_offset =offset;
@@ -3659,8 +3672,19 @@ CNodeGenerator::handle_slow_messages(uint8_t type,
exit_scheduler = true;
} else {
- printf(" ERROR type is not valid %d \n",type);
- assert(0);
+ if ( type == CGenNode::COMMAND) {
+ m_p_queue.pop();
+ CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
+ {
+ TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
+ cmd->handle(&thread->m_stateless_dp_info);
+ exit_scheduler = cmd->is_quit();
+ thread->free_node((CGenNode *)node_cmd);/* free the node */
+ }
+ }else{
+ printf(" ERROR type is not valid %d \n",type);
+ assert(0);
+ }
}
return exit_scheduler;
@@ -3952,7 +3976,7 @@ void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file
}
void CFlowGenListPerThread::stop_stateless_simulation_file(){
- m_node_gen.close_file(this);
+ m_node_gen.m_v_if->close_file();
}
void CFlowGenListPerThread::start_stateless_daemon_simulation(){
@@ -3962,6 +3986,15 @@ void CFlowGenListPerThread::start_stateless_daemon_simulation(){
}
+
+/* return true if we need to shedule next_stream, */
+
+bool CFlowGenListPerThread::set_stateless_next_node( CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node){
+ return ( m_stateless_dp_info.set_stateless_next_node(cur_node,next_node) );
+}
+
+
void CFlowGenListPerThread::start_stateless_daemon(){
m_cur_time_sec = 0;
m_stateless_dp_info.start();
@@ -4010,11 +4043,13 @@ void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name,
#endif
m_node_gen.flush_file(c_stop_sec,d_time_flow, false,this,old_offset);
+
+
#ifdef VALG
CALLGRIND_STOP_INSTRUMENTATION;
printf (" %llu \n",os_get_hr_tick_64()-_start_time);
#endif
- if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() ){
+ if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() && (is_terminated_by_master()==false) ){
/* clean close */
m_node_gen.flush_file(m_cur_time_sec, d_time_flow, true,this,old_offset);
}
@@ -6882,6 +6917,11 @@ void CGenNodeBase::free_base(){
p->free_stl_node();
return;
}
+ if ( m_type == COMMAND ) {
+ CGenNodeCommand* p=(CGenNodeCommand*)this;
+ p->free_command();
+ }
+
}
diff --git a/src/bp_sim.h b/src/bp_sim.h
index eef5576b..be462a91 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -62,6 +62,12 @@ limitations under the License.
#undef NAT_TRACE_
+static inline double
+usec_to_sec(double usec) {
+ return (usec / (1000 * 1000));
+}
+
+
#define FORCE_NO_INLINE __attribute__ ((noinline))
#define MAX_LATENCY_PORTS 12
@@ -1383,7 +1389,9 @@ public:
FLOW_SYNC =4, /* called evey 1 msec */
STATELESS_PKT =5,
EXIT_SCHED =6,
- EXIT_PORT_SCHED =7
+ COMMAND =7,
+
+ EXIT_PORT_SCHED =8
};
@@ -1898,6 +1906,8 @@ public:
public:
void add_node(CGenNode * mynode);
void remove_all(CFlowGenListPerThread * thread);
+ void remove_all_stateless(CFlowGenListPerThread * thread);
+
int open_file(std::string file_name,
CPreviewMode * preview);
int close_file(CFlowGenListPerThread * thread);
@@ -3400,6 +3410,13 @@ public:
uint32_t max_threads);
void Delete();
+ void set_terminate_mode(bool is_terminate){
+ m_terminated_by_master =is_terminate;
+ }
+ bool is_terminated_by_master(){
+ return (m_terminated_by_master);
+ }
+
void set_vif(CVirtualIF * v_if){
m_node_gen.set_vif(v_if);
}
@@ -3448,6 +3465,9 @@ public:
/* close a file for simulation */
void stop_stateless_simulation_file();
+ /* return true if we need to shedule next_stream, */
+ bool set_stateless_next_node( CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node);
void Dump(FILE *fd);
@@ -3542,6 +3562,7 @@ private:
flow_id_node_t m_flow_id_to_node_lookup;
TrexStatelessDpCore m_stateless_dp_info;
+ bool m_terminated_by_master;
private:
uint8_t m_cacheline_pad[RTE_CACHE_LINE_SIZE][19]; // improve prefech
diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp
index c845c32e..93e8387f 100644
--- a/src/gtest/trex_stateless_gtest.cpp
+++ b/src/gtest/trex_stateless_gtest.cpp
@@ -1,5 +1,5 @@
/*
- Hanoh Haim
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -29,7 +29,7 @@ limitations under the License.
#include <trex_stream.h>
#include <trex_stateless_port.h>
#include <trex_rpc_server_api.h>
-
+#include <iostream>
#define EXPECT_EQ_UINT32(a,b) EXPECT_EQ((uint32_t)(a),(uint32_t)(b))
@@ -91,6 +91,7 @@ public:
lpt->start_stateless_daemon_simulation();
+
//lpt->m_node_gen.DumpHist(stdout);
cmp.d_sec = m_time_diff;
@@ -109,7 +110,6 @@ public:
return (res);
}
-
public:
int m_threads;
double m_time_diff;
@@ -224,6 +224,200 @@ TEST_F(basic_stl, load_pcap_file) {
}
+
+TEST_F(basic_stl, simple_prog3) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_simple_prog3";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ /* stream1 */
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(5);
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_next_stream_id=200;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+
+ /* stream1 */
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stMULTI_BURST, 0,200);
+ stream2->set_pps(1.0);
+ stream2->m_isg_usec = 1000000; /*time betwean stream 1 to stream 2 */
+ stream2->m_enabled = true;
+ stream2->m_self_start = false;
+ stream2->set_multi_burst(5,
+ 3,
+ 2000000.0);
+
+ // next stream is 100 - loop
+ stream2->m_next_stream_id=100;
+
+
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream2);
+ streams.push_back(stream2);
+
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ EXPECT_TRUE(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 50.0 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+TEST_F(basic_stl, simple_prog2) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_simple_prog2";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ /* stream1 */
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(5);
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_next_stream_id=200;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+
+ /* stream1 */
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST, 0,200);
+ stream2->set_pps(1.0);
+ stream2->set_single_burst(5);
+ stream2->m_isg_usec = 2000000; /*time betwean stream 1 to stream 2 */
+ stream2->m_enabled = true;
+ stream2->m_self_start = false;
+
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream2);
+ streams.push_back(stream2);
+
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ EXPECT_TRUE(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
+TEST_F(basic_stl, simple_prog1) {
+
+ CBasicStl t1;
+ CParserOption * po =&CGlobalInfo::m_options;
+ po->preview.setVMode(7);
+ po->preview.setFileWrite(true);
+ po->out_file ="exp/stl_simple_prog1";
+
+ TrexStreamsCompiler compile;
+
+
+ std::vector<TrexStream *> streams;
+
+ /* stream1 */
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100);
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(5);
+ stream1->m_enabled = true;
+ stream1->m_self_start = true;
+ stream1->m_next_stream_id=200;
+
+ CPcapLoader pcap;
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000001);
+ pcap.clone_packet_into_stream(stream1);
+
+ streams.push_back(stream1);
+
+
+ /* stream1 */
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST, 0,200);
+ stream2->set_pps(1.0);
+ stream2->set_single_burst(5);
+ stream2->m_enabled = true;
+ stream2->m_self_start = false;
+
+ pcap.load_pcap_file("cap2/udp_64B.pcap",0);
+ pcap.update_ip_src(0x10000002);
+ pcap.clone_packet_into_stream(stream2);
+ streams.push_back(stream2);
+
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ EXPECT_TRUE(compile.compile(streams, comp_obj) );
+
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 );
+
+
+ t1.m_msg = lpstart;
+
+ bool res=t1.init();
+
+ delete stream1 ;
+ delete stream2 ;
+
+ EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
+}
+
+
+
TEST_F(basic_stl, single_pkt_burst1) {
CBasicStl t1;
@@ -239,7 +433,7 @@ TEST_F(basic_stl, single_pkt_burst1) {
TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,0);
stream1->set_pps(1.0);
- stream1->set_signle_burtst(5);
+ stream1->set_single_burst(5);
stream1->m_enabled = true;
stream1->m_self_start = true;
@@ -279,6 +473,7 @@ TEST_F(basic_stl, single_pkt) {
TrexStreamsCompiler compile;
+ uint8_t port_id=0;
std::vector<TrexStream *> streams;
@@ -288,23 +483,23 @@ TEST_F(basic_stl, single_pkt) {
stream1->m_enabled = true;
stream1->m_self_start = true;
+ stream1->m_port_id= port_id;
CPcapLoader pcap;
pcap.load_pcap_file("cap2/udp_64B.pcap",0);
pcap.update_ip_src(0x10000001);
pcap.clone_packet_into_stream(stream1);
-
-
+
streams.push_back(stream1);
// stream - clean
- TrexStreamsCompiledObj comp_obj(0,1.0);
+ TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/);
assert(compile.compile(streams, comp_obj) );
- TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10 );
+ TrexStatelessDpStart * lpstart = new TrexStatelessDpStart( comp_obj.clone(), 10.0 /*sec */ );
t1.m_msg = lpstart;
@@ -344,7 +539,7 @@ TEST_F(basic_stl, multi_pkt1) {
streams.push_back(stream1);
- TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,0);
+ TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,1);
stream2->set_pps(2.0);
stream2->m_enabled = true;
@@ -482,6 +677,247 @@ TEST_F(basic_stl, multi_burst1) {
EXPECT_EQ_UINT32(1, res?1:0)<< "pass";
}
+/********************************************* Itay Tests Start *************************************/
+
+/**
+ * check that continous stream does not point to another stream
+ * (makes no sense)
+ */
+TEST_F(basic_stl, compile_bad_1) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,2);
+ stream1->m_enabled = true;
+ stream1->set_pps(52.0);
+ stream1->m_next_stream_id = 3;
+
+ streams.push_back(stream1);
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+
+ delete stream1;
+
+}
+
+/**
+ * check for streams pointing to non exsistant streams
+ *
+ * @author imarom (16-Nov-15)
+ */
+TEST_F(basic_stl, compile_bad_2) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,1);
+ stream1->m_enabled = true;
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(200);
+
+ /* non existant next stream */
+ stream1->m_next_stream_id = 5;
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,2);
+ stream1->set_pps(52.0);
+
+ streams.push_back(stream1);
+ streams.push_back(stream2);
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+
+ delete stream1;
+ delete stream2;
+
+}
+
+/**
+ * check for "dead streams" in the mesh
+ * a streams that cannot be reached
+ *
+ * @author imarom (16-Nov-15)
+ */
+TEST_F(basic_stl, compile_bad_3) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+ TrexStream *stream;
+
+ /* stream 1 */
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 231);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = 5481;
+ stream->m_self_start = true;
+
+ streams.push_back(stream);
+
+ /* stream 2 */
+ stream = new TrexStream(TrexStream::stCONTINUOUS, 0, 5481);
+ stream->m_enabled = true;
+ stream->m_next_stream_id = -1;
+ stream->m_self_start = false;
+ stream->set_pps(52.0);
+
+ streams.push_back(stream);
+
+ /* stream 3 */
+
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 1928);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = -1;
+ stream->m_self_start = true;
+
+ streams.push_back(stream);
+
+ /* stream 4 */
+
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 41231);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = 3928;
+ stream->m_self_start = false;
+
+ streams.push_back(stream);
+
+ /* stream 5 */
+
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 3928);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = 41231;
+ stream->m_self_start = false;
+
+ streams.push_back(stream);
+
+ /* compile */
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg));
+
+ for (auto stream : streams) {
+ delete stream;
+ }
+
+}
+
+TEST_F(basic_stl, compile_with_warnings) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+ TrexStream *stream;
+
+ /* stream 1 */
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 231);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = 1928;
+ stream->m_self_start = true;
+
+ streams.push_back(stream);
+
+ /* stream 2 */
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 5481);
+ stream->m_enabled = true;
+ stream->m_next_stream_id = 1928;
+ stream->m_self_start = true;
+ stream->set_pps(52.0);
+
+ streams.push_back(stream);
+
+ /* stream 3 */
+
+ stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 1928);
+ stream->m_enabled = true;
+ stream->set_pps(1.0);
+ stream->set_single_burst(200);
+
+ stream->m_next_stream_id = -1;
+ stream->m_self_start = true;
+
+ streams.push_back(stream);
+
+
+
+ /* compile */
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_TRUE(compile.compile(streams, comp_obj, &err_msg));
+
+ EXPECT_TRUE(compile.get_last_compile_warnings().size() == 1);
+
+ for (auto stream : streams) {
+ delete stream;
+ }
+
+}
+
+
+TEST_F(basic_stl, compile_good_stream_id_compres) {
+
+ TrexStreamsCompiler compile;
+ std::vector<TrexStream *> streams;
+
+ TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,700);
+ stream1->m_self_start = true;
+ stream1->m_enabled = true;
+ stream1->set_pps(1.0);
+ stream1->set_single_burst(200);
+
+ /* non existant next stream */
+ stream1->m_next_stream_id = 800;
+
+
+ TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST,0,800);
+ stream2->set_pps(52.0);
+ stream2->m_enabled = true;
+ stream2->m_next_stream_id = 700;
+ stream2->set_single_burst(300);
+
+
+ streams.push_back(stream1);
+ streams.push_back(stream2);
+
+ TrexStreamsCompiledObj comp_obj(0,1.0);
+
+ std::string err_msg;
+ EXPECT_TRUE(compile.compile(streams, comp_obj, &err_msg));
+
+ printf(" %s \n",err_msg.c_str());
+
+ comp_obj.Dump(stdout);
+
+ EXPECT_EQ_UINT32(comp_obj.get_objects()[0].m_stream->m_stream_id,0);
+ EXPECT_EQ_UINT32(comp_obj.get_objects()[0].m_stream->m_next_stream_id,1);
+
+ EXPECT_EQ_UINT32(comp_obj.get_objects()[1].m_stream->m_stream_id,1);
+ EXPECT_EQ_UINT32(comp_obj.get_objects()[1].m_stream->m_next_stream_id,0);
+
+ delete stream1;
+ delete stream2;
+
+}
+/********************************************* Itay Tests End *************************************/
diff --git a/src/main.cpp b/src/main.cpp
index 64547d57..b633fce6 100755
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -767,6 +767,7 @@ extern "C" const char * get_build_time(void){
+
int main(int argc , char * argv[]){
int res=0;
time_init();
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index c4ecb97d..865c84ed 100755
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -58,6 +58,7 @@ limitations under the License.
#include <stateless/cp/trex_stateless.h>
#include <stateless/dp/trex_stream_node.h>
+#include <stateless/messaging/trex_stateless_messaging.h>
#include <../linux_dpdk/version.h>
@@ -551,19 +552,17 @@ static int usage(){
printf(" --mac [file] : YAML file with <client ip, mac addr> configuration \n");
printf(" \n\n");
- printf(" -r : realtime enable \n");
- printf(" \n\n");
- printf(" -c [number of cores] : 1 ,2,3,4,5 numnber of dual cores + master 1 means 1 master and 2 cores \n");
+ printf(" -c [number of threads] : default is 1. number of threads to allocate for each dual ports. \n");
printf(" \n");
- printf(" -s : run only one data path core\n");
+ printf(" -s : run only one data path core. for debug\n");
printf(" \n");
- printf(" --flip : flow will be sent from client->server and server->client for maximum throughput \n");
+ printf(" --flip : flow will be sent from client->server and server->client for maximum throughput \n");
printf(" \n");
- printf(" -p : flow-flip , send all packets flow from the same interface base of client ip \n");
+ printf(" -p : flow-flip , send all flow packets from the same interface base of client ip \n");
printf(" -e : like -p but comply to the generator rules \n");
printf(" \n");
- printf(" -l [pkt/sec] : run laterncy daemon in this rate \n");
+ printf(" -l [pkt/sec] : run latency daemon in this rate \n");
printf(" e.g -l 1000 run 1000 pkt/sec from each interface , zero mean to disable latency check \n");
printf(" --lm : latency mask \n");
printf(" 0x1 only port 0 will send traffic \n");
@@ -571,20 +570,18 @@ static int usage(){
printf(" \n");
- printf(" --limit-ports : limit number of ports , must be even e.g 2,4 \n");
+ printf(" --limit-ports : limit number of ports, must be even e.g. 2,4 \n");
printf(" \n");
- printf(" --nc : if set will not close all the flow , faster \n");
+ printf(" --nc : If set, will not wait for all the flows to be closed, terminate faster- see manual for more information \n");
printf(" \n");
- printf(" -d : duration of the test in sec \n");
+ printf(" -d : duration of the test in sec. look for --nc \n");
printf(" \n");
- printf(" -pm : platform factor , in case you have splitter in the setup you can multiply the total results in this factor \n");
+ printf(" -pm : platform factor ,in case you have splitter in the setup you can multiply the total results in this factor \n");
printf(" e.g --pm 2.0 will multiply all the results bps in this factor \n");
printf(" \n");
printf(" -pubd : disable monitors publishers \n");
- printf(" -m : factor of bandwidth \n");
- printf(" \n");
- printf(" -1g : 1G trex \n");
+ printf(" -m : factor of bandwidth \n");
printf(" \n");
printf(" -k [sec] : run latency test before starting the test. it will wait for x sec sending packet and x sec after that \n");
printf(" \n");
@@ -594,7 +591,7 @@ static int usage(){
printf(" you can copy this file to /etc/trex_cfg.yaml \n");
printf(" \n");
- printf(" --ipv6 : work in ipv6 mode \n");
+ printf(" --ipv6 : work in ipv6 mode\n");
printf(" --learn : Work in NAT environments, learn the dynamic NAT translation and ALG \n");
printf(" --learn-verify : Learn the translation, but intended for verification of the mechanism in cases that NAT does not exist \n");
@@ -609,17 +606,17 @@ static int usage(){
printf(" Warning : This program can generate huge-files (TB ) watch out! try this only on local drive \n");
printf(" \n");
printf(" \n");
- printf(" --rx-check [sample] : enable rx check thread , using this thread we sample flows 1/sample and check order,latency and more \n");
+ printf(" --rx-check [sample] : enable rx check thread, using this thread we sample flows 1/sample and check order,latency and more \n");
printf(" this feature consume another thread \n");
printf(" \n");
- printf(" --hops [hops] : If rx check is enabled, the hop number can be assigned. The default number of hops is 1\n");
- printf(" --iom [mode] : io mode for interactive mode [0- silent, 1- normal , 2- short] \n");
+ printf(" --hops [hops] : If rx check is enabled, the hop number can be assigned. The default number of hops is 1\n");
+ printf(" --iom [mode] : io mode for interactive mode [0- silent, 1- normal , 2- short] \n");
printf(" this feature consume another thread \n");
printf(" \n");
- printf(" --no-key : daemon mode, don't get input from keyboard \n");
- printf(" --no-flow-control : In default TRex disables flow-control using this flag it does not touch it \n");
- printf(" --prefix : for multi trex, each instance should have a different name \n");
- printf(" --mac-spread : Spread the destination mac-order by this factor. e.g 2 will generate the traffic to 2 devices DEST-MAC ,DEST-MAC+1 \n");
+ printf(" --no-key : daemon mode, don't get input from keyboard \n");
+ printf(" --no-flow-control : In default TRex disables flow-control using this flag it does not touch it \n");
+ printf(" --prefix : for multi trex, each instance should have a different name \n");
+ printf(" --mac-spread : Spread the destination mac-order by this factor. e.g 2 will generate the traffic to 2 devices DEST-MAC ,DEST-MAC+1 \n");
printf(" maximum is up to 128 devices \n");
@@ -630,7 +627,7 @@ static int usage(){
printf(" \n");
printf(" -o [capfile_name] simulate trex into pcap file \n");
printf(" --pcap export the file in pcap mode \n");
- printf(" t-rex-64 -d 10 -f cfg.yaml -o my.pcap --pcap # export 10 sec of what Trex will do on real-time to a file my.pcap \n");
+ printf(" bp-sim-64 -d 10 -f cfg.yaml -o my.pcap --pcap # export 10 sec of what Trex will do on real-time to a file my.pcap \n");
printf(" --vm-sim : simulate vm with driver of one input queue and one output queue \n");
printf(" \n");
printf(" Examples: ");
@@ -892,7 +889,7 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t
}
if (po->preview.get_is_rx_check_enable() && ( po->is_latency_disabled() ) ) {
- printf(" rx check must be enable with latency check. try adding '-l 1000' \n");
+ printf(" rx check must be enabled with latency check. try adding '-l 1000' \n");
return -1;
}
@@ -905,7 +902,7 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t
uint32_t cores=po->preview.getCores();
if ( cores > ((BP_MAX_CORES)/2-1) ) {
- printf(" ERROR maximum cores are : %d \n",((BP_MAX_CORES)/2-1));
+ printf(" ERROR maximum supported cores are : %d \n",((BP_MAX_CORES)/2-1));
return -1;
}
@@ -2821,8 +2818,15 @@ public:
int reset_counters();
+public:
+private:
+ /* try to stop all datapath cores */
+ void try_stop_all_dp();
+ /* send message to all dp cores */
+ int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg);
public:
+
int start_send_master();
int start_master_stateless();
@@ -3248,6 +3252,40 @@ bool CGlobalTRex::is_all_links_are_up(bool dump){
}
+void CGlobalTRex::try_stop_all_dp(){
+
+ TrexStatelessDpQuit * msg= new TrexStatelessDpQuit();
+ send_message_all_dp(msg);
+ delete msg;
+ bool all_core_finished = false;
+ int i;
+ for (i=0; i<20; i++) {
+ if ( is_all_cores_finished() ){
+ all_core_finished =true;
+ break;
+ }
+ delay(100);
+ }
+ if ( all_core_finished ){
+ printf(" All cores stopped !! \n");
+ }else{
+ printf(" ERROR one of the DP core is stucked !\n");
+ }
+}
+
+
+int CGlobalTRex::send_message_all_dp(TrexStatelessCpToDpMsgBase *msg){
+
+ int max_threads=(int)CMsgIns::Ins()->getCpDp()->get_num_threads();
+ int i;
+
+ for (i=0; i<max_threads; i++) {
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp((uint8_t)i);
+ ring->Enqueue((CGenNode*)msg->clone());
+ }
+ return (0);
+}
+
int CGlobalTRex::ixgbe_rx_queue_flush(){
int i;
@@ -4088,6 +4126,11 @@ int CGlobalTRex::run_in_master(){
}
}
+ if (!is_all_cores_finished()) {
+ /* probably CLTR-C */
+ try_stop_all_dp();
+ }
+
m_mg.stop();
delay(1000);
if ( was_stopped ){
@@ -4602,11 +4645,10 @@ int main_test(int argc , char * argv[]){
&& (CGlobalInfo::m_options.m_latency_prev>0) ){
uint32_t pkts = CGlobalInfo::m_options.m_latency_prev*
CGlobalInfo::m_options.m_latency_rate;
- printf("Start prev latency check - hack for Keren for %d sec \n",CGlobalInfo::m_options.m_latency_prev);
+ printf("Start prev latency check- for %d sec \n",CGlobalInfo::m_options.m_latency_prev);
g_trex.m_mg.start(pkts);
- printf("Delay now you can call command \n");
delay(CGlobalInfo::m_options.m_latency_prev* 1000);
- printf("Finish wating \n");
+ printf("Finished \n");
g_trex.m_mg.reset();
g_trex.reset_counters();
}
diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
index 5ec92afc..cdd13ed6 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp
@@ -143,6 +143,10 @@ TrexRpcCmdAddStream::allocate_new_stream(const Json::Value &section, uint8_t por
stream = new TrexStream( TrexStream::stCONTINUOUS, port_id, stream_id);
stream->set_pps(pps);
+ if (stream->m_next_stream_id != -1) {
+ generate_parse_err(result, "continious stream cannot provide next stream id - only -1 is valid");
+ }
+
} else if (type == "single_burst") {
uint32_t total_pkts = parse_int(mode, "total_pkts", result);
@@ -150,7 +154,7 @@ TrexRpcCmdAddStream::allocate_new_stream(const Json::Value &section, uint8_t por
stream = new TrexStream(TrexStream::stSINGLE_BURST,port_id, stream_id);
stream->set_pps(pps);
- stream->set_signle_burtst(total_pkts);
+ stream->set_single_burst(total_pkts);
} else if (type == "multi_burst") {
diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp
index 1a05257c..5203b2a2 100644
--- a/src/stateless/cp/trex_stream.cpp
+++ b/src/stateless/cp/trex_stream.cpp
@@ -25,6 +25,71 @@ limitations under the License.
/**************************************
* stream
*************************************/
+
+
+std::string TrexStream::get_stream_type_str(stream_type_t stream_type){
+
+ std::string res;
+
+
+ switch (stream_type) {
+
+ case stCONTINUOUS :
+ res="stCONTINUOUS ";
+ break;
+
+ case stSINGLE_BURST :
+ res="stSINGLE_BURST ";
+ break;
+
+ case stMULTI_BURST :
+ res="stMULTI_BURST ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
+
+void TrexStream::Dump(FILE *fd){
+
+ fprintf(fd,"\n");
+ fprintf(fd,"==> Stream_id : %lu \n",(ulong)m_stream_id);
+ fprintf(fd," Enabled : %lu \n",(ulong)(m_enabled?1:0));
+ fprintf(fd," Self_start : %lu \n",(ulong)(m_self_start?1:0));
+
+ if (m_next_stream_id>=0) {
+ fprintf(fd," Nex_stream_id : %lu \n",(ulong)m_next_stream_id);
+ }else {
+ fprintf(fd," Nex_stream_id : %d \n",m_next_stream_id);
+ }
+
+ fprintf(fd," Port_id : %lu \n",(ulong)m_port_id);
+
+ if (m_isg_usec>0.0) {
+ fprintf(fd," isg : %6.2f \n",m_isg_usec);
+ }
+ fprintf(fd," type : %s \n",get_stream_type_str(m_type).c_str());
+
+ if ( m_type == TrexStream::stCONTINUOUS ) {
+ fprintf(fd," pps : %f \n",m_pps);
+ }
+ if (m_type == TrexStream::stSINGLE_BURST) {
+ fprintf(fd," pps : %f \n",m_pps);
+ fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts);
+ }
+ if (m_type == TrexStream::stMULTI_BURST) {
+ fprintf(fd," pps : %f \n",m_pps);
+ fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts);
+ fprintf(fd," mburst : %lu \n",(ulong)m_num_bursts);
+ if (m_ibg_usec>0.0) {
+ fprintf(fd," m_ibg_usec : %f \n",m_ibg_usec);
+ }
+ }
+}
+
+
TrexStream::TrexStream(uint8_t type,
uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) {
diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h
index 151723ad..0634829e 100644
--- a/src/stateless/cp/trex_stream.h
+++ b/src/stateless/cp/trex_stream.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -65,6 +66,9 @@ public:
stMULTI_BURST = 6
};
+ typedef uint8_t stream_type_t ;
+
+ static std::string get_stream_type_str(stream_type_t stream_type);
public:
TrexStream(uint8_t type,uint8_t port_id, uint32_t stream_id);
@@ -80,6 +84,12 @@ public:
/* access the stream json */
const Json::Value & get_stream_json();
+ /* compress the stream id to be zero based */
+ void fix_dp_stream_id(uint32_t my_stream_id,int next_stream_id){
+ m_stream_id = my_stream_id;
+ m_next_stream_id = next_stream_id;
+ }
+
double get_pps() {
return m_pps;
}
@@ -92,21 +102,29 @@ public:
m_type = type;
}
- uint8_t get_type(void){
+ uint8_t get_type(void) const {
return ( m_type );
}
+ bool is_dp_next_stream(){
+ if (m_next_stream_id<0) {
+ return (false);
+ }else{
+ return (true);
+ }
+ }
+
void set_multi_burst(uint32_t burst_total_pkts,
- uint32_t num_bursts,
- double ibg_usec){
+ uint32_t num_bursts,
+ double ibg_usec) {
m_burst_total_pkts = burst_total_pkts;
m_num_bursts = num_bursts;
m_ibg_usec = ibg_usec;
}
- void set_signle_burtst(uint32_t burst_total_pkts){
+ void set_single_burst(uint32_t burst_total_pkts){
set_multi_burst(burst_total_pkts,1,0.0);
}
@@ -132,11 +150,12 @@ public:
return (dp);
}
+ void Dump(FILE *fd);
public:
/* basic */
uint8_t m_type;
uint8_t m_port_id;
- uint32_t m_stream_id;
+ uint32_t m_stream_id; /* id from RPC can be anything */
/* config fields */
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index 580db51c..bdfc3c01 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -19,9 +19,118 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <string.h>
+#include <string>
+#include <sstream>
#include <trex_streams_compiler.h>
#include <trex_stream.h>
+#include <assert.h>
+#include <trex_stateless.h>
+#include <iostream>
+
+/**
+ * describes a graph node in the pre compile check
+ *
+ * @author imarom (16-Nov-15)
+ */
+class GraphNode {
+public:
+ GraphNode(TrexStream *stream, GraphNode *next) : m_stream(stream), m_next(next) {
+ marked = false;
+ m_compressed_stream_id=-1;
+ }
+
+ uint32_t get_stream_id() const {
+ return m_stream->m_stream_id;
+ }
+
+ const TrexStream *m_stream;
+ GraphNode *m_next;
+ std::vector<const GraphNode *> m_parents;
+ bool marked;
+ int m_compressed_stream_id;
+};
+
+/**
+ * node map
+ *
+ */
+class GraphNodeMap {
+public:
+
+ GraphNodeMap() : m_dead_end(NULL, NULL) {
+
+ }
+
+ bool add(GraphNode *node) {
+ if (has(node->get_stream_id())) {
+ return false;
+ }
+
+ m_nodes[node->get_stream_id()] = node;
+
+ if (node->m_stream->m_self_start) {
+ m_roots.push_back(node);
+ }
+
+ return true;
+ }
+
+ bool has(uint32_t stream_id) {
+
+ return (get(stream_id) != NULL);
+ }
+
+ GraphNode * get(uint32_t stream_id) {
+
+ if (stream_id == -1) {
+ return &m_dead_end;
+ }
+
+ auto search = m_nodes.find(stream_id);
+
+ if (search != m_nodes.end()) {
+ return search->second;
+ } else {
+ return NULL;
+ }
+ }
+
+ void clear_marks() {
+ for (auto node : m_nodes) {
+ node.second->marked = false;
+ }
+ }
+
+ void get_unmarked(std::vector <GraphNode *> &unmarked) {
+ for (auto node : m_nodes) {
+ if (!node.second->marked) {
+ unmarked.push_back(node.second);
+ }
+ }
+ }
+
+
+ ~GraphNodeMap() {
+ for (auto node : m_nodes) {
+ delete node.second;
+ }
+ m_nodes.clear();
+ }
+
+ std::vector <GraphNode *> & get_roots() {
+ return m_roots;
+ }
+
+
+ std::unordered_map<uint32_t, GraphNode *> get_nodes() {
+ return m_nodes;
+ }
+
+private:
+ std::unordered_map<uint32_t, GraphNode *> m_nodes;
+ std::vector <GraphNode *> m_roots;
+ GraphNode m_dead_end;
+};
/**************************************
* stream compiled object
@@ -36,15 +145,37 @@ TrexStreamsCompiledObj::~TrexStreamsCompiledObj() {
m_objs.clear();
}
+
+void
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream){
+
+ obj_st obj;
+
+ obj.m_stream = stream->clone_as_dp();
+
+ m_objs.push_back(obj);
+}
+
void
-TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream) {
+TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream,
+ uint32_t my_dp_id, int next_dp_id) {
obj_st obj;
obj.m_stream = stream->clone_as_dp();
+ /* compress the id's*/
+ obj.m_stream->fix_dp_stream_id(my_dp_id,next_dp_id);
m_objs.push_back(obj);
}
+void TrexStreamsCompiledObj::Dump(FILE *fd){
+ for (auto obj : m_objs) {
+ obj.m_stream->Dump(fd);
+ }
+}
+
+
+
TrexStreamsCompiledObj *
TrexStreamsCompiledObj::clone() {
@@ -63,11 +194,213 @@ TrexStreamsCompiledObj::clone() {
return new_compiled_obj;
}
+void
+TrexStreamsCompiler::add_warning(const std::string &warning) {
+ m_warnings.push_back("*** warning: " + warning);
+}
+
+void
+TrexStreamsCompiler::err(const std::string &err) {
+ throw TrexException("*** error: " + err);
+}
+
+void
+TrexStreamsCompiler::check_stream(const TrexStream *stream) {
+ std::stringstream ss;
+
+ /* cont. stream can point only on itself */
+ if (stream->get_type() == TrexStream::stCONTINUOUS) {
+ if (stream->m_next_stream_id != -1) {
+ ss << "continous stream '" << stream->m_stream_id << "' cannot point on another stream";
+ err(ss.str());
+ }
+ }
+}
+
+void
+TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams,
+ GraphNodeMap *nodes) {
+ std::stringstream ss;
+ uint32_t compressed_stream_id=0;
+
+
+ /* first pass - allocate all nodes and check for duplicates */
+ for (auto stream : streams) {
+
+ /* skip non enabled streams */
+ if (!stream->m_enabled) {
+ continue;
+ }
+
+ /* sanity check on the stream itself */
+ check_stream(stream);
+
+ /* duplicate stream id ? */
+ if (nodes->has(stream->m_stream_id)) {
+ ss << "duplicate instance of stream id " << stream->m_stream_id;
+ err(ss.str());
+ }
+
+ GraphNode *node = new GraphNode(stream, NULL);
+ /* allocate new compressed id */
+ node->m_compressed_stream_id = compressed_stream_id;
+
+ compressed_stream_id++;
+
+ /* add to the map */
+ assert(nodes->add(node));
+ }
+
+}
+
+/**
+ * on this pass we direct the graph to point to the right nodes
+ *
+ */
+void
+TrexStreamsCompiler::direct_pass(GraphNodeMap *nodes) {
+
+ /* second pass - direct the graph */
+ for (auto p : nodes->get_nodes()) {
+
+ GraphNode *node = p.second;
+ const TrexStream *stream = node->m_stream;
+
+ /* check the stream points on an existing stream */
+ GraphNode *next_node = nodes->get(stream->m_next_stream_id);
+ if (!next_node) {
+ std::stringstream ss;
+ ss << "stream " << node->get_stream_id() << " is pointing on non existent stream " << stream->m_next_stream_id;
+ err(ss.str());
+ }
+
+ node->m_next = next_node;
+
+ /* do we have more than one parent ? */
+ next_node->m_parents.push_back(node);
+ }
+
+
+ /* check for multiple parents */
+ for (auto p : nodes->get_nodes()) {
+ GraphNode *node = p.second;
+
+ if (node->m_parents.size() > 0 ) {
+ std::stringstream ss;
+
+ ss << "stream " << node->get_stream_id() << " is triggered by multiple streams: ";
+ for (auto x : node->m_parents) {
+ ss << x->get_stream_id() << " ";
+ }
+
+ add_warning(ss.str());
+ }
+ }
+}
+
+/**
+ * mark sure all the streams are reachable
+ *
+ */
+void
+TrexStreamsCompiler::check_for_unreachable_streams(GraphNodeMap *nodes) {
+ /* start with the roots */
+ std::vector <GraphNode *> next_nodes = nodes->get_roots();
+
+
+ nodes->clear_marks();
+
+ /* run BFS from all the roots */
+ while (!next_nodes.empty()) {
+
+ /* pull one */
+ GraphNode *node = next_nodes.back();
+ next_nodes.pop_back();
+ if (node->marked) {
+ continue;
+ }
+
+ node->marked = true;
+
+ if (node->m_next != NULL) {
+ next_nodes.push_back(node->m_next);
+ }
+
+ }
+
+ std::vector <GraphNode *> unmarked;
+ nodes->get_unmarked(unmarked);
+
+ if (!unmarked.empty()) {
+ std::stringstream ss;
+ for (auto node : unmarked) {
+ ss << "stream " << node->get_stream_id() << " is unreachable from any other stream\n";
+ }
+ err(ss.str());
+ }
+
+
+}
+
+/**
+ * check validation of streams for compile
+ *
+ * @author imarom (16-Nov-15)
+ *
+ * @param streams
+ * @param fail_msg
+ *
+ * @return bool
+ */
+void
+TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams,
+ GraphNodeMap & nodes) {
+
+ m_warnings.clear();
+
+ /* allocate nodes */
+ allocate_pass(streams, &nodes);
+
+ /* direct the graph */
+ direct_pass(&nodes);
+
+ /* check for non reachable streams inside the graph */
+ check_for_unreachable_streams(&nodes);
+
+}
+
/**************************************
* stream compiler
*************************************/
bool
-TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj) {
+TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams,
+ TrexStreamsCompiledObj &obj,
+ std::string *fail_msg) {
+
+#if 0
+ fprintf(stdout,"------------pre compile \n");
+ for (auto stream : streams) {
+ stream->Dump(stdout);
+ }
+ fprintf(stdout,"------------pre compile \n");
+#endif
+
+ GraphNodeMap nodes;
+
+
+ /* compile checks */
+ try {
+ pre_compile_check(streams,nodes);
+ } catch (const TrexException &ex) {
+ if (fail_msg) {
+ *fail_msg = ex.what();
+ } else {
+ std::cout << ex.what();
+ }
+ return false;
+ }
+
+
/* for now we do something trivial, */
for (auto stream : streams) {
@@ -76,13 +409,19 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStrea
continue;
}
- /* for now skip also non self started streams */
- if (!stream->m_self_start) {
- continue;
+ int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id;
+ assert(new_id>=0);
+ uint32_t my_stream_id = (uint32_t)new_id;
+ int my_next_stream_id=-1;
+ if (stream->m_next_stream_id>=0) {
+ my_next_stream_id=nodes.get(stream->m_next_stream_id)->m_compressed_stream_id;
}
/* add it */
- obj.add_compiled_stream(stream);
+ obj.add_compiled_stream(stream,
+ my_stream_id,
+ my_next_stream_id
+ );
}
return true;
diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h
index 44c8a0fc..200f7ce9 100644
--- a/src/stateless/cp/trex_streams_compiler.h
+++ b/src/stateless/cp/trex_streams_compiler.h
@@ -23,9 +23,11 @@ limitations under the License.
#include <stdint.h>
#include <vector>
+#include <string>
class TrexStreamsCompiler;
class TrexStream;
+class GraphNodeMap;
/**
* compiled object for a table of streams
@@ -48,6 +50,10 @@ public:
return m_objs;
}
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
/**
* clone the compiled object
*
@@ -58,8 +64,13 @@ public:
return (m_mul);
}
+ void Dump(FILE *fd);
+
private:
+ void add_compiled_stream(TrexStream * stream,
+ uint32_t my_dp_id, int next_dp_id);
void add_compiled_stream(TrexStream * stream);
+
std::vector<obj_st> m_objs;
uint8_t m_port_id;
@@ -68,13 +79,36 @@ private:
class TrexStreamsCompiler {
public:
+
/**
* compiles a vector of streams to an object passable to the DP
*
* @author imarom (28-Oct-15)
*
*/
- bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj);
+ bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj, std::string *fail_msg = NULL);
+
+ /**
+ *
+ * returns a reference pointer to the last compile warnings
+ * if no warnings were produced - the vector is empty
+ */
+ const std::vector<std::string> & get_last_compile_warnings() {
+ return m_warnings;
+ }
+
+private:
+
+ void pre_compile_check(const std::vector<TrexStream *> &streams,
+ GraphNodeMap & nodes);
+ void allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes);
+ void direct_pass(GraphNodeMap *nodes);
+ void check_for_unreachable_streams(GraphNodeMap *nodes);
+ void check_stream(const TrexStream *stream);
+ void add_warning(const std::string &warning);
+ void err(const std::string &err);
+
+ std::vector<std::string> m_warnings;
};
#endif /* __TREX_STREAMS_COMPILER_H__ */
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index eabd6fdb..c4fdd44b 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -26,12 +27,83 @@ limitations under the License.
#include <bp_sim.h>
-static inline double
-usec_to_sec(double usec) {
- return (usec / (1000 * 1000));
+
+void CDpOneStream::Delete(CFlowGenListPerThread * core){
+ assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
+ core->free_node((CGenNode *)m_node);
+ delete m_dp_stream;
+ m_node=0;
+ m_dp_stream=0;
+}
+
+void CDpOneStream::DeleteOnlyStream(){
+ assert(m_dp_stream);
+ delete m_dp_stream;
+ m_dp_stream=0;
+}
+
+int CGenNodeStateless::get_stream_id(){
+ if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) {
+ return (-1); // not valid
+ }
+ assert(m_ref_stream_info);
+ return ((int)m_ref_stream_info->m_stream_id);
+}
+
+
+void CGenNodeStateless::DumpHeader(FILE *fd){
+ fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n");
+
+}
+void CGenNodeStateless::Dump(FILE *fd){
+ fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n",
+ m_time,
+ (ulong)m_port_id,
+ "s-pkt", //action
+ get_stream_state_str(m_state ).c_str(),
+ get_stream_id(), //stream_id
+ TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype
+ (ulong)m_multi_bursts,
+ (ulong)m_single_burst
+ );
+}
+
+
+void CGenNodeStateless::refresh(){
+
+ /* refill the stream info */
+ m_single_burst = m_single_burst_refill;
+ m_multi_bursts = m_ref_stream_info->m_num_bursts;
+ m_state = CGenNodeStateless::ss_ACTIVE;
+}
+
+
+
+void CGenNodeCommand::free_command(){
+ assert(m_cmd);
+ delete m_cmd;
}
+std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){
+ std::string res;
+
+ switch (stream_state) {
+ case CGenNodeStateless::ss_FREE_RESUSE :
+ res="FREE ";
+ break;
+ case CGenNodeStateless::ss_INACTIVE :
+ res="INACTIVE ";
+ break;
+ case CGenNodeStateless::ss_ACTIVE :
+ res="ACTIVE ";
+ break;
+ default:
+ res="Unknow ";
+ };
+ return(res);
+}
+
void CGenNodeStateless::free_stl_node(){
/* if we have cache mbuf free it */
@@ -43,11 +115,54 @@ void CGenNodeStateless::free_stl_node(){
}
+bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){
+ m_active_streams-=d; /* reduce the number of streams */
+ if (m_active_streams == 0) {
+ return (true);
+ }
+ return (false);
+}
+
+
+void TrexStatelessDpPerPort::stop_traffic(uint8_t port_id){
+
+ assert(m_state==TrexStatelessDpPerPort::ppSTATE_TRANSMITTING);
+
+ for (auto dp_stream : m_active_nodes) {
+ CGenNodeStateless * node =dp_stream.m_node;
+ assert(node->get_port_id() == port_id);
+ if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) {
+ node->mark_for_free();
+ m_active_streams--;
+ dp_stream.DeleteOnlyStream();
+
+ }else{
+ dp_stream.Delete(m_core);
+ }
+ }
+
+ /* active stream should be zero */
+ assert(m_active_streams==0);
+ m_active_nodes.clear();
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+}
+
+
+void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){
+ m_core=core;
+ m_state=TrexStatelessDpPerPort::ppSTATE_IDLE;
+ m_port_id=0;
+ m_active_streams=0;
+ m_active_nodes.clear();
+}
+
+
void
TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
m_thread_id = thread_id;
m_core = core;
+ m_local_port_offset = 2*core->getDualPortId();
CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp();
@@ -55,8 +170,54 @@ TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) {
m_ring_to_cp = cp_dp->getRingDpToCp(thread_id);
m_state = STATE_IDLE;
+
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ m_ports[i].create(core);
+ }
}
+
+/* move to the next stream, old stream move to INACTIVE */
+bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node){
+
+ assert(cur_node);
+ TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id);
+ bool schedule =false;
+
+ bool to_stop_port=false;
+
+ if (next_node == NULL) {
+ /* there is no next stream , reduce the number of active streams*/
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+
+ }else{
+ uint8_t state=next_node->get_state();
+
+ /* can't be FREE_RESUSE */
+ assert(state != CGenNodeStateless::ss_FREE_RESUSE);
+ if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) {
+
+ /* refill start info and scedule, no update in active streams */
+ next_node->refresh();
+ schedule = true;
+
+ }else{
+ to_stop_port = lp_port->update_number_of_active_streams(1);
+ }
+ }
+
+ if ( to_stop_port ) {
+ /* call stop port explictly to move the state */
+ stop_traffic(cur_node->m_port_id);
+ }
+
+ return ( schedule );
+}
+
+
+
/**
* in idle state loop, the processor most of the time sleeps
* and periodically checks for messages
@@ -72,6 +233,15 @@ TrexStatelessDpCore::idle_state_loop() {
}
}
+
+
+void TrexStatelessDpCore::quit_main_loop(){
+ m_core->set_terminate_mode(true); /* mark it as terminated */
+ m_state = STATE_TERMINATE;
+ add_global_duration(0.0001);
+}
+
+
/**
* scehduler runs when traffic exists
* it will return when no more transmitting is done on this
@@ -89,6 +259,7 @@ TrexStatelessDpCore::start_scheduler() {
double old_offset = 0.0;
m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset);
+ /* TBD do we need that ? */
m_core->m_node_gen.close_file(m_core);
}
@@ -97,6 +268,11 @@ void
TrexStatelessDpCore::run_once(){
idle_state_loop();
+
+ if ( m_state == STATE_TERMINATE ){
+ return;
+ }
+
start_scheduler();
}
@@ -106,11 +282,32 @@ TrexStatelessDpCore::start() {
while (true) {
run_once();
+
+ if ( m_core->is_terminated_by_master() ) {
+ break;
+ }
}
}
-void
-TrexStatelessDpCore::add_duration(double duration){
+/* only if both port are idle we can exit */
+void
+TrexStatelessDpCore::schedule_exit(){
+
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ node->m_cmd = new TrexStatelessDpCanQuit();
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec ;
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+}
+
+
+void
+TrexStatelessDpCore::add_global_duration(double duration){
if (duration > 0.0) {
CGenNode *node = m_core->create_node() ;
@@ -123,9 +320,28 @@ TrexStatelessDpCore::add_duration(double duration){
}
}
+/* add per port exit */
+void
+TrexStatelessDpCore::add_port_duration(double duration,
+ uint8_t port_id){
+ if (duration > 0.0) {
+ CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ;
+
+ node->m_type = CGenNode::COMMAND;
+
+ /* make sure it will be scheduled after the current node */
+ node->m_time = m_core->m_cur_time_sec + duration ;
+
+ node->m_cmd = new TrexStatelessDpStop(port_id);
+
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
+}
+
void
-TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
+TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
TrexStreamsCompiledObj *comp) {
CGenNodeStateless *node = m_core->create_node_sl();
@@ -133,6 +349,19 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
/* add periodic */
node->m_type = CGenNode::STATELESS_PKT;
+ node->m_ref_stream_info = stream->clone_as_dp();
+
+ node->m_next_stream=0; /* will be fixed later */
+
+
+ if ( stream->m_self_start ){
+ /* if self start it is in active mode */
+ node->m_state =CGenNodeStateless::ss_ACTIVE;
+ lp_port->m_active_streams++;
+ }else{
+ node->m_state =CGenNodeStateless::ss_INACTIVE;
+ }
+
node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec);
pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id);
@@ -154,6 +383,10 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
switch ( stream->m_type ) {
case TrexStream::stCONTINUOUS :
+ node->m_single_burst=0;
+ node->m_single_burst_refill=0;
+ node->m_multi_bursts=0;
+ node->m_ibg_sec = 0.0;
break;
case TrexStream::stSINGLE_BURST :
@@ -175,7 +408,6 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
assert(0);
};
- node->m_is_stream_active = 1;
node->m_port_id = stream->m_port_id;
/* allocate const mbuf */
@@ -196,59 +428,94 @@ TrexStatelessDpCore::add_cont_stream(TrexStream * stream,
/* set the packet as a readonly */
node->set_cache_mbuf(m);
- /* keep track */
- m_active_nodes.push_back(node);
+ CDpOneStream one_stream;
- /* schedule */
- m_core->m_node_gen.add_node((CGenNode *)node);
+ one_stream.m_dp_stream = node->m_ref_stream_info;
+ one_stream.m_node =node;
- m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+ lp_port->m_active_nodes.push_back(one_stream);
+ /* schedule only if active */
+ if (node->m_state == CGenNodeStateless::ss_ACTIVE) {
+ m_core->m_node_gen.add_node((CGenNode *)node);
+ }
}
void
-TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, double duration) {
+TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj,
+ double duration) {
+
+#if 0
+ /* TBD to remove ! */
+ obj->Dump(stdout);
+#endif
+
+ TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id());
+ lp_port->m_active_streams = 0;
+ /* no nodes in the list */
+ assert(lp_port->m_active_nodes.size()==0);
+
for (auto single_stream : obj->get_objects()) {
- add_cont_stream(single_stream.m_stream,obj);
+ /* all commands should be for the same port */
+ assert(obj->get_port_id() == single_stream.m_stream->m_port_id);
+ add_cont_stream(lp_port,single_stream.m_stream,obj);
}
+ uint32_t nodes = lp_port->m_active_nodes.size();
+ /* find next stream */
+ assert(nodes == obj->get_objects().size());
+
+ int cnt=0;
+
+ /* set the next_stream pointer */
+ for (auto single_stream : obj->get_objects()) {
+
+ if (single_stream.m_stream->is_dp_next_stream() ) {
+ int stream_id = single_stream.m_stream->m_next_stream_id;
+ assert(stream_id<nodes);
+ /* point to the next stream , stream_id is fixed */
+ lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ;
+ }
+ cnt++;
+ }
+
+ lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING;
+ m_state = TrexStatelessDpCore::STATE_TRANSMITTING;
+
+
if ( duration > 0.0 ){
- add_duration( duration );
+ add_port_duration( duration ,obj->get_port_id() );
}
}
+
+bool TrexStatelessDpCore::are_all_ports_idle(){
+
+ bool res=true;
+ int i;
+ for (i=0; i<NUM_PORTS_PER_CORE; i++) {
+ if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){
+ res=false;
+ }
+ }
+ return (res);
+}
+
+
void
TrexStatelessDpCore::stop_traffic(uint8_t port_id) {
/* we cannot remove nodes not from the top of the queue so
for every active node - make sure next time
the scheduler invokes it, it will be free */
- for (auto node : m_active_nodes) {
- if (node->m_port_id == port_id) {
- node->m_is_stream_active = 0;
- }
- }
- /* remove all the non active nodes */
- auto pred = std::remove_if(m_active_nodes.begin(),
- m_active_nodes.end(),
- [](CGenNodeStateless *node) { return (!node->m_is_stream_active); });
+ TrexStatelessDpPerPort * lp_port = get_port_db(port_id);
- m_active_nodes.erase(pred, m_active_nodes.end());
+ lp_port->stop_traffic(port_id);
- if (m_active_nodes.size() == 0) {
- m_state = STATE_IDLE;
- /* stop the scheduler */
+ if ( are_all_ports_idle() ) {
- CGenNode *node = m_core->create_node() ;
-
- node->m_type = CGenNode::EXIT_SCHED;
-
- /* make sure it will be scheduled after the current node */
- node->m_time = m_core->m_cur_time_sec + 0.0001;
-
- m_core->m_node_gen.add_node(node);
+ schedule_exit();
}
-
}
/**
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 7448d215..85afcf8f 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -33,6 +34,52 @@ class CGenNodeStateless;
class TrexStreamsCompiledObj;
class TrexStream;
+
+class CDpOneStream {
+public:
+ void Create(){
+ }
+
+ void Delete(CFlowGenListPerThread * core);
+ void DeleteOnlyStream();
+
+ CGenNodeStateless * m_node; // schedule node
+ TrexStream * m_dp_stream; // stream info
+};
+
+class TrexStatelessDpPerPort {
+
+public:
+ /* states */
+ enum state_e {
+ ppSTATE_IDLE,
+ ppSTATE_TRANSMITTING
+ };
+
+public:
+ TrexStatelessDpPerPort(){
+ }
+
+ void create(CFlowGenListPerThread * core);
+
+ void stop_traffic(uint8_t port_id);
+
+ bool update_number_of_active_streams(uint32_t d);
+
+public:
+
+ state_e m_state;
+ uint8_t m_port_id;
+
+ uint32_t m_active_streams; /* how many active streams on this port */
+
+ std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */
+ CFlowGenListPerThread * m_core ;
+};
+
+/* for now */
+#define NUM_PORTS_PER_CORE 2
+
class TrexStatelessDpCore {
public:
@@ -40,7 +87,9 @@ public:
/* states */
enum state_e {
STATE_IDLE,
- STATE_TRANSMITTING
+ STATE_TRANSMITTING,
+ STATE_TERMINATE
+
};
TrexStatelessDpCore() {
@@ -83,6 +132,11 @@ public:
*/
void stop_traffic(uint8_t port_id);
+
+ /* return if all ports are idel */
+ bool are_all_ports_idle();
+
+
/**
* check for and handle messages from CP
*
@@ -109,7 +163,26 @@ public:
}
+ /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */
+ void quit_main_loop();
+
+ bool set_stateless_next_node(CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node);
+
private:
+
+
+ TrexStatelessDpPerPort * get_port_db(uint8_t port_id){
+ assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id));
+ uint8_t local_port_id = port_id -m_local_port_offset;
+ assert(local_port_id<NUM_PORTS_PER_CORE);
+ return (&m_ports[local_port_id]);
+ }
+
+
+ void schedule_exit();
+
+
/**
* in idle state loop, the processor most of the time sleeps
* and periodically checks for messages
@@ -132,21 +205,27 @@ private:
*/
void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg);
- /* add global exit */
- void add_duration(double duration);
- void add_cont_stream(TrexStream * stream,TrexStreamsCompiledObj *comp);
+ void add_port_duration(double duration,
+ uint8_t port_id);
+
+ void add_global_duration(double duration);
+
+ void add_cont_stream(TrexStatelessDpPerPort * lp_port,
+ TrexStream * stream,
+ TrexStreamsCompiledObj *comp);
uint8_t m_thread_id;
- state_e m_state;
+ uint8_t m_local_port_offset;
+
+ state_e m_state; /* state of all ports */
CNodeRing *m_ring_from_cp;
CNodeRing *m_ring_to_cp;
- /* holds the current active nodes */
- std::vector<CGenNodeStateless *> m_active_nodes;
+ TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE];
/* pointer to the main object */
- CFlowGenListPerThread *m_core;
+ CFlowGenListPerThread * m_core;
double m_duration;
};
diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h
index e4cf964d..1e53887b 100644
--- a/src/stateless/dp/trex_stream_node.h
+++ b/src/stateless/dp/trex_stream_node.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoh Haim
Cisco Systems, Inc.
*/
@@ -27,20 +28,51 @@ limitations under the License.
class TrexStatelessDpCore;
#include <trex_stream.h>
+class TrexStatelessCpToDpMsgBase;
+
+struct CGenNodeCommand : public CGenNodeBase {
+
+friend class TrexStatelessDpCore;
+
+public:
+ TrexStatelessCpToDpMsgBase * m_cmd;
+
+ uint8_t m_pad_end[104];
+
+public:
+ void free_command();
+
+} __rte_cache_aligned;;
+
+
+static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" );
+
/* this is a event for stateless */
struct CGenNodeStateless : public CGenNodeBase {
friend class TrexStatelessDpCore;
+public:
+ enum {
+ ss_FREE_RESUSE =1, /* should be free by scheduler */
+ ss_INACTIVE =2, /* will be active by other stream or stopped */
+ ss_ACTIVE =3 /* the stream is active */
+ };
+ typedef uint8_t stream_state_t ;
+
+ static std::string get_stream_state_str(stream_state_t stream_state);
+
private:
+ /* cache line 0 */
+ /* important stuff here */
void * m_cache_mbuf;
double m_next_time_offset; /* in sec */
double m_ibg_sec; /* inter burst time in sec */
- uint8_t m_is_stream_active;
+ stream_state_t m_state;
uint8_t m_port_id;
- uint8_t m_stream_type; /* TrexStream::STREAM_TYPE */
+ uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */
uint8_t m_pad;
uint32_t m_single_burst; /* the number of bursts in case of burst */
@@ -48,14 +80,40 @@ private:
uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */
-
+ /* cache line 1 */
+ TrexStream * m_ref_stream_info; /* the stream info */
+ CGenNodeStateless * m_next_stream;
/* pad to match the size of CGenNode */
- uint8_t m_pad_end[65];
+ uint8_t m_pad_end[56];
+
+
public:
+ uint8_t get_port_id(){
+ return (m_port_id);
+ }
+
+
+ /* we restart the stream, schedule it using stream isg */
+ inline void update_refresh_time(double cur_time){
+ m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec);
+ }
+
+ inline bool is_mask_for_free(){
+ return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false);
+
+ }
+ inline void mark_for_free(){
+ set_state(CGenNodeStateless::ss_FREE_RESUSE);
+ /* only to be safe */
+ m_ref_stream_info= NULL;
+ m_next_stream= NULL;
+
+ }
+
inline uint8_t get_stream_type(){
return (m_stream_type);
}
@@ -72,11 +130,16 @@ public:
return (m_multi_bursts);
}
+ inline void set_state(stream_state_t new_state){
+ m_state=new_state;
+ }
- inline bool is_active() {
- return m_is_stream_active;
+
+ inline stream_state_t get_state() {
+ return m_state;
}
+ void refresh();
inline void handle_continues(CFlowGenListPerThread *thread) {
thread->m_node_gen.m_v_if->send_node( (CGenNode *)this);
@@ -100,14 +163,22 @@ public:
}else{
m_multi_bursts--;
if ( m_multi_bursts == 0 ) {
- /* stop */
- m_is_stream_active =0;
+ set_state(CGenNodeStateless::ss_INACTIVE);
+ if ( thread->set_stateless_next_node(this,m_next_stream) ){
+ /* update the next stream time using isg */
+ m_next_stream->update_refresh_time(m_time);
+
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream);
+ }else{
+ // in case of zero we will schedule a command to stop
+ // will be called from set_stateless_next_node
+ }
+
}else{
m_time += m_ibg_sec;
m_single_burst = m_single_burst_refill;
-
+ thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
}
- thread->m_node_gen.m_p_queue.push( (CGenNode *)this);
}
}
@@ -168,10 +239,14 @@ public:
void free_stl_node();
+public:
+ /* debug functions */
- void Dump(FILE *fd){
- fprintf(fd," %f, %lu, %lu \n",m_time,(ulong)m_port_id,(ulong)get_mbuf_cache_dir());
- }
+ int get_stream_id();
+
+ static void DumpHeader(FILE *fd);
+
+ void Dump(FILE *fd);
} __rte_cache_aligned;
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index d8ebc52c..856fd9e3 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -76,3 +77,41 @@ TrexStatelessDpStop::clone() {
return new_msg;
}
+
+
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpQuit::clone(){
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit();
+
+ return new_msg;
+}
+
+
+bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){
+
+ /* quit */
+ dp_core->quit_main_loop();
+ return (true);
+}
+
+
+bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){
+
+ if ( dp_core->are_all_ports_idle() ){
+ /* if all ports are idle quit now */
+ set_quit(true);
+ }
+ return (true);
+}
+
+TrexStatelessCpToDpMsgBase *
+TrexStatelessDpCanQuit::clone(){
+
+ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit();
+
+ return new_msg;
+}
+
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 90897665..7dc307c7 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -1,5 +1,6 @@
/*
Itay Marom
+ Hanoch Haim
Cisco Systems, Inc.
*/
@@ -35,6 +36,7 @@ class TrexStatelessCpToDpMsgBase {
public:
TrexStatelessCpToDpMsgBase() {
+ m_quit_scheduler=false;
}
virtual ~TrexStatelessCpToDpMsgBase() {
@@ -53,9 +55,19 @@ public:
*/
virtual TrexStatelessCpToDpMsgBase * clone() = 0;
+ /* do we want to quit scheduler, can be set by handle function */
+ void set_quit(bool enable){
+ m_quit_scheduler=enable;
+ }
+
+ bool is_quit(){
+ return ( m_quit_scheduler);
+ }
+
/* no copy constructor */
TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete;
-
+private:
+ bool m_quit_scheduler;
};
/**
@@ -99,5 +111,38 @@ private:
uint8_t m_port_id;
};
+/**
+ * a message to Quit the datapath traffic. support only stateless for now
+ *
+ * @author hhaim
+ */
+class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpQuit() {
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+};
+
+/**
+ * a message to check if both port are idel and exit
+ *
+ * @author hhaim
+ */
+class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase {
+public:
+
+ TrexStatelessDpCanQuit() {
+ }
+
+ virtual bool handle(TrexStatelessDpCore *dp_core);
+
+ virtual TrexStatelessCpToDpMsgBase * clone();
+};
+
+
#endif /* __TREX_STATELESS_MESSAGING_H__ */