diff options
author | Dan Klein <danklein10@gmail.com> | 2015-11-24 08:54:53 +0200 |
---|---|---|
committer | Dan Klein <danklein10@gmail.com> | 2015-11-24 08:54:53 +0200 |
commit | e7cb8b0f6c2fbe08d2086a7408040ac7d12aee5a (patch) | |
tree | 1b27e542fe9f3ae4abdc8245b804cda25a6e2c2f /src | |
parent | 597f74d8ed10abc3dd9df7e81ecea5ac2f5c714e (diff) | |
parent | f3861d504353729724086dec82c79e818224554f (diff) |
Merge branch 'master' into dan_stateless
Diffstat (limited to 'src')
50 files changed, 5315 insertions, 903 deletions
diff --git a/src/bp_gtest.cpp b/src/bp_gtest.cpp index e3145f2a..a94c2d37 100755 --- a/src/bp_gtest.cpp +++ b/src/bp_gtest.cpp @@ -66,7 +66,7 @@ int test_priorty_queue(void){ int i; for (i=0; i<10; i++) { node = new CGenNode(); - printf(" +%x \n",node); + printf(" +%p \n",node); node->m_flow_id = 10-i; node->m_pkt_info = (CFlowPktInfo *)(uintptr_t)i; node->m_time = (double)i+0.1; @@ -74,7 +74,7 @@ int test_priorty_queue(void){ } while (!p_queue.empty()) { node = p_queue.top(); - printf(" -->%x \n",node); + printf(" -->%p \n",node); //node->Dump(stdout); p_queue.pop(); //delete node; @@ -131,16 +131,6 @@ int test_human_p(){ -static bool was_init=false; - -void gtest_init_once(){ - - if ( !was_init ){ - CGlobalInfo::init_pools(1000); - time_init(); - was_init=true; - } -} @@ -159,7 +149,7 @@ public: bool init(void){ - uint16 * ports; + uint16 * ports = NULL; CTupleBase tuple; CErfIF erf_vif; @@ -259,7 +249,6 @@ public: class basic : public testing::Test { protected: virtual void SetUp() { - gtest_init_once(); } virtual void TearDown() { } @@ -269,7 +258,6 @@ public: class cpu : public testing::Test { protected: virtual void SetUp() { - gtest_init_once(); } virtual void TearDown() { } @@ -663,6 +651,7 @@ TEST_F(basic, latency1) { po->preview.setFileWrite(true); uint8_t mac[]={0,0,0,1,0,0}; + (void)mac; CErfIF erf_vif; erf_vif.set_review_mode(&CGlobalInfo::m_options.preview); @@ -714,6 +703,7 @@ TEST_F(basic, latency2) { uint8_t mac[]={0,0,0,1,0,0}; + (void)mac; mac[0]=0; mac[1]=0; @@ -728,14 +718,13 @@ TEST_F(basic, latency2) { int i; for (i=0; i<100; i++) { - uint8_t *p; rte_mbuf_t * m=l.generate_pkt(0); - p=rte_pktmbuf_mtod(m, uint8_t*); + rte_pktmbuf_mtod(m, uint8_t*); //utl_DumpBuffer(stdout,p,l.get_pkt_size(),0); port0.update_packet(m); - p=rte_pktmbuf_mtod(m, uint8_t*); + rte_pktmbuf_mtod(m, uint8_t*); //utl_DumpBuffer(stdout,p,l.get_pkt_size(),0); //printf("offset is : %d \n",l.get_payload_offset()); @@ -763,6 +752,7 @@ TEST_F(basic, latency3) { uint8_t mac[]={0,0,0,1,0,0}; + (void)mac; mac[0]=0; @@ -850,6 +840,7 @@ public: TEST_F(basic, latency4) { uint8_t mac[]={0,0,0,1,0,0}; + (void)mac; mac[0]=0; mac[1]=0; @@ -1196,7 +1187,6 @@ TEST_F(cpu, cpu3) { class timerwl : public testing::Test { protected: virtual void SetUp() { - gtest_init_once(); } virtual void TearDown() { } @@ -1447,7 +1437,6 @@ TEST_F(timerwl, many_timers_with_stop) { class rx_check : public testing::Test { protected: virtual void SetUp() { - gtest_init_once(); m_rx_check.Create(); } @@ -2125,7 +2114,7 @@ class CRxCheck1 : public CRxCheckCallbackBase { public: virtual void handle_packet(rte_mbuf_t * m){ - char *mp=rte_pktmbuf_mtod(m, char*); + rte_pktmbuf_mtod(m, char*); CRx_check_header * rx_p; rte_mbuf_t * m2 = m->next; rx_p=(CRx_check_header *)rte_pktmbuf_mtod(m2, char*); @@ -2139,7 +2128,6 @@ public: class rx_check_system : public testing::Test { protected: virtual void SetUp() { - gtest_init_once(); m_rx_check.m_callback=&m_callback; m_callback.mg =&m_mg; @@ -2417,8 +2405,6 @@ public: class nat_check_system : public testing::Test { protected: virtual void SetUp() { - gtest_init_once(); - m_rx_check.m_callback=&m_callback; m_callback.mg =&m_mg; m_mg.Create(); @@ -2464,7 +2450,6 @@ class file_flow_info : public testing::Test { protected: virtual void SetUp() { - gtest_init_once(); assert(m_flow_info.Create()); } diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 92beab91..a61fbb8f 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -26,6 +26,7 @@ limitations under the License. #include <common/basic_utils.h> #include <trex_stream_node.h> +#include <trex_stateless_messaging.h> #undef VALG @@ -72,11 +73,11 @@ void CGlobalMemory::Dump(FILE *fd){ c_size=c_size*2; } - fprintf(fd," %-40s : %lu \n",names[i].c_str(),m_mbuf[i]); + fprintf(fd," %-40s : %lu \n",names[i].c_str(),(ulong)m_mbuf[i]); } c_total += (m_mbuf[MBUF_DP_FLOWS] * sizeof(CGenNode)); - fprintf(fd," %-40s : %lu \n","get_each_core_dp_flows",get_each_core_dp_flows()); + fprintf(fd," %-40s : %lu \n","get_each_core_dp_flows",(ulong)get_each_core_dp_flows()); fprintf(fd," %-40s : %s \n","Total memory",double_to_human_str(c_total,"bytes",KBYE_1024).c_str() ); } @@ -218,7 +219,7 @@ bool CPlatformSocketInfoConfig::init(){ m_max_threads_per_dual_if = num_threads; }else{ if (lp->m_threads.size() != num_threads) { - printf("ERROR number of threads per dual ports should be the same for all dual ports\n"); + printf("ERROR, the number of threads per dual ports should be the same for all dual ports\n"); exit(1); } } @@ -230,7 +231,7 @@ bool CPlatformSocketInfoConfig::init(){ uint8_t phy_thread = lp->m_threads[j]; if (phy_thread>MAX_THREADS_SUPPORTED) { - printf("ERROR physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED); + printf("ERROR, physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED); exit(1); } @@ -240,7 +241,7 @@ bool CPlatformSocketInfoConfig::init(){ } if ( m_thread_phy_to_virtual[phy_thread] ){ - printf("ERROR physical thread %d defined twice %d \n",phy_thread); + printf("ERROR physical thread %d defined twice\n",phy_thread); exit(1); } m_thread_phy_to_virtual[phy_thread]=virt_thread; @@ -269,7 +270,7 @@ bool CPlatformSocketInfoConfig::init(){ void CPlatformSocketInfoConfig::dump(FILE *fd){ - fprintf(fd," core_mask %x \n",get_cores_mask()); + fprintf(fd," core_mask %llx \n",(unsigned long long)get_cores_mask()); fprintf(fd," sockets :"); int i; for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) { @@ -478,8 +479,8 @@ void CPlatformSocketInfo::dump(FILE *fd){ void CRteMemPool::dump_in_case_of_error(FILE *fd){ - fprintf(fd," ERROR ERROR there is no enough memory in socket %d \n",m_pool_id); - fprintf(fd," Try to enlarge the memory values in the configuration file /etc/trex_cfg.yaml \n"); + fprintf(fd," ERROR,there is no enough memory in socket %d \n",m_pool_id); + fprintf(fd," Try to enlarge the memory values in the configuration file -/etc/trex_cfg.yaml ,see manual for more detail \n"); dump(fd); } @@ -496,6 +497,26 @@ void CRteMemPool::dump(FILE *fd){ } //////////////////////////////////////// + +void CGlobalInfo::free_pools(){ + CPlatformSocketInfo * lpSocket =&m_socket; + CRteMemPool * lpmem; + int i; + for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) { + if (lpSocket->is_sockets_enable((socket_id_t)i)) { + lpmem= &m_mem_pool[i]; + utl_rte_mempool_delete(lpmem->m_big_mbuf_pool); + utl_rte_mempool_delete(lpmem->m_small_mbuf_pool); + utl_rte_mempool_delete(lpmem->m_mbuf_pool_128); + utl_rte_mempool_delete(lpmem->m_mbuf_pool_256); + utl_rte_mempool_delete(lpmem->m_mbuf_pool_512); + utl_rte_mempool_delete(lpmem->m_mbuf_pool_1024); + } + utl_rte_mempool_delete(m_mem_pool[0].m_mbuf_global_nodes); + } +} + + void CGlobalInfo::init_pools(uint32_t rx_buffers){ /* this include the pkt from 64- */ CGlobalMemory * lp=&CGlobalInfo::m_memory_cfg; @@ -748,9 +769,7 @@ int CErfIF::write_pkt(CCapPktRaw *pkt_raw){ int CErfIF::close_file(void){ BP_ASSERT(m_raw); - m_raw->raw=0; delete m_raw; - if ( m_preview_mode->getFileWrite() ){ BP_ASSERT(m_writer); delete m_writer; @@ -821,7 +840,6 @@ void CPacketIndication::UpdatePacketPadding(){ void CPacketIndication::RefreshPointers(){ char *pobase=getBasePtr(); - CPacketIndication * obj=this; m_ether = (EthernetHeader *) (pobase + m_ether_offset); l3.m_ipv4 = (IPHeader *) (pobase + m_ip_offset); @@ -1672,7 +1690,6 @@ char * CFlowPktInfo::push_ipv4_option_offline(uint8_t bytes){ void CFlowPktInfo::mask_as_learn(){ - char *p; CNatOption *lpNat; if ( m_pkt_indication.is_ipv6() ){ lpNat=(CNatOption *)push_ipv6_option_offline(CNatOption::noOPTION_LEN); @@ -2266,7 +2283,7 @@ void CCCapFileMemoryUsage::dump(FILE *fd){ int c_total=0; for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) { - fprintf(fd," size_%-7d : %lu \n",c_size,m_buf[i]); + fprintf(fd," size_%-7d : %lu \n",c_size, (ulong)m_buf[i]); c_total +=m_buf[i]*c_size; c_size = c_size*2; } @@ -2441,7 +2458,6 @@ void operator >> (const YAML::Node& node, CFlowYamlInfo & fi) { if ( node.FindValue("dyn_pyload") ){ - int i; const YAML::Node& dyn_pyload = node["dyn_pyload"]; for(unsigned i=0;i<dyn_pyload.size();i++) { CFlowYamlDpPkt fd; @@ -2840,8 +2856,20 @@ void CFlowStats::DumpHeader(FILE *fd){ void CFlowStats::Dump(FILE *fd){ //"name","cps","f-pkts","f-bytes","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows" fprintf(fd," %02d, %-40s ,%4.2f,%4.2f, %5.0f , %7.0f ,%7.2f ,%7.2f , %7.2f , %10.0f , %5.0f , %7.0f , %llu , %llu \n", - m_id,m_name.c_str(),m_cps,get_normal_cps(), - m_pkt,m_bytes,duration_sec,m_mb_sec,m_mB_sec,m_c_flows,m_pps,m_total_Mbytes,m_errors,m_flows); + m_id, + m_name.c_str(), + m_cps, + get_normal_cps(), + m_pkt, + m_bytes, + duration_sec, + m_mb_sec, + m_mB_sec, + m_c_flows, + m_pps, + m_total_Mbytes, + (unsigned long long)m_errors, + (unsigned long long)m_flows); } bool CFlowGeneratorRecPerThread::Create(CTupleGeneratorSmart * global_gen, @@ -3045,22 +3073,31 @@ void CGenNode::DumpHeader(FILE *fd){ fprintf(fd," pkt_id,time,fid,pkt_info,pkt,len,type,is_init,is_last,type,thread_id,src_ip,dest_ip,src_port \n"); } + +void CGenNode::free_gen_node(){ + rte_mbuf_t * m=get_cache_mbuf(); + if ( unlikely(m != NULL) ) { + rte_pktmbuf_free(m); + m_plugin_info=0; + } +} + + void CGenNode::Dump(FILE *fd){ - fprintf(fd,"%.6f,%llx,%p,%llu,%d,%d,%d,%d,%d,%d,%x,%x,%d\n",m_time,m_flow_id,m_pkt_info, - m_pkt_info->m_pkt_indication.m_packet->pkt_cnt, - m_pkt_info->m_pkt_indication.m_packet->pkt_len, - m_pkt_info->m_pkt_indication.m_desc.getId(), - (m_pkt_info->m_pkt_indication.m_desc.IsInitSide()?1:0), - m_pkt_info->m_pkt_indication.m_desc.IsLastPkt(), + fprintf(fd,"%.6f,%llx,%p,%llu,%d,%d,%d,%d,%d,%d,%x,%x,%d\n", + m_time, + (unsigned long long)m_flow_id, + m_pkt_info, + (unsigned long long)m_pkt_info->m_pkt_indication.m_packet->pkt_cnt, + m_pkt_info->m_pkt_indication.m_packet->pkt_len, + m_pkt_info->m_pkt_indication.m_desc.getId(), + (m_pkt_info->m_pkt_indication.m_desc.IsInitSide()?1:0), + m_pkt_info->m_pkt_indication.m_desc.IsLastPkt(), m_type, m_thread_id, m_src_ip, m_dest_ip, - m_src_port - - - - ); + m_src_port); } @@ -3093,6 +3130,13 @@ void CNodeGenerator::remove_all(CFlowGenListPerThread * thread){ while (!m_p_queue.empty()) { node = m_p_queue.top(); m_p_queue.pop(); + /* sanity check */ + if (node->m_type == CGenNode::STATELESS_PKT) { + CGenNodeStateless * p=(CGenNodeStateless *)node; + /* need to be changed in Pause support */ + assert(p->is_mask_for_free()); + } + thread->free_node( node); } } @@ -3108,6 +3152,7 @@ int CNodeGenerator::open_file(std::string file_name, return (0); } + int CNodeGenerator::close_file(CFlowGenListPerThread * thread){ remove_all(thread); BP_ASSERT(m_v_if); @@ -3115,10 +3160,19 @@ int CNodeGenerator::close_file(CFlowGenListPerThread * thread){ return (0); } +int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){ + if ( m_preview_mode.getVMode() >2 ){ + fprintf(stdout," %4lu ,", (ulong)m_cnt); + node_sl->Dump(stdout); + m_cnt++; + } + return (0); +} + int CNodeGenerator::update_stats(CGenNode * node){ if ( m_preview_mode.getVMode() >2 ){ - fprintf(stdout," %llu ,",m_cnt); + fprintf(stdout," %llu ,", (unsigned long long)m_cnt); node->Dump(stdout); m_cnt++; } @@ -3132,6 +3186,8 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, uint32_t max_threads){ + m_non_active_nodes = 0; + m_terminated_by_master=false; m_flow_list =flow_list; m_core_id= core_id; m_tcp_dpc= 0; @@ -3203,7 +3259,7 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, assert(m_ring_to_rx); /* create the info required for stateless DP core */ - m_stateless_dp_info = new TrexStatelessDpCore(thread_id, this); + m_stateless_dp_info.create(thread_id, this); return (true); } @@ -3353,8 +3409,7 @@ void CFlowGenListPerThread::Delete(){ Clean(); m_cpu_cp_u.Delete(); - delete m_stateless_dp_info; - m_stateless_dp_info = NULL; + utl_rte_mempool_delete(m_node_pool); } @@ -3401,15 +3456,24 @@ int CNodeGenerator::flush_file(dsec_t max_time, bool done=false; thread->m_cpu_dp_u.start_work(); - while (!m_p_queue.empty()) { + + /** + * if a positive value was given to max time + * schedule an exit node + */ + if ( (max_time > 0) && (!always) ) { + CGenNode *exit_node = thread->create_node(); + + exit_node->m_type = CGenNode::EXIT_SCHED; + exit_node->m_time = max_time; + add_node(exit_node); + } + + while (true) { + node = m_p_queue.top(); - n_time = node->m_time+ offset; + n_time = node->m_time + offset; - if (( (n_time) > max_time ) && - (always==false) ) { - /* nothing to do */ - break; - } events++; /*#ifdef VALG if (events > 1 ) { @@ -3421,8 +3485,6 @@ int CNodeGenerator::flush_file(dsec_t max_time, dsec_t dt ; thread->m_cpu_dp_u.commit(); - thread->check_msgs(); - while ( true ) { dt = now_sec() - n_time ; @@ -3449,9 +3511,9 @@ int CNodeGenerator::flush_file(dsec_t max_time, } } - #ifndef RTE_DPDK - thread->check_msgs(); - #endif + //#ifndef RTE_DPDK + //thread->check_msgs(); + //#endif uint8_t type=node->m_type; @@ -3459,8 +3521,12 @@ int CNodeGenerator::flush_file(dsec_t max_time, m_p_queue.pop(); CGenNodeStateless *node_sl = (CGenNodeStateless *)node; + #ifdef _DEBUG + update_stl_stats(node_sl); + #endif + /* if the stream has been deactivated - end */ - if (unlikely(!node_sl->is_active())) { + if ( unlikely( node_sl->is_mask_for_free() ) ) { thread->free_node(node); } else { node_sl->handle(thread); @@ -3509,12 +3575,18 @@ int CNodeGenerator::flush_file(dsec_t max_time, } }else{ - handle_slow_messages(type,node,thread,always); + bool exit_sccheduler = handle_slow_messages(type,node,thread,always); + if (exit_sccheduler) { + break; + } } } } } + if ( thread->is_terminated_by_master() ) { + return (0); + } if (!always) { old_offset =offset; @@ -3525,10 +3597,14 @@ int CNodeGenerator::flush_file(dsec_t max_time, return (0); } -void CNodeGenerator::handle_slow_messages(uint8_t type, - CGenNode * node, - CFlowGenListPerThread * thread, - bool always){ +bool +CNodeGenerator::handle_slow_messages(uint8_t type, + CGenNode * node, + CFlowGenListPerThread * thread, + bool always){ + + /* should we continue after */ + bool exit_scheduler = false; if (unlikely (type == CGenNode::FLOW_DEFER_PORT_RELEASE) ) { m_p_queue.pop(); @@ -3549,7 +3625,7 @@ void CNodeGenerator::handle_slow_messages(uint8_t type, m_p_queue.pop(); /* time out, need to free the flow and remove the association , we didn't get convertion yet*/ thread->terminate_nat_flows(node); - return; + return (exit_scheduler); }else{ flush_one_node_to_file(node); @@ -3569,28 +3645,50 @@ void CNodeGenerator::handle_slow_messages(uint8_t type, m_p_queue.push(node); } - } else if ( type == CGenNode::FLOW_SYNC ) { + } else if ( type == CGenNode::FLOW_SYNC ) { + /* flow sync message is a sync point for time */ + thread->m_cur_time_sec = node->m_time; + + /* first pop the node */ m_p_queue.pop(); thread->check_msgs(); /* check messages */ m_v_if->flush_tx_queue(); /* flush pkt each timeout */ - if (always == false) { + /* exit in case this is the last node*/ + if ( m_p_queue.size() == m_parent->m_non_active_nodes ) { + thread->free_node(node); + exit_scheduler = true; + } else { + /* schedule for next maintenace */ node->m_time += SYNC_TIME_OUT; m_p_queue.push(node); - }else{ - thread->free_node(node); } - /* must be the last section of processing */ - } else if ( type == CGenNode::EXIT_SCHED ) { - remove_all(thread); + } else if ( type == CGenNode::EXIT_SCHED ) { + m_p_queue.pop(); + thread->free_node(node); + exit_scheduler = true; + } else { - printf(" ERROR type is not valid %d \n",type); - assert(0); + if ( type == CGenNode::COMMAND) { + m_p_queue.pop(); + CGenNodeCommand *node_cmd = (CGenNodeCommand *)node; + { + TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd; + cmd->handle(&thread->m_stateless_dp_info); + exit_scheduler = cmd->is_quit(); + thread->free_node((CGenNode *)node_cmd);/* free the node */ + } + }else{ + printf(" ERROR type is not valid %d \n",type); + assert(0); + } } + + return exit_scheduler; } @@ -3829,7 +3927,7 @@ void CFlowGenListPerThread::handel_nat_msg(CGenNodeNatInfo * msg){ void CFlowGenListPerThread::check_msgs(void) { /* inlined for performance */ - m_stateless_dp_info->periodic_check_for_cp_messages(); + m_stateless_dp_info.periodic_check_for_cp_messages(); if ( likely ( m_ring_from_rx->isEmpty() ) ) { return; @@ -3868,44 +3966,43 @@ void CFlowGenListPerThread::check_msgs(void) { } } -void delay(int msec); +//void delay(int msec); -const uint8_t test_udp_pkt[]={ - 0x00,0x00,0x00,0x01,0x00,0x00, - 0x00,0x00,0x00,0x01,0x00,0x00, - 0x08,0x00, - 0x45,0x00,0x00,0x81, - 0xaf,0x7e,0x00,0x00, - 0x12,0x11,0xd9,0x23, - 0x01,0x01,0x01,0x01, - 0x3d,0xad,0x72,0x1b, - - 0x11,0x11, - 0x11,0x11, - - 0x00,0x6d, - 0x00,0x00, - - 0x64,0x31,0x3a,0x61, - 0x64,0x32,0x3a,0x69,0x64, - 0x32,0x30,0x3a,0xd0,0x0e, - 0xa1,0x4b,0x7b,0xbd,0xbd, - 0x16,0xc6,0xdb,0xc4,0xbb,0x43, - 0xf9,0x4b,0x51,0x68,0x33,0x72, - 0x20,0x39,0x3a,0x69,0x6e,0x66,0x6f, - 0x5f,0x68,0x61,0x73,0x68,0x32,0x30,0x3a,0xee,0xc6,0xa3, - 0xd3,0x13,0xa8,0x43,0x06,0x03,0xd8,0x9e,0x3f,0x67,0x6f, - 0xe7,0x0a,0xfd,0x18,0x13,0x8d,0x65,0x31,0x3a,0x71,0x39, - 0x3a,0x67,0x65,0x74,0x5f,0x70,0x65,0x65,0x72,0x73,0x31, - 0x3a,0x74,0x38,0x3a,0x3d,0xeb,0x0c,0xbf,0x0d,0x6a,0x0d, - 0xa5,0x31,0x3a,0x79,0x31,0x3a,0x71,0x65,0x87,0xa6,0x7d, - 0xe7 -}; +void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name, + CPreviewMode &preview){ + m_preview_mode = preview; + m_node_gen.open_file(erf_file_name,&m_preview_mode); +} + +void CFlowGenListPerThread::stop_stateless_simulation_file(){ + m_node_gen.m_v_if->close_file(); +} + +void CFlowGenListPerThread::start_stateless_daemon_simulation(){ -void CFlowGenListPerThread::start_stateless_daemon(){ - m_stateless_dp_info->start(); + m_cur_time_sec = 0; + m_stateless_dp_info.run_once(); + +} + + +/* return true if we need to shedule next_stream, */ + +bool CFlowGenListPerThread::set_stateless_next_node( CGenNodeStateless * cur_node, + CGenNodeStateless * next_node){ + return ( m_stateless_dp_info.set_stateless_next_node(cur_node,next_node) ); +} + + +void CFlowGenListPerThread::start_stateless_daemon(CPreviewMode &preview){ + m_cur_time_sec = 0; + /* set per thread global info, for performance */ + m_preview_mode = preview; + m_node_gen.open_file("",&m_preview_mode); + + m_stateless_dp_info.start(); } @@ -3951,11 +4048,13 @@ void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name, #endif m_node_gen.flush_file(c_stop_sec,d_time_flow, false,this,old_offset); + + #ifdef VALG CALLGRIND_STOP_INSTRUMENTATION; printf (" %llu \n",os_get_hr_tick_64()-_start_time); #endif - if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() ){ + if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() && (is_terminated_by_master()==false) ){ /* clean close */ m_node_gen.flush_file(m_cur_time_sec, d_time_flow, true,this,old_offset); } @@ -3970,6 +4069,12 @@ void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name, m_node_gen.close_file(this); } +void CFlowGenList::Delete(){ + clean_p_thread_info(); + Clean(); + delete CPluginCallback::callback; +} + bool CFlowGenList::Create(){ check_objects_sizes(); @@ -4001,10 +4106,6 @@ void CFlowGenList::clean_p_thread_info(void){ } -void CFlowGenList::Delete(){ - clean_p_thread_info(); - Clean(); -} int CFlowGenList::load_from_mac_file(std::string file_name) { if ( !utl_is_file_exists (file_name) ){ @@ -4499,9 +4600,12 @@ void CTupleTemplateGenerator::Generate(){ #endif +static uint32_t get_rand_32(uint32_t MinimumRange, + uint32_t MaximumRange) __attribute__ ((unused)); + +static uint32_t get_rand_32(uint32_t MinimumRange, + uint32_t MaximumRange) { -static uint32_t get_rand_32(uint32_t MinimumRange , - uint32_t MaximumRange ){ enum {RANDS_NUM = 2 , RAND_MAX_BITS = 0xf , UNSIGNED_INT_BITS = 0x20 , TWO_BITS_MASK = 0x3}; const double TWO_POWER_32_BITS = 0x10000000 * (double)0x10; @@ -4535,24 +4639,54 @@ int CNullIF::send_node(CGenNode * node){ } -int CErfIF::send_node(CGenNode * node){ - if ( m_preview_mode->getFileWrite() ){ - CFlowPktInfo * lp=node->m_pkt_info; - rte_mbuf_t * m=lp->generate_new_mbuf(node); +void CErfIF::fill_raw_packet(rte_mbuf_t * m,CGenNode * node,pkt_dir_t dir){ fill_pkt(m_raw,m); + CPktNsecTimeStamp t_c(node->m_time); m_raw->time_nsec = t_c.m_time_nsec; m_raw->time_sec = t_c.m_time_sec; - - pkt_dir_t dir=node->cur_interface_dir(); uint8_t p_id = (uint8_t)dir; - m_raw->setInterface(p_id); +} + + +int CErfIFStl::send_node(CGenNode * _no_to_use){ + + if ( m_preview_mode->getFileWrite() ){ + + CGenNodeStateless * node_sl=(CGenNodeStateless *) _no_to_use; + + /* check that we have mbuf */ + rte_mbuf_t * m=node_sl->get_cache_mbuf(); + assert( m ); + pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir(); + + fill_raw_packet(m,_no_to_use,dir); + BP_ASSERT(m_writer); + bool res=m_writer->write_packet(m_raw); + + + BP_ASSERT(res); + } + return (0); +} + + +int CErfIF::send_node(CGenNode * node){ + + if ( m_preview_mode->getFileWrite() ){ + + CFlowPktInfo * lp=node->m_pkt_info; + rte_mbuf_t * m=lp->generate_new_mbuf(node); + pkt_dir_t dir=node->cur_interface_dir(); + + fill_raw_packet(m,node,dir); /* update mac addr dest/src 12 bytes */ uint8_t *p=(uint8_t *)m_raw->raw; + int p_id=(int)dir; memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(p_id),12); /* If vlan is enabled, add vlan header */ @@ -4734,7 +4868,6 @@ void CCPortLatency::reset(){ static uint8_t nat_is_port_can_send(uint8_t port_id){ - uint8_t offset= ((port_id>>1)<<1); uint8_t client_index = (port_id %2); return (client_index ==0 ?1:0); } @@ -4856,7 +4989,7 @@ void CCPortLatency::dump_counters_json(std::string & json ){ } void CCPortLatency::DumpCounters(FILE *fd){ - #define DP_A1(f) if (f) fprintf(fd," %-40s : %llu \n",#f,f) + #define DP_A1(f) if (f) fprintf(fd," %-40s : %llu \n",#f, (unsigned long long)f) fprintf(fd," counter \n"); fprintf(fd," -----------\n"); @@ -4875,7 +5008,7 @@ void CCPortLatency::DumpCounters(FILE *fd){ fprintf(fd," -----------\n"); m_hist.Dump(fd); - fprintf(fd," %-40s : %llu \n","jitter",get_jitter_usec()); + fprintf(fd," %-40s : %lu \n","jitter", (ulong)get_jitter_usec()); } bool CCPortLatency::dump_packet(rte_mbuf_t * m){ @@ -4899,6 +5032,9 @@ bool CCPortLatency::dump_packet(rte_mbuf_t * m){ if ( unlikely( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ) ){ vlan_offset=4; } + + (void)vlan_offset; + // utl_DumpBuffer(stdout,p,pkt_size,0); return (0); @@ -4934,9 +5070,7 @@ bool CCPortLatency::check_packet(rte_mbuf_t * m,CRx_check_header * & rx_p){ uint16_t vlan_offset=parser.m_vlan_offset; uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*); - rx_p=(CRx_check_header *)0; - bool managed_by_ip_options=false; - bool is_rx_check=true; + rx_p = (CRx_check_header *)0; if ( !parser.IsLatencyPkt() ){ @@ -5053,7 +5187,7 @@ void CLatencyManager::Delete(){ static uint8_t swap_port(uint8_t port_id){ uint8_t offset= ((port_id>>1)<<1); uint8_t client_index = (port_id %2); - return (offset+client_index^1); + return (offset + (client_index ^ 1)); } @@ -5176,8 +5310,6 @@ void CLatencyManager::run_rx_queue_msgs(uint8_t thread_id, CGenNodeMsgBase * msg=(CGenNodeMsgBase *)node; - CGenNodeLatencyPktInfo * msg1=(CGenNodeLatencyPktInfo *)msg; - uint8_t msg_type = msg->m_msg_type; switch (msg_type ) { case CGenNodeMsgBase::LATENCY_PKT: @@ -5300,7 +5432,7 @@ void CLatencyManager::start(int iter){ } if ( iter>0 ){ if ( ( cnt>iter) ){ - printf("stop due iter %d %d \n",iter); + printf("stop due iter %d\n",iter); break; } } @@ -5469,8 +5601,8 @@ void CLatencyManager::DumpRxCheckVerification(FILE *fd, fprintf(fd," rx_checker is disabled \n"); return; } - fprintf(fd," rx_check Tx : %u \n",total_tx_rx_check); - fprintf(fd," rx_check Rx : %u \n",m_rx_check_manager.getTotalRx() ); + fprintf(fd," rx_check Tx : %llu \n", (unsigned long long)total_tx_rx_check); + fprintf(fd," rx_check Rx : %llu \n", (unsigned long long)m_rx_check_manager.getTotalRx() ); fprintf(fd," rx_check verification :" ); if (m_rx_check_manager.getTotalRx() == total_tx_rx_check) { fprintf(fd," OK \n" ); @@ -6723,7 +6855,6 @@ bool CSimplePacketParser::Parse(){ EthernetHeader *m_ether = (EthernetHeader *)p; IPHeader * ipv4=0; IPv6Header * ipv6=0; - uint16_t pkt_size=rte_pktmbuf_pkt_len(m); m_vlan_offset=0; m_option_offset=0; @@ -6775,6 +6906,29 @@ bool CSimplePacketParser::Parse(){ } +/* free the right object. + it is classic to use virtual function but we can't do it here and we don't even want to use callback function + as we want to save space and in most cases there is nothing to free. + this might be changed in the future + */ +void CGenNodeBase::free_base(){ + if ( m_type == FLOW_PKT ) { + CGenNode* p=(CGenNode*)this; + p->free_gen_node(); + return; + } + if (m_type==STATELESS_PKT) { + CGenNodeStateless* p=(CGenNodeStateless*)this; + p->free_stl_node(); + return; + } + if ( m_type == COMMAND ) { + CGenNodeCommand* p=(CGenNodeCommand*)this; + p->free_command(); + } + +} + diff --git a/src/bp_sim.h b/src/bp_sim.h index af084757..0da7fb99 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -62,6 +62,12 @@ limitations under the License. #undef NAT_TRACE_ +static inline double +usec_to_sec(double usec) { + return (usec / (1000 * 1000)); +} + + #define FORCE_NO_INLINE __attribute__ ((noinline)) #define MAX_LATENCY_PORTS 12 @@ -328,6 +334,9 @@ public: CVirtualIF (){ m_preview_mode =NULL; } + + virtual ~CVirtualIF(){ + } public: virtual int open_file(std::string file_name)=0; @@ -372,6 +381,15 @@ public: * @return */ virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m)=0; + + /** + * translate a port_id to the correct dir on the core + * + */ + virtual pkt_dir_t port_id_to_dir(uint8_t port_id) { + return (CS_INVALID); + } + public: @@ -885,6 +903,8 @@ public: /* number of main active sockets. socket #0 is always used */ virtual socket_id_t max_num_active_sockets()=0; + virtual ~CPlatformSocketInfoBase() {} + public: /* which socket to allocate memory to each port */ virtual socket_id_t port_to_socket(port_id_t port)=0; @@ -1129,6 +1149,9 @@ public: class CGlobalInfo { public: static void init_pools(uint32_t rx_buffers); + /* for simulation */ + static void free_pools(); + static inline rte_mbuf_t * pktmbuf_alloc_small(socket_id_t socket){ return ( m_mem_pool[socket].pktmbuf_alloc_small() ); @@ -1328,8 +1351,8 @@ public: -#define DP(f) if (f) printf(" %-40s: %llu \n",#f,f) -#define DP_name(n,f) if (f) printf(" %-40s: %llu \n",n,f) +#define DP(f) if (f) printf(" %-40s: %llu \n",#f,(unsigned long long)f) +#define DP_name(n,f) if (f) printf(" %-40s: %llu \n",n,(unsigned long long)f) #define DP_S(f,f_s) if (f) printf(" %-40s: %s \n",#f,f_s.c_str()) @@ -1365,7 +1388,11 @@ public: FLOW_PKT_NAT =3, FLOW_SYNC =4, /* called evey 1 msec */ STATELESS_PKT =5, - EXIT_SCHED =6 + EXIT_SCHED =6, + COMMAND =7, + + EXIT_PORT_SCHED =8 + }; @@ -1421,6 +1448,7 @@ public: } + void free_base(); }; @@ -1455,6 +1483,9 @@ public: uint32_t m_dest_idx; uint32_t m_end_of_cache_line[6]; + +public: + void free_gen_node(); public: void Dump(FILE *fd); @@ -1641,6 +1672,8 @@ public: + + #if __x86_64__ /* size of 64 bytes */ #define DEFER_CLIENTS_NUM (16) @@ -1791,11 +1824,24 @@ public: virtual int flush_tx_queue(void); -private: +protected: + + void fill_raw_packet(rte_mbuf_t * m,CGenNode * node,pkt_dir_t dir); + CFileWriterBase * m_writer; CCapPktRaw * m_raw; }; +/* for stateless we have a small changes in case we send the packets for optimization */ +class CErfIFStl : public CErfIF { + +public: + + virtual int send_node(CGenNode * node); +}; + + + static inline int fill_pkt(CCapPktRaw * raw,rte_mbuf_t * m){ raw->pkt_len = m->pkt_len; char *p=raw->raw; @@ -1860,6 +1906,8 @@ public: public: void add_node(CGenNode * mynode); void remove_all(CFlowGenListPerThread * thread); + void remove_all_stateless(CFlowGenListPerThread * thread); + int open_file(std::string file_name, CPreviewMode * preview); int close_file(CFlowGenListPerThread * thread); @@ -1892,9 +1940,12 @@ private: return (m_v_if->send_node(node)); } int update_stats(CGenNode * node); - FORCE_NO_INLINE void handle_slow_messages(uint8_t type, - CGenNode * node, - CFlowGenListPerThread * thread, + int update_stl_stats(CGenNodeStateless *node_sl); + + + FORCE_NO_INLINE bool handle_slow_messages(uint8_t type, + CGenNode * node, + CFlowGenListPerThread * thread, bool always); @@ -2370,6 +2421,7 @@ public: return (uint32_t)((uintptr_t)( ((char *)l3.m_ipv4)-getBasePtr()) ); }else{ BP_ASSERT(0); + return (0); } } @@ -3358,6 +3410,13 @@ public: uint32_t max_threads); void Delete(); + void set_terminate_mode(bool is_terminate){ + m_terminated_by_master =is_terminate; + } + bool is_terminated_by_master(){ + return (m_terminated_by_master); + } + void set_vif(CVirtualIF * v_if){ m_node_gen.set_vif(v_if); } @@ -3397,7 +3456,18 @@ public : public: void Clean(); void start_generate_stateful(std::string erf_file_name,CPreviewMode &preview); - void start_stateless_daemon(); + void start_stateless_daemon(CPreviewMode &preview); + + void start_stateless_daemon_simulation(); + + /* open a file for simulation */ + void start_stateless_simulation_file(std::string erf_file_name,CPreviewMode &preview); + /* close a file for simulation */ + void stop_stateless_simulation_file(); + + /* return true if we need to shedule next_stream, */ + bool set_stateless_next_node( CGenNodeStateless * cur_node, + CGenNodeStateless * next_node); void Dump(FILE *fd); @@ -3471,6 +3541,7 @@ public: CNodeGenerator m_node_gen; public: uint32_t m_cur_template; + uint32_t m_non_active_nodes; /* the number of non active nodes -> nodes that try to stop somthing */ uint64_t m_cur_flow_id; double m_cur_time_sec; double m_stop_time_sec; @@ -3491,7 +3562,8 @@ private: flow_id_node_t m_flow_id_to_node_lookup; - TrexStatelessDpCore *m_stateless_dp_info; + TrexStatelessDpCore m_stateless_dp_info; + bool m_terminated_by_master; private: uint8_t m_cacheline_pad[RTE_CACHE_LINE_SIZE][19]; // improve prefech @@ -3506,7 +3578,10 @@ inline CGenNode * CFlowGenListPerThread::create_node(void){ return (res); } + + inline void CFlowGenListPerThread::free_node(CGenNode *p){ + p->free_base(); rte_mempool_sp_put(m_node_pool, p); } @@ -4020,6 +4095,8 @@ enum MINVM_PLUGIN_ID{ class CPluginCallback { public: + virtual ~CPluginCallback(){ + } virtual void on_node_first(uint8_t plugin_id,CGenNode * node,CFlowYamlInfo * template_info, CTupleTemplateGeneratorSmart * tuple_gen,CFlowGenListPerThread * flow_gen) =0; virtual void on_node_last(uint8_t plugin_id,CGenNode * node)=0; virtual rte_mbuf_t * on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info)=0; diff --git a/src/common/Network/Packet/IPHeader.cpp b/src/common/Network/Packet/IPHeader.cpp index 3b90a1aa..c3363603 100755 --- a/src/common/Network/Packet/IPHeader.cpp +++ b/src/common/Network/Packet/IPHeader.cpp @@ -52,7 +52,7 @@ void IPHeader::dump(FILE *fd) { fprintf(fd, "\nIPHeader"); fprintf(fd, "\nSource 0x%.8lX, Destination 0x%.8lX, Protocol 0x%.1X", - getSourceIp(), getDestIp(), getProtocol()); + (ulong)getSourceIp(), (ulong)getDestIp(), (uint)getProtocol()); fprintf(fd, "\nTTL : %d, Id : 0x%.2X, Ver %d, Header Length %d, Total Length %d", getTimeToLive(), getId(), getVersion(), getHeaderLength(), getTotalLength()); if(isFragmented()) diff --git a/src/common/Network/Packet/TCPHeader.cpp b/src/common/Network/Packet/TCPHeader.cpp index bf28db2e..1826cef8 100755 --- a/src/common/Network/Packet/TCPHeader.cpp +++ b/src/common/Network/Packet/TCPHeader.cpp @@ -25,7 +25,7 @@ void TCPHeader::dump(FILE *fd) fprintf(fd, "\nSourcePort 0x%.4X, DestPort 0x%.4X", getSourcePort(), getDestPort()); fprintf(fd, "\nSeqNum 0x%.8lX, AckNum 0x%.8lX, Window %d", - getSeqNumber(), getAckNumber(), getWindowSize()); + (ulong)getSeqNumber(), (ulong)getAckNumber(), getWindowSize()); fprintf(fd, "\nHeader Length : %d, Checksum : 0x%.4X", getHeaderLength(), getChecksum()); fprintf(fd, "\nFlags : SYN - %d, FIN - %d, ACK - %d, URG - %d, RST - %d, PSH - %d", diff --git a/src/common/c_common.h b/src/common/c_common.h index d8320aaa..3e43644f 100755 --- a/src/common/c_common.h +++ b/src/common/c_common.h @@ -46,7 +46,7 @@ typedef void* c_pvoid; #ifdef _DEBUG #define BP_ASSERT(a) assert(a) #else - #define BP_ASSERT(a) + #define BP_ASSERT(a) (void (a)) #endif #endif diff --git a/src/gtest/rpc_test.cpp b/src/gtest/rpc_test.cpp index 6b8e3eff..34bb02a8 100644 --- a/src/gtest/rpc_test.cpp +++ b/src/gtest/rpc_test.cpp @@ -478,6 +478,7 @@ TEST_F(RpcTestOwned, add_remove_stream) { create_request(request, "get_stream", 1, 1); request["params"]["stream_id"] = 5; + request["params"]["get_pkt"] = true; send_request(request, response); @@ -501,6 +502,7 @@ TEST_F(RpcTestOwned, add_remove_stream) { create_request(request, "get_stream", 1, 1); request["params"]["stream_id"] = 5; + request["params"]["get_pkt"] = true; send_request(request, response); @@ -607,17 +609,21 @@ TEST_F(RpcTestOwned, start_stop_traffic) { /* start port 1 */ create_request(request, "start_traffic", 1, 1); + request["params"]["mul"] = 1.0; send_request(request, response); + EXPECT_EQ(response["result"], "ACK"); /* start port 3 */ create_request(request, "start_traffic", 1, 3); + request["params"]["mul"] = 1.0; send_request(request, response); EXPECT_EQ(response["result"], "ACK"); /* start not configured port */ create_request(request, "start_traffic", 1, 2); + request["params"]["mul"] = 1.0; send_request(request, response); EXPECT_EQ(response["error"]["code"], -32000); @@ -633,11 +639,13 @@ TEST_F(RpcTestOwned, start_stop_traffic) { /* start 1 again */ create_request(request, "start_traffic", 1, 1); + request["params"]["mul"] = 1.0; send_request(request, response); EXPECT_EQ(response["result"], "ACK"); /* start 1 twice (error) */ create_request(request, "start_traffic", 1, 1); + request["params"]["mul"] = 1.0; send_request(request, response); EXPECT_EQ(response["error"]["code"], -32000); @@ -657,3 +665,96 @@ TEST_F(RpcTestOwned, start_stop_traffic) { EXPECT_EQ(response["result"], "ACK"); } + + +TEST_F(RpcTestOwned, states_check) { + Json::Value request; + Json::Value response; + + /* add stream #1 */ + create_request(request, "add_stream", 1, 1); + request["params"]["stream_id"] = 5; + + Json::Value stream; + create_simple_stream(stream); + + request["params"]["stream"] = stream; + + send_request(request, response); + EXPECT_EQ(response["result"], "ACK"); + + /* start traffic */ + create_request(request, "start_traffic", 1, 1); + request["params"]["mul"] = 1.0; + send_request(request, response); + EXPECT_EQ(response["result"], "ACK"); + + /* now we cannot add streams */ + create_request(request, "add_stream", 1, 1); + request["params"]["stream_id"] = 15; + + create_simple_stream(stream); + + request["params"]["stream"] = stream; + + send_request(request, response); + EXPECT_EQ(response["error"]["code"], -32000); + + /* we cannot remove streams */ + create_request(request, "remove_stream", 1, 1); + request["params"]["stream_id"] = 15; + send_request(request, response); + EXPECT_EQ(response["error"]["code"], -32000); + + /* cannot start again */ + create_request(request, "start_traffic", 1, 1); + request["params"]["mul"] = 1.0; + send_request(request, response); + EXPECT_EQ(response["error"]["code"], -32000); + + /* we can stop and add stream / remove */ + + create_request(request, "stop_traffic", 1, 1); + send_request(request, response); + EXPECT_EQ(response["result"], "ACK"); + + create_request(request, "add_stream", 1, 1); + request["params"]["stream_id"] = 328; + + create_simple_stream(stream); + + request["params"]["stream"] = stream; + + send_request(request, response); + EXPECT_EQ(response["result"], "ACK"); + + + create_request(request, "remove_stream", 1, 1); + request["params"]["stream_id"] = 15; + send_request(request, response); + EXPECT_EQ(response["error"]["code"], -32000); + + /* we cannot pause now */ + create_request(request, "pause_traffic", 1, 1); + send_request(request, response); + EXPECT_EQ(response["error"]["code"], -32000); + + + /* start */ + create_request(request, "start_traffic", 1, 1); + request["params"]["mul"] = 1.0; + send_request(request, response); + EXPECT_EQ(response["result"], "ACK"); + + /* now can pause */ + create_request(request, "pause_traffic", 1, 1); + send_request(request, response); + EXPECT_EQ(response["result"], "ACK"); + + /* also we can resume*/ + create_request(request, "resume_traffic", 1, 1); + send_request(request, response); + EXPECT_EQ(response["result"], "ACK"); + + +} diff --git a/src/gtest/trex_stateless_gtest.cpp b/src/gtest/trex_stateless_gtest.cpp index 0341516c..8640e7db 100644 --- a/src/gtest/trex_stateless_gtest.cpp +++ b/src/gtest/trex_stateless_gtest.cpp @@ -1,5 +1,5 @@ /* - Hanoh Haim + Hanoch Haim Cisco Systems, Inc. */ @@ -22,332 +22,1651 @@ limitations under the License. #include "bp_sim.h" #include <common/gtest.h> #include <common/basic_utils.h> - +#include <trex_stateless_dp_core.h> +#include <trex_stateless_messaging.h> +#include <trex_streams_compiler.h> +#include <trex_stream_node.h> +#include <trex_stream.h> +#include <trex_stateless_port.h> +#include <trex_rpc_server_api.h> +#include <iostream> #define EXPECT_EQ_UINT32(a,b) EXPECT_EQ((uint32_t)(a),(uint32_t)(b)) -// one stream info with const packet , no VM -class CTRexDpStatelessVM { + +/* basic stateless test */ +class basic_stl : public testing::Test { + protected: + virtual void SetUp() { + } + virtual void TearDown() { + } +public: }; -//- add dump function -// - check one object -// create frame work -class CTRexDpStreamModeContinues{ -public: - void set_pps(double pps){ - m_pps=pps; - } - double get_pps(){ - return (m_pps); - } +/** + * Queue of RPC msgs for test + * + * @author hhaim + */ - void dump(FILE *fd); -private: - double m_pps; +class CBasicStl; + + +struct CBasicStlDelayCommand { + + CBasicStlDelayCommand(){ + m_node=NULL; + } + CGenNodeCommand * m_node; }; -void CTRexDpStreamModeContinues::dump(FILE *fd){ - fprintf (fd," pps : %f \n",m_pps); -} +class CBasicStlMsgQueue { + +friend CBasicStl; -class CTRexDpStreamModeSingleBurst{ public: - void set_pps(double pps){ - m_pps=pps; - } - double get_pps(){ - return (m_pps); + CBasicStlMsgQueue(){ } - void set_total_packets(uint64_t total_packets){ - m_total_packets =total_packets; + /* user will allocate the message, no need to free it by this module */ + void add_msg(TrexStatelessCpToDpMsgBase * msg){ + m_msgs.push_back(msg); } - uint64_t get_total_packets(){ - return (m_total_packets); + void add_command(CBasicStlDelayCommand & command){ + m_commands.push_back(command); } - void dump(FILE *fd); + /* only if both port are idle we can exit */ + void add_command(CFlowGenListPerThread * core, + TrexStatelessCpToDpMsgBase * msg, + double time){ -private: - double m_pps; - uint64_t m_total_packets; -}; + CGenNodeCommand *node = (CGenNodeCommand *)core->create_node() ; + node->m_type = CGenNode::COMMAND; -void CTRexDpStreamModeSingleBurst::dump(FILE *fd){ - fprintf (fd," pps : %f \n",m_pps); - fprintf (fd," total_packets : %llu \n",m_total_packets); -} + node->m_cmd = msg; + /* make sure it will be scheduled after the current node */ + node->m_time = time ; -class CTRexDpStreamModeMultiBurst{ -public: - void set_pps(double pps){ - m_pps=pps; - } - double get_pps(){ - return (m_pps); - } + CBasicStlDelayCommand command; + command.m_node =node; - void set_pkts_per_burst(uint64_t pkts_per_burst){ - m_pkts_per_burst =pkts_per_burst; + add_command(command); } - uint64_t get_pkts_per_burst(){ - return (m_pkts_per_burst); - } - void set_ibg(double ibg){ - m_ibg = ibg; + void clear(){ + m_msgs.clear(); + m_commands.clear(); } - double get_ibg(){ - return ( m_ibg ); + +protected: + std::vector<TrexStatelessCpToDpMsgBase *> m_msgs; + + std::vector<CBasicStlDelayCommand> m_commands; +}; + + + +class CBasicStlSink { + +public: + CBasicStlSink(){ + m_core=0; } + virtual void call_after_init(CBasicStl * m_obj)=0; + virtual void call_after_run(CBasicStl * m_obj)=0; - void set_number_of_bursts(uint32_t number_of_bursts){ - m_number_of_bursts = number_of_bursts; + CFlowGenListPerThread * m_core; +}; + + +/** + * handler for DP to CP messages + * + * @author imarom (19-Nov-15) + */ +class DpToCpHandler { +public: + virtual void handle(TrexStatelessDpToCpMsgBase *msg) = 0; +}; + +class CBasicStl { + +public: + + + CBasicStl(){ + m_time_diff=0.001; + m_threads=1; + m_dump_json=false; + m_dp_to_cp_handler = NULL; + m_msg = NULL; + m_sink = NULL; } - uint32_t get_number_of_bursts(){ - return (m_number_of_bursts); + + void flush_dp_to_cp_messages() { + + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(0); + + while ( true ) { + CGenNode * node = NULL; + if (ring->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node; + if (m_dp_to_cp_handler) { + m_dp_to_cp_handler->handle(msg); + } + + delete msg; + } + } - void dump(FILE *fd); -private: - double m_pps; - double m_ibg; // inter burst gap - uint64_t m_pkts_per_burst; - uint32_t m_number_of_bursts; + bool init(void){ + + CErfIFStl erf_vif; + fl.Create(); + fl.generate_p_thread_info(1); + CFlowGenListPerThread * lpt; + + fl.m_threads_info[0]->set_vif(&erf_vif); + + CErfCmp cmp; + cmp.dump=1; + + CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp(); + + m_ring_from_cp = cp_dp->getRingCpToDp(0); + + + bool res=true; + + lpt=fl.m_threads_info[0]; + + if ( m_sink ){ + m_sink->m_core =lpt; + } + + char buf[100]; + char buf_ex[100]; + sprintf(buf,"%s-%d.erf",CGlobalInfo::m_options.out_file.c_str(),0); + sprintf(buf_ex,"%s-%d-ex.erf",CGlobalInfo::m_options.out_file.c_str(),0); + + lpt->start_stateless_simulation_file(buf,CGlobalInfo::m_options.preview); + + /* add stream to the queue */ + if ( m_msg ) { + assert(m_ring_from_cp->Enqueue((CGenNode *)m_msg)==0); + } + if (m_msg_queue.m_msgs.size()>0) { + for (auto msg : m_msg_queue.m_msgs) { + assert(m_ring_from_cp->Enqueue((CGenNode *)msg)==0); + } + } + + if (m_sink) { + m_sink->call_after_init(this); + } + + /* add the commands */ + if (m_msg_queue.m_commands.size()>0) { + for (auto cmd : m_msg_queue.m_commands) { + /* add commands nodes */ + lpt->m_node_gen.add_node((CGenNode *)cmd.m_node); + } + } + + lpt->start_stateless_daemon_simulation(); + + + //lpt->m_node_gen.DumpHist(stdout); + + cmp.d_sec = m_time_diff; + if ( cmp.compare(std::string(buf),std::string(buf_ex)) != true ) { + res=false; + } + + if ( m_dump_json ){ + printf(" dump json ...........\n"); + std::string s; + fl.m_threads_info[0]->m_node_gen.dump_json(s); + printf(" %s \n",s.c_str()); + } + + if (m_sink) { + m_sink->call_after_run(this); + } + + flush_dp_to_cp_messages(); + m_msg_queue.clear(); + + + fl.Delete(); + return (res); + } + +public: + int m_threads; + double m_time_diff; + bool m_dump_json; + DpToCpHandler *m_dp_to_cp_handler; + CBasicStlSink * m_sink; + + TrexStatelessCpToDpMsgBase * m_msg; + CNodeRing *m_ring_from_cp; + CBasicStlMsgQueue m_msg_queue; + CFlowGenList fl; }; -void CTRexDpStreamModeMultiBurst::dump(FILE *fd){ - fprintf (fd," pps : %f \n",m_pps); - fprintf (fd," total_packets : %llu \n",m_pkts_per_burst); - fprintf (fd," ibg : %f \n",m_ibg); - fprintf (fd," num_of_bursts : %llu \n",m_number_of_bursts); -} +class CPcapLoader { +public: + CPcapLoader(); + ~CPcapLoader(); -class CTRexDpStreamMode { public: - enum MODES { - moCONTINUES = 0x0, - moSINGLE_BURST = 0x1, - moMULTI_BURST = 0x2 - } ; - typedef uint8_t MODE_TYPE_t; + bool load_pcap_file(std::string file,int pkt_id=0); + void update_ip_src(uint32_t ip_addr); + void clone_packet_into_stream(TrexStream * stream); + void dump_packet(); - void reset(); +public: + bool m_valid; + CCapPktRaw m_raw; + CPacketIndication m_pkt_indication; +}; - void set_mode(MODE_TYPE_t mode ){ - m_type = mode; - } +CPcapLoader::~CPcapLoader(){ +} - MODE_TYPE_t get_mode(){ - return (m_type); +bool CPcapLoader::load_pcap_file(std::string cap_file,int pkt_id){ + m_valid=false; + CPacketParser parser; + + CCapReaderBase * lp=CCapReaderFactory::CreateReader((char *)cap_file.c_str(),0); + + if (lp == 0) { + printf(" ERROR file %s does not exist or not supported \n",(char *)cap_file.c_str()); + return false; } + int cnt=0; + bool found =false; + - CTRexDpStreamModeContinues & cont(void){ - return (m_data.m_cont); + while ( true ) { + /* read packet */ + if ( lp->ReadPacket(&m_raw) ==false ){ + break; + } + if (cnt==pkt_id) { + found = true; + break; + } + cnt++; } - CTRexDpStreamModeSingleBurst & single_burst(void){ - return (m_data.m_signle_burst); + if ( found ){ + if ( parser.ProcessPacket(&m_pkt_indication, &m_raw) ){ + m_valid = true; + } } - CTRexDpStreamModeMultiBurst & multi_burst(void){ - return (m_data.m_multi_burst); + delete lp; + return (m_valid); +} + +void CPcapLoader::update_ip_src(uint32_t ip_addr){ + + if ( m_pkt_indication.l3.m_ipv4 ) { + m_pkt_indication.l3.m_ipv4->setSourceIp(ip_addr); + m_pkt_indication.l3.m_ipv4->updateCheckSum(); } +} + +void CPcapLoader::clone_packet_into_stream(TrexStream * stream){ + + uint16_t pkt_size=m_raw.getTotalLen(); + + uint8_t *binary = new uint8_t[pkt_size]; + memcpy(binary,m_raw.raw,pkt_size); + stream->m_pkt.binary = binary; + stream->m_pkt.len = pkt_size; +} - void dump(FILE *fd); -private: - uint8_t m_type; - union Data { - CTRexDpStreamModeContinues m_cont; - CTRexDpStreamModeSingleBurst m_signle_burst; - CTRexDpStreamModeMultiBurst m_multi_burst; - } m_data; -}; -void CTRexDpStreamMode::reset(){ - m_type =CTRexDpStreamMode::moCONTINUES; - memset(&m_data,0,sizeof(m_data)); +CPcapLoader::CPcapLoader(){ + } - -void CTRexDpStreamMode::dump(FILE *fd){ - const char * table[3] = {"CONTINUES","SINGLE_BURST","MULTI_BURST"}; - - fprintf(fd," mode : %s \n", (char*)table[m_type]); - switch (m_type) { - case CTRexDpStreamMode::moCONTINUES : - cont().dump(fd); - break; - case CTRexDpStreamMode::moSINGLE_BURST : - single_burst().dump(fd); - break; - case CTRexDpStreamMode::moMULTI_BURST : - multi_burst().dump(fd); - break; - default: - fprintf(fd," ERROR type if not valid %d \n",m_type); - break; + +void CPcapLoader::dump_packet(){ + if (m_valid ) { + m_pkt_indication.Dump(stdout,1); + }else{ + fprintf(stdout," no packets were found \n"); } } +TEST_F(basic_stl, load_pcap_file) { + printf (" stateles %d \n",(int)sizeof(CGenNodeStateless)); + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + + //pcap.dump_packet(); +} -class CTRexDpStatelessStream { +class CBBStartPause0: public CBasicStlSink { public: - enum FLAGS_0{ - _ENABLE = 0, - _SELF_START = 1, - _VM_ENABLE =2, - _END_STREAM =-1 + + virtual void call_after_init(CBasicStl * m_obj); + virtual void call_after_run(CBasicStl * m_obj){ }; + uint8_t m_port_id; +}; - CTRexDpStatelessStream(){ - reset(); - } - void reset(){ - m_packet =0; - m_vm=0; - m_flags=0; - m_isg_sec=0.0; - m_next_stream = CTRexDpStatelessStream::_END_STREAM ; // END - m_mode.reset(); - } - void set_enable(bool enable){ - btSetMaskBit32(m_flags,_ENABLE,_ENABLE,enable?1:0); - } +void CBBStartPause0::call_after_init(CBasicStl * m_obj){ - bool get_enabled(){ - return (btGetMaskBit32(m_flags,_ENABLE,_ENABLE)?true:false); - } + TrexStatelessDpPause * lpPauseCmd = new TrexStatelessDpPause(m_port_id); + TrexStatelessDpResume * lpResumeCmd1 = new TrexStatelessDpResume(m_port_id); - void set_self_start(bool enable){ - btSetMaskBit32(m_flags,_SELF_START,_SELF_START,enable?1:0); - } + m_obj->m_msg_queue.add_command(m_core,lpPauseCmd, 5.0); /* command in delay of 5 sec */ + m_obj->m_msg_queue.add_command(m_core,lpResumeCmd1, 7.0);/* command in delay of 7 sec */ - bool get_self_start(bool enable){ - return (btGetMaskBit32(m_flags,_SELF_START,_SELF_START)?true:false); - } +} - /* if we don't have VM we could just replicate the mbuf and allocate it once */ - void set_vm_enable(bool enable){ - btSetMaskBit32(m_flags,_VM_ENABLE,_VM_ENABLE,enable?1:0); - } - bool get_vm_enabled(bool enable){ - return (btGetMaskBit32(m_flags,_VM_ENABLE,_VM_ENABLE)?true:false); - } +/* start/stop/stop back to back */ +TEST_F(basic_stl, basic_pause_resume0) { - void set_inter_stream_gap(double isg_sec){ - m_isg_sec =isg_sec; - } + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_basic_pause_resume0"; - double get_inter_stream_gap(){ - return (m_isg_sec); - } + TrexStreamsCompiler compile; - CTRexDpStreamMode & get_mode(); + uint8_t port_id=0; + std::vector<TrexStream *> streams; - // CTRexDpStatelessStream::_END_STREAM for END - void set_next_stream(int32_t next_stream){ - m_next_stream =next_stream; - } + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); - int32_t get_next_stream(void){ - return ( m_next_stream ); - } + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; - void dump(FILE *fd); -private: - char * m_packet; - CTRexDpStatelessVM * m_vm; - uint32_t m_flags; - double m_isg_sec; // in second - CTRexDpStreamMode m_mode; - int32_t m_next_stream; // next stream id -}; + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); -//- list of streams info with const packet , no VM -// - object that include the stream /scheduler/ packet allocation / need to create an object for one thread that works for test -// generate pcap file and compare it + // stream - clean -#if 0 -void CTRexDpStatelessStream::dump(FILE *fd){ + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); - fprintf(fd," enabled : %d \n",get_enabled()?1:0); - fprintf(fd," self_start : %d \n",get_self_start()?1:0); - fprintf(fd," vm : %d \n",get_vm_enabled()?1:0); - fprintf(" isg : %f \n",m_isg_sec); - m_mode.dump(fd); - if (m_next_stream == CTRexDpStatelessStream::_END_STREAM ) { - fprintf(fd," action : End of Stream \n"); - }else{ - fprintf(" next : %d \n",m_next_stream); - } + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + t1.m_msg_queue.add_msg(lpStartCmd); + + + CBBStartPause0 sink; + sink.m_port_id = port_id; + t1.m_sink = &sink; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; } +////////////////////////////////////////////////////////////// -class CTRexStatelessBasic { +class CBBStartStopDelay2: public CBasicStlSink { public: - CTRexStatelessBasic(){ - m_threads=1; - } - bool init(void){ - return (true); - } + virtual void call_after_init(CBasicStl * m_obj); + virtual void call_after_run(CBasicStl * m_obj){ + }; + uint8_t m_port_id; +}; + + + +void CBBStartStopDelay2::call_after_init(CBasicStl * m_obj){ + + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id); + TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id); + + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000002); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + + /* start with different event id */ + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(m_port_id, 1, comp_obj.clone(), 10.0 /*sec */ ); + + + m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStartCmd, 7.5);/* command in delay of 7 sec */ + + delete stream1 ; + + +} + + + +/* start/stop/stop back to back */ +TEST_F(basic_stl, single_pkt_bb_start_stop_delay2) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop_delay2"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + t1.m_msg_queue.add_msg(lpStartCmd); + + + CBBStartStopDelay2 sink; + sink.m_port_id = port_id; + t1.m_sink = &sink; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + + + + + +class CBBStartStopDelay1: public CBasicStlSink { public: - bool m_threads; + + virtual void call_after_init(CBasicStl * m_obj); + virtual void call_after_run(CBasicStl * m_obj){ + }; + uint8_t m_port_id; }; -/* stateless basic */ -class dp_sl_basic : public testing::Test { - protected: - virtual void SetUp() { - } - virtual void TearDown() { - } + +void CBBStartStopDelay1::call_after_init(CBasicStl * m_obj){ + + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(m_port_id); + TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(m_port_id); + + m_obj->m_msg_queue.add_command(m_core,lpStopCmd, 5.0); /* command in delay of 5 sec */ + m_obj->m_msg_queue.add_command(m_core,lpStopCmd1, 7.0);/* command in delay of 7 sec */ +} + + + +/* start/stop/stop back to back */ +TEST_F(basic_stl, single_pkt_bb_start_stop_delay1) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop_delay1"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + t1.m_msg_queue.add_msg(lpStartCmd); + + + CBBStartStopDelay1 sink; + sink.m_port_id = port_id; + t1.m_sink = &sink; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + +/* start/stop/stop back to back */ +TEST_F(basic_stl, single_pkt_bb_start_stop3) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop3"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id); + TrexStatelessDpStop * lpStopCmd1 = new TrexStatelessDpStop(port_id); + + + t1.m_msg_queue.add_msg(lpStartCmd); + t1.m_msg_queue.add_msg(lpStopCmd); + t1.m_msg_queue.add_msg(lpStopCmd1); + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + +TEST_F(basic_stl, single_pkt_bb_start_stop2) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop2"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id); + TrexStatelessDpStart * lpStartCmd1 = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + + t1.m_msg_queue.add_msg(lpStartCmd); + t1.m_msg_queue.add_msg(lpStopCmd); + t1.m_msg_queue.add_msg(lpStartCmd1); + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + +/* back to back send start/stop */ +TEST_F(basic_stl, single_pkt_bb_start_stop) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_bb_start_stop"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpStartCmd = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + TrexStatelessDpStop * lpStopCmd = new TrexStatelessDpStop(port_id); + + + t1.m_msg_queue.add_msg(lpStartCmd); + t1.m_msg_queue.add_msg(lpStopCmd); + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + + +TEST_F(basic_stl, simple_prog4) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_simple_prog4"; + + TrexStreamsCompiler compile; + + + std::vector<TrexStream *> streams; + + + /* stream0 */ + TrexStream * stream0 = new TrexStream(TrexStream::stCONTINUOUS, 0,300); + stream0->set_pps(1.0); + stream0->m_enabled = true; + stream0->m_self_start = true; + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000000); + pcap.clone_packet_into_stream(stream0); + streams.push_back(stream0); + + + /* stream1 */ + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100); + stream1->set_pps(1.0); + stream1->set_single_burst(5); + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_next_stream_id=200; + + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + + /* stream1 */ + + TrexStream * stream2 = new TrexStream(TrexStream::stMULTI_BURST, 0,200); + stream2->set_pps(1.0); + stream2->m_isg_usec = 1000000; /*time betwean stream 1 to stream 2 */ + stream2->m_enabled = true; + stream2->m_self_start = false; + stream2->set_multi_burst(5, + 3, + 2000000.0); + + // next stream is 100 - loop + stream2->m_next_stream_id=100; + + + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000002); + pcap.clone_packet_into_stream(stream2); + streams.push_back(stream2); + + + TrexStreamsCompiledObj comp_obj(0,1.0); + + EXPECT_TRUE(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 20.0 ); + + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream0 ; + delete stream1 ; + delete stream2 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + +TEST_F(basic_stl, simple_prog3) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_simple_prog3"; + + TrexStreamsCompiler compile; + + + std::vector<TrexStream *> streams; + + /* stream1 */ + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100); + stream1->set_pps(1.0); + stream1->set_single_burst(5); + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_next_stream_id=200; + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + + /* stream1 */ + + TrexStream * stream2 = new TrexStream(TrexStream::stMULTI_BURST, 0,200); + stream2->set_pps(1.0); + stream2->m_isg_usec = 1000000; /*time betwean stream 1 to stream 2 */ + stream2->m_enabled = true; + stream2->m_self_start = false; + stream2->set_multi_burst(5, + 3, + 2000000.0); + + // next stream is 100 - loop + stream2->m_next_stream_id=100; + + + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000002); + pcap.clone_packet_into_stream(stream2); + streams.push_back(stream2); + + + TrexStreamsCompiledObj comp_obj(0,1.0); + + EXPECT_TRUE(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 50.0 ); + + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream1 ; + delete stream2 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + +TEST_F(basic_stl, simple_prog2) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_simple_prog2"; + + TrexStreamsCompiler compile; + + + std::vector<TrexStream *> streams; + + /* stream1 */ + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100); + stream1->set_pps(1.0); + stream1->set_single_burst(5); + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_next_stream_id=200; + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + + /* stream1 */ + + TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST, 0,200); + stream2->set_pps(1.0); + stream2->set_single_burst(5); + stream2->m_isg_usec = 2000000; /*time betwean stream 1 to stream 2 */ + stream2->m_enabled = true; + stream2->m_self_start = false; + + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000002); + pcap.clone_packet_into_stream(stream2); + streams.push_back(stream2); + + + TrexStreamsCompiledObj comp_obj(0,1.0); + + EXPECT_TRUE(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 ); + + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream1 ; + delete stream2 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + +TEST_F(basic_stl, simple_prog1) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_simple_prog1"; + + TrexStreamsCompiler compile; + + + std::vector<TrexStream *> streams; + + /* stream1 */ + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,100); + stream1->set_pps(1.0); + stream1->set_single_burst(5); + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_next_stream_id=200; + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + + /* stream1 */ + + TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST, 0,200); + stream2->set_pps(1.0); + stream2->set_single_burst(5); + stream2->m_enabled = true; + stream2->m_self_start = false; + + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000002); + pcap.clone_packet_into_stream(stream2); + streams.push_back(stream2); + + + TrexStreamsCompiledObj comp_obj(0,1.0); + + EXPECT_TRUE(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 ); + + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream1 ; + delete stream2 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + +TEST_F(basic_stl, single_pkt_burst1) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_single_pkt_burst1"; + + TrexStreamsCompiler compile; + + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST, 0,0); + stream1->set_pps(1.0); + stream1->set_single_burst(5); + stream1->m_enabled = true; + stream1->m_self_start = true; + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + TrexStreamsCompiledObj comp_obj(0,1.0); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10.0 ); + + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + + +TEST_F(basic_stl, single_pkt) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_single_stream"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; + + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(port_id, 0, comp_obj.clone(), 10.0 /*sec */ ); + + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + +TEST_F(basic_stl, multi_pkt1) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_multi_pkt1"; + + TrexStreamsCompiler compile; + + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,1); + stream2->set_pps(2.0); + + stream2->m_enabled = true; + stream2->m_self_start = true; + stream2->m_isg_usec = 1000.0; /* 1 msec */ + pcap.update_ip_src(0x20000001); + pcap.clone_packet_into_stream(stream2); + + streams.push_back(stream2); + + + // stream - clean + TrexStreamsCompiledObj comp_obj(0,1.0); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10 ); + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream1 ; + delete stream2 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + + + + +/* check disabled stream with multiplier of 5*/ +TEST_F(basic_stl, multi_pkt2) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_multi_pkt2"; + + TrexStreamsCompiler compile; + + + std::vector<TrexStream *> streams; + + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,0); + stream1->set_pps(1.0); + + + stream1->m_enabled = true; + stream1->m_self_start = true; + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + + TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,1); + stream2->set_pps(2.0); + + stream2->m_enabled = false; + stream2->m_self_start = false; + stream2->m_isg_usec = 1000.0; /* 1 msec */ + pcap.update_ip_src(0x20000001); + pcap.clone_packet_into_stream(stream2); + + streams.push_back(stream2); + + + // stream - clean + TrexStreamsCompiledObj comp_obj(0,5.0); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 10 ); + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream1 ; + delete stream2 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + + +TEST_F(basic_stl, multi_burst1) { + + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/stl_multi_burst1"; + + TrexStreamsCompiler compile; + + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stMULTI_BURST,0,0); + stream1->set_pps(1.0); + stream1->set_multi_burst(5, + 3, + 2000000.0); + + stream1->m_enabled = true; + stream1->m_self_start = true; + + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + + TrexStreamsCompiledObj comp_obj(0,1.0); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(0, 0, comp_obj.clone(), 40 ); + + + t1.m_msg = lpstart; + + bool res=t1.init(); + + delete stream1 ; + + EXPECT_EQ_UINT32(1, res?1:0)<< "pass"; +} + +/********************************************* Itay Tests Start *************************************/ + +/** + * check that continous stream does not point to another stream + * (makes no sense) + */ +TEST_F(basic_stl, compile_bad_1) { + + TrexStreamsCompiler compile; + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stCONTINUOUS,0,2); + stream1->m_enabled = true; + stream1->set_pps(52.0); + stream1->m_next_stream_id = 3; + + streams.push_back(stream1); + + TrexStreamsCompiledObj comp_obj(0,1.0); + + std::string err_msg; + EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg)); + + delete stream1; + +} + +/** + * check for streams pointing to non exsistant streams + * + * @author imarom (16-Nov-15) + */ +TEST_F(basic_stl, compile_bad_2) { + + TrexStreamsCompiler compile; + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,1); + stream1->m_enabled = true; + stream1->set_pps(1.0); + stream1->set_single_burst(200); + + /* non existant next stream */ + stream1->m_next_stream_id = 5; + + TrexStream * stream2 = new TrexStream(TrexStream::stCONTINUOUS,0,2); + stream1->set_pps(52.0); + + streams.push_back(stream1); + streams.push_back(stream2); + + TrexStreamsCompiledObj comp_obj(0,1.0); + + std::string err_msg; + EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg)); + + delete stream1; + delete stream2; + +} + +/** + * check for "dead streams" in the mesh + * a streams that cannot be reached + * + * @author imarom (16-Nov-15) + */ +TEST_F(basic_stl, compile_bad_3) { + + TrexStreamsCompiler compile; + std::vector<TrexStream *> streams; + TrexStream *stream; + + /* stream 1 */ + stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 231); + stream->m_enabled = true; + stream->set_pps(1.0); + stream->set_single_burst(200); + + stream->m_next_stream_id = 5481; + stream->m_self_start = true; + + streams.push_back(stream); + + /* stream 2 */ + stream = new TrexStream(TrexStream::stCONTINUOUS, 0, 5481); + stream->m_enabled = true; + stream->m_next_stream_id = -1; + stream->m_self_start = false; + stream->set_pps(52.0); + + streams.push_back(stream); + + /* stream 3 */ + + stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 1928); + stream->m_enabled = true; + stream->set_pps(1.0); + stream->set_single_burst(200); + + stream->m_next_stream_id = -1; + stream->m_self_start = true; + + streams.push_back(stream); + + /* stream 4 */ + + stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 41231); + stream->m_enabled = true; + stream->set_pps(1.0); + stream->set_single_burst(200); + + stream->m_next_stream_id = 3928; + stream->m_self_start = false; + + streams.push_back(stream); + + /* stream 5 */ + + stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 3928); + stream->m_enabled = true; + stream->set_pps(1.0); + stream->set_single_burst(200); + + stream->m_next_stream_id = 41231; + stream->m_self_start = false; + + streams.push_back(stream); + + /* compile */ + TrexStreamsCompiledObj comp_obj(0,1.0); + + std::string err_msg; + EXPECT_FALSE(compile.compile(streams, comp_obj, &err_msg)); + + for (auto stream : streams) { + delete stream; + } + +} + +TEST_F(basic_stl, compile_with_warnings) { + + TrexStreamsCompiler compile; + std::vector<TrexStream *> streams; + TrexStream *stream; + + /* stream 1 */ + stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 231); + stream->m_enabled = true; + stream->set_pps(1.0); + stream->set_single_burst(200); + + stream->m_next_stream_id = 1928; + stream->m_self_start = true; + + streams.push_back(stream); + + /* stream 2 */ + stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 5481); + stream->m_enabled = true; + stream->m_next_stream_id = 1928; + stream->m_self_start = true; + stream->set_pps(52.0); + + streams.push_back(stream); + + /* stream 3 */ + + stream = new TrexStream(TrexStream::stSINGLE_BURST, 0, 1928); + stream->m_enabled = true; + stream->set_pps(1.0); + stream->set_single_burst(200); + + stream->m_next_stream_id = -1; + stream->m_self_start = true; + + streams.push_back(stream); + + + + /* compile */ + TrexStreamsCompiledObj comp_obj(0,1.0); + + std::string err_msg; + EXPECT_TRUE(compile.compile(streams, comp_obj, &err_msg)); + + EXPECT_TRUE(compile.get_last_compile_warnings().size() == 1); + + for (auto stream : streams) { + delete stream; + } + +} + + +TEST_F(basic_stl, compile_good_stream_id_compres) { + + TrexStreamsCompiler compile; + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,700); + stream1->m_self_start = true; + stream1->m_enabled = true; + stream1->set_pps(1.0); + stream1->set_single_burst(200); + + /* non existant next stream */ + stream1->m_next_stream_id = 800; + + + TrexStream * stream2 = new TrexStream(TrexStream::stSINGLE_BURST,0,800); + stream2->set_pps(52.0); + stream2->m_enabled = true; + stream2->m_next_stream_id = 700; + stream2->set_single_burst(300); + + + streams.push_back(stream1); + streams.push_back(stream2); + + TrexStreamsCompiledObj comp_obj(0,1.0); + + std::string err_msg; + EXPECT_TRUE(compile.compile(streams, comp_obj, &err_msg)); + + printf(" %s \n",err_msg.c_str()); + + comp_obj.Dump(stdout); + + EXPECT_EQ_UINT32(comp_obj.get_objects()[0].m_stream->m_stream_id,0); + EXPECT_EQ_UINT32(comp_obj.get_objects()[0].m_stream->m_next_stream_id,1); + + EXPECT_EQ_UINT32(comp_obj.get_objects()[1].m_stream->m_stream_id,1); + EXPECT_EQ_UINT32(comp_obj.get_objects()[1].m_stream->m_next_stream_id,0); + + delete stream1; + delete stream2; + +} + + + +class DpToCpHandlerStopEvent: public DpToCpHandler { public: + DpToCpHandlerStopEvent(int event_id) { + m_event_id = event_id; + } + + virtual void handle(TrexStatelessDpToCpMsgBase *msg) { + /* first the message must be an event */ + TrexDpPortEventMsg *event = dynamic_cast<TrexDpPortEventMsg *>(msg); + EXPECT_TRUE(event != NULL); + EXPECT_TRUE(event->get_event_type() == TrexDpPortEvent::EVENT_STOP); + + EXPECT_TRUE(event->get_event_id() == m_event_id); + EXPECT_TRUE(event->get_port_id() == 0); + + } + +private: + int m_event_id; }; +TEST_F(basic_stl, dp_stop_event) { + CBasicStl t1; + CParserOption * po =&CGlobalInfo::m_options; + po->preview.setVMode(7); + po->preview.setFileWrite(true); + po->out_file ="exp/ignore"; + + TrexStreamsCompiler compile; + + uint8_t port_id=0; + + std::vector<TrexStream *> streams; + + TrexStream * stream1 = new TrexStream(TrexStream::stSINGLE_BURST,0,0); + stream1->set_pps(1.0); + stream1->set_single_burst(100); + + stream1->m_enabled = true; + stream1->m_self_start = true; + stream1->m_port_id= port_id; -TEST_F(dp_sl_basic, test1) { - CTRexDpStatelessStream s1; - s1.set_enable(true); - s1.set_self_start(true); - s1.set_inter_stream_gap(0.77); - s1.get_mode().set_mode(CTRexDpStreamMode::moCONTINUES); - s1.get_mode().cont().set_pps(100.2); - s1.dump(stdout); -} + CPcapLoader pcap; + pcap.load_pcap_file("cap2/udp_64B.pcap",0); + pcap.update_ip_src(0x10000001); + pcap.clone_packet_into_stream(stream1); + + streams.push_back(stream1); + // stream - clean + + TrexStreamsCompiledObj comp_obj(port_id, 1.0 /*mul*/); + + assert(compile.compile(streams, comp_obj) ); + + TrexStatelessDpStart * lpstart = new TrexStatelessDpStart(port_id, 17, comp_obj.clone(), 10.0 /*sec */ ); + + + t1.m_msg = lpstart; + + /* let me handle these */ + DpToCpHandlerStopEvent handler(17); + t1.m_dp_to_cp_handler = &handler; + + bool res=t1.init(); + EXPECT_EQ_UINT32(1, res?1:0); + + delete stream1 ; + +} -#endif +/********************************************* Itay Tests End *************************************/ diff --git a/src/gtest/tuple_gen_test.cpp b/src/gtest/tuple_gen_test.cpp index 8a774e38..f3b9fa1e 100755 --- a/src/gtest/tuple_gen_test.cpp +++ b/src/gtest/tuple_gen_test.cpp @@ -161,7 +161,6 @@ TEST(tuple_gen,clientPoolL) { 0,0); CTupleBase result; uint32_t result_src; - uint32_t result_dest; uint16_t result_port; for(int i=0;i<10;i++) { @@ -186,7 +185,6 @@ TEST(tuple_gen,clientPool) { 0,0); CTupleBase result; uint32_t result_src; - uint32_t result_dest; uint16_t result_port; for(int i=0;i<10;i++) { @@ -436,7 +434,6 @@ TEST(tuple_gen,template1) { template_1.GenerateTuple(result); uint32_t result_src = result.getClient(); uint32_t result_dest = result.getServer(); - uint16_t result_port = result.getClientPort(); //printf(" %x %x %x \n",result_src,result_dest,result_port); EXPECT_EQ(result_src, (uint32_t)(0x10000001+i)); EXPECT_EQ(result_dest, (uint32_t)(((0x12121212)) )); @@ -489,9 +486,6 @@ TEST(tuple_gen,no_free) { int i; for (i=0; i<65557; i++) { template_1.GenerateTuple(result); - uint32_t result_src = result.getClient(); - uint32_t result_dest = result.getServer(); - uint16_t result_port = result.getClientPort(); } // should have error EXPECT_TRUE((gen.getErrorAllocationCounter()>0)?true:false); @@ -514,8 +508,6 @@ TEST(tuple_gen,try_to_free) { int i; for (i=0; i<65557; i++) { template_1.GenerateTuple(result); - uint32_t result_src = result.getClient(); - uint32_t result_dest = result.getServer(); uint16_t result_port = result.getClientPort(); gen.FreePort(0,result.getClientId(),result_port); } diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index 5c2d42d2..5890a965 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -23,6 +23,7 @@ limitations under the License. #define __TREX_PLATFORM_API_H__ #include <stdint.h> +#include <vector> /** * Global stats @@ -96,6 +97,7 @@ public: class TrexPlatformApi { public: + virtual void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const = 0; virtual void get_global_stats(TrexPlatformGlobalStats &stats) const = 0; virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const = 0; virtual uint8_t get_dp_core_count() const = 0; @@ -110,6 +112,7 @@ public: */ class TrexDpdkPlatformApi : public TrexPlatformApi { public: + void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const; void get_global_stats(TrexPlatformGlobalStats &stats) const; void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const; uint8_t get_dp_core_count() const; @@ -122,6 +125,7 @@ public: */ class TrexMockPlatformApi : public TrexPlatformApi { public: + void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {} void get_global_stats(TrexPlatformGlobalStats &stats) const; void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const; uint8_t get_dp_core_count() const; diff --git a/src/main.cpp b/src/main.cpp index bd64c5a4..b633fce6 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -26,6 +26,8 @@ limitations under the License. #include <common/arg/SimpleGlob.h> #include <common/arg/SimpleOpt.h> +#include <stateless/cp/trex_stateless.h> + // An enum for all the option types enum { OPT_HELP, OPT_CFG, OPT_NODE_DUMP, OP_STATS, @@ -94,21 +96,20 @@ static int usage(){ int gtest_main(int argc, char **argv) ; -static int parse_options(int argc, char *argv[], CParserOption* po ) { +static int parse_options(int argc, char *argv[], CParserOption* po, bool & is_gtest ) { CSimpleOpt args(argc, argv, parser_options); int a=0; int node_dump=0; po->preview.clean(); po->preview.setFileWrite(true); - int res1; while ( args.Next() ){ if (args.LastError() == SO_SUCCESS) { switch (args.OptionId()) { case OPT_UT : - res1=gtest_main(argc, argv); - exit(res1); + is_gtest=true; + return (0); break; case OPT_HELP: usage(); @@ -214,11 +215,12 @@ void * thread_task(void *info){ char buf[100]; sprintf(buf,"my%d.erf",obj->thread_id); - volatile int i; lpt->start_generate_stateful(buf,*obj->preview_info); lpt->m_node_gen.DumpHist(stdout); printf("end thread %d \n",obj->thread_id); } + + return (NULL); } @@ -405,8 +407,6 @@ void update_tcp_seq_num(CCapFileFlowInfo * obj, int i; for (i=pkt_id+1; i<s; i++) { - uint32_t seq; - uint32_t ack; pkt=obj->GetPacket(i); tcp=pkt->m_pkt_indication.l4.m_tcp; @@ -490,7 +490,7 @@ int manipolate_capfile() { CCapFileFlowInfo flow_info; flow_info.Create(); - int res=flow_info.load_cap_file("avl/delay_10_rtsp_0.pcap",0,0); + flow_info.load_cap_file("avl/delay_10_rtsp_0.pcap",0,0); change_pkt_len(&flow_info,4-1 ,6); change_pkt_len(&flow_info,5-1 ,6); @@ -515,7 +515,7 @@ int manipolate_capfile_sip() { CCapFileFlowInfo flow_info; flow_info.Create(); - int res=flow_info.load_cap_file("avl/delay_10_sip_0.pcap",0,0); + flow_info.load_cap_file("avl/delay_10_sip_0.pcap",0,0); change_pkt_len(&flow_info,1-1 ,6+6); change_pkt_len(&flow_info,2-1 ,6+6); @@ -532,8 +532,8 @@ int manipolate_capfile_sip1() { CCapFileFlowInfo flow_info; flow_info.Create(); - int res=flow_info.load_cap_file("avl/delay_sip_0.pcap",0,0); - CFlowPktInfo * pkt=flow_info.GetPacket(1); + flow_info.load_cap_file("avl/delay_sip_0.pcap",0,0); + flow_info.GetPacket(1); change_pkt_len(&flow_info,1-1 ,6+6+10); @@ -569,7 +569,7 @@ public: void CMergeCapFileRec::Dump(FILE *fd,int _id){ - double time; + double time = 0.0; bool stop=GetCurPacket(time); fprintf (fd," id:%2d stop : %d index:%4d %3.4f \n",_id,stop?1:0,m_index,time); } @@ -620,6 +620,8 @@ bool CMergeCapFileRec::Create(std::string cap_file, m_limit_number_of_packets =0; m_start_time = pkt->m_packet->get_time() ; m_offset = offset; + + return (true); } @@ -663,12 +665,12 @@ bool CMergeCapFile::run_merge(std::string to_cap_file){ int min_index=0; double min_time; - fprintf(stdout," --------------\n",cnt); + fprintf(stdout," --------------\n"); fprintf(stdout," pkt : %d \n",cnt); for (i=0; i<MERGE_CAP_FILES; i++) { m[i].Dump(stdout,i); } - fprintf(stdout," --------------\n",cnt); + fprintf(stdout," --------------\n"); bool valid = false; for (i=0; i<MERGE_CAP_FILES; i++) { @@ -702,6 +704,8 @@ bool CMergeCapFile::run_merge(std::string to_cap_file){ }; m_results.save_to_erf(to_cap_file,1); + + return (true); } @@ -746,18 +750,51 @@ int merge_2_cap_files_sip() { return (0); } +static TrexStateless *g_trex_stateless; + + +TrexStateless * get_stateless_obj() { + return g_trex_stateless; +} + +extern "C" const char * get_build_date(void){ + return (__DATE__); +} + +extern "C" const char * get_build_time(void){ + return (__TIME__ ); +} + + + int main(int argc , char * argv[]){ + int res=0; time_init(); CGlobalInfo::m_socket.Create(0); - CGlobalInfo::init_pools(1000); assert( CMsgIns::Ins()->Create(4) ); - if ( parse_options(argc, argv, &CGlobalInfo::m_options ) != 0){ + + bool is_gtest=false; + + if ( parse_options(argc, argv, &CGlobalInfo::m_options , is_gtest) != 0){ exit(-1); } - return (load_list_of_cap_files(&CGlobalInfo::m_options)); + + if ( is_gtest ) { + res = gtest_main(argc, argv); + }else{ + res = load_list_of_cap_files(&CGlobalInfo::m_options); + } + + CMsgIns::Ins()->Free(); + CGlobalInfo::free_pools(); + CGlobalInfo::m_socket.Delete(); + + + return (res); + } diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index d4e07ef2..b1c9ed12 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -58,6 +58,8 @@ limitations under the License. #include <stateless/cp/trex_stateless.h> #include <stateless/dp/trex_stream_node.h> +#include <publisher/trex_publisher.h> +#include <stateless/messaging/trex_stateless_messaging.h> #include <../linux_dpdk/version.h> @@ -551,19 +553,17 @@ static int usage(){ printf(" --mac [file] : YAML file with <client ip, mac addr> configuration \n"); printf(" \n\n"); - printf(" -r : realtime enable \n"); - printf(" \n\n"); - printf(" -c [number of cores] : 1 ,2,3,4,5 numnber of dual cores + master 1 means 1 master and 2 cores \n"); + printf(" -c [number of threads] : default is 1. number of threads to allocate for each dual ports. \n"); printf(" \n"); - printf(" -s : run only one data path core\n"); + printf(" -s : run only one data path core. for debug\n"); printf(" \n"); - printf(" --flip : flow will be sent from client->server and server->client for maximum throughput \n"); + printf(" --flip : flow will be sent from client->server and server->client for maximum throughput \n"); printf(" \n"); - printf(" -p : flow-flip , send all packets flow from the same interface base of client ip \n"); + printf(" -p : flow-flip , send all flow packets from the same interface base of client ip \n"); printf(" -e : like -p but comply to the generator rules \n"); printf(" \n"); - printf(" -l [pkt/sec] : run laterncy daemon in this rate \n"); + printf(" -l [pkt/sec] : run latency daemon in this rate \n"); printf(" e.g -l 1000 run 1000 pkt/sec from each interface , zero mean to disable latency check \n"); printf(" --lm : latency mask \n"); printf(" 0x1 only port 0 will send traffic \n"); @@ -571,20 +571,18 @@ static int usage(){ printf(" \n"); - printf(" --limit-ports : limit number of ports , must be even e.g 2,4 \n"); + printf(" --limit-ports : limit number of ports, must be even e.g. 2,4 \n"); printf(" \n"); - printf(" --nc : if set will not close all the flow , faster \n"); + printf(" --nc : If set, will not wait for all the flows to be closed, terminate faster- see manual for more information \n"); printf(" \n"); - printf(" -d : duration of the test in sec \n"); + printf(" -d : duration of the test in sec. look for --nc \n"); printf(" \n"); - printf(" -pm : platform factor , in case you have splitter in the setup you can multiply the total results in this factor \n"); + printf(" -pm : platform factor ,in case you have splitter in the setup you can multiply the total results in this factor \n"); printf(" e.g --pm 2.0 will multiply all the results bps in this factor \n"); printf(" \n"); printf(" -pubd : disable monitors publishers \n"); - printf(" -m : factor of bandwidth \n"); - printf(" \n"); - printf(" -1g : 1G trex \n"); + printf(" -m : factor of bandwidth \n"); printf(" \n"); printf(" -k [sec] : run latency test before starting the test. it will wait for x sec sending packet and x sec after that \n"); printf(" \n"); @@ -594,7 +592,7 @@ static int usage(){ printf(" you can copy this file to /etc/trex_cfg.yaml \n"); printf(" \n"); - printf(" --ipv6 : work in ipv6 mode \n"); + printf(" --ipv6 : work in ipv6 mode\n"); printf(" --learn : Work in NAT environments, learn the dynamic NAT translation and ALG \n"); printf(" --learn-verify : Learn the translation, but intended for verification of the mechanism in cases that NAT does not exist \n"); @@ -609,17 +607,17 @@ static int usage(){ printf(" Warning : This program can generate huge-files (TB ) watch out! try this only on local drive \n"); printf(" \n"); printf(" \n"); - printf(" --rx-check [sample] : enable rx check thread , using this thread we sample flows 1/sample and check order,latency and more \n"); + printf(" --rx-check [sample] : enable rx check thread, using this thread we sample flows 1/sample and check order,latency and more \n"); printf(" this feature consume another thread \n"); printf(" \n"); - printf(" --hops [hops] : If rx check is enabled, the hop number can be assigned. The default number of hops is 1\n"); - printf(" --iom [mode] : io mode for interactive mode [0- silent, 1- normal , 2- short] \n"); + printf(" --hops [hops] : If rx check is enabled, the hop number can be assigned. The default number of hops is 1\n"); + printf(" --iom [mode] : io mode for interactive mode [0- silent, 1- normal , 2- short] \n"); printf(" this feature consume another thread \n"); printf(" \n"); - printf(" --no-key : daemon mode, don't get input from keyboard \n"); - printf(" --no-flow-control : In default TRex disables flow-control using this flag it does not touch it \n"); - printf(" --prefix : for multi trex, each instance should have a different name \n"); - printf(" --mac-spread : Spread the destination mac-order by this factor. e.g 2 will generate the traffic to 2 devices DEST-MAC ,DEST-MAC+1 \n"); + printf(" --no-key : daemon mode, don't get input from keyboard \n"); + printf(" --no-flow-control : In default TRex disables flow-control using this flag it does not touch it \n"); + printf(" --prefix : for multi trex, each instance should have a different name \n"); + printf(" --mac-spread : Spread the destination mac-order by this factor. e.g 2 will generate the traffic to 2 devices DEST-MAC ,DEST-MAC+1 \n"); printf(" maximum is up to 128 devices \n"); @@ -630,7 +628,7 @@ static int usage(){ printf(" \n"); printf(" -o [capfile_name] simulate trex into pcap file \n"); printf(" --pcap export the file in pcap mode \n"); - printf(" t-rex-64 -d 10 -f cfg.yaml -o my.pcap --pcap # export 10 sec of what Trex will do on real-time to a file my.pcap \n"); + printf(" bp-sim-64 -d 10 -f cfg.yaml -o my.pcap --pcap # export 10 sec of what Trex will do on real-time to a file my.pcap \n"); printf(" --vm-sim : simulate vm with driver of one input queue and one output queue \n"); printf(" \n"); printf(" Examples: "); @@ -688,12 +686,13 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t CSimpleOpt args(argc, argv, parser_options); bool latency_was_set=false; + (void)latency_was_set; + int a=0; int node_dump=0; po->preview.setFileWrite(true); po->preview.setRealTime(true); - int res1; uint32_t tmp_data; po->m_run_mode = CParserOption::RUN_MODE_INVALID; @@ -891,7 +890,7 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t } if (po->preview.get_is_rx_check_enable() && ( po->is_latency_disabled() ) ) { - printf(" rx check must be enable with latency check. try adding '-l 1000' \n"); + printf(" rx check must be enabled with latency check. try adding '-l 1000' \n"); return -1; } @@ -904,7 +903,7 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t uint32_t cores=po->preview.getCores(); if ( cores > ((BP_MAX_CORES)/2-1) ) { - printf(" ERROR maximum cores are : %d \n",((BP_MAX_CORES)/2-1)); + printf(" ERROR maximum supported cores are : %d \n",((BP_MAX_CORES)/2-1)); return -1; } @@ -951,9 +950,9 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool first_t int main_test(int argc , char * argv[]); -static const char * default_argv[] = {"xx","-c", "0x7", "-n","2","-b","0000:0b:01.01"}; -static int argv_num = 7; - +//static const char * default_argv[] = {"xx","-c", "0x7", "-n","2","-b","0000:0b:01.01"}; +//static int argv_num = 7; + #define RX_PTHRESH 8 /**< Default values of RX prefetch threshold reg. */ @@ -1137,8 +1136,8 @@ void CPhyEthIFStats::Clear(){ void CPhyEthIFStats::DumpAll(FILE *fd){ - #define DP_A4(f) printf(" %-40s : %llu \n",#f,f) - #define DP_A(f) if (f) printf(" %-40s : %llu \n",#f,f) + #define DP_A4(f) printf(" %-40s : %llu \n",#f, (unsigned long long)f) + #define DP_A(f) if (f) printf(" %-40s : %llu \n",#f, (unsigned long long)f) DP_A4(opackets); DP_A4(obytes); DP_A4(ipackets); @@ -1177,7 +1176,7 @@ public: m_port_id = portid; m_last_rx_rate = 0.0; m_last_tx_rate = 0.0; - m_last_pps=0.0; + m_last_tx_pps = 0.0; return (true); } void Delete(); @@ -1253,8 +1252,12 @@ public: return (m_last_rx_rate); } - float get_last_pps_rate(){ - return (m_last_pps); + float get_last_tx_pps_rate(){ + return (m_last_tx_pps); + } + + float get_last_rx_pps_rate(){ + return (m_last_rx_pps); } CPhyEthIFStats & get_stats(){ @@ -1307,12 +1310,14 @@ private: CBwMeasure m_bw_tx; CBwMeasure m_bw_rx; CPPSMeasure m_pps_tx; + CPPSMeasure m_pps_rx; CPhyEthIFStats m_stats; - float m_last_rx_rate; float m_last_tx_rate; - float m_last_pps; + float m_last_rx_rate; + float m_last_tx_pps; + float m_last_rx_pps; public: struct rte_eth_dev_info m_dev_info; }; @@ -1607,10 +1612,6 @@ void CPhyEthIF::macaddr_get(struct ether_addr *mac_addr){ void CPhyEthIF::get_stats_1g(CPhyEthIFStats *stats){ - int i; - uint64_t t=0; - - stats->ipackets += pci_reg_read(E1000_GPRC) ; stats->ibytes += (pci_reg_read(E1000_GORCL) ); @@ -1648,7 +1649,8 @@ void CPhyEthIF::get_stats_1g(CPhyEthIFStats *stats){ m_last_tx_rate = m_bw_tx.add(stats->obytes); m_last_rx_rate = m_bw_rx.add(stats->ibytes); - m_last_pps = m_pps_tx.add(stats->opackets); + m_last_tx_pps = m_pps_tx.add(stats->opackets); + m_last_rx_pps = m_pps_rx.add(stats->ipackets); } @@ -1658,14 +1660,15 @@ void CPhyEthIF::get_stats(CPhyEthIFStats *stats){ m_last_tx_rate = m_bw_tx.add(stats->obytes); m_last_rx_rate = m_bw_rx.add(stats->ibytes); - m_last_pps = m_pps_tx.add(stats->opackets); + m_last_tx_pps = m_pps_tx.add(stats->opackets); + m_last_rx_pps = m_pps_rx.add(stats->ipackets); } void dump_hw_state(FILE *fd,struct ixgbe_hw_stats *hs ){ - #define DP_A1(f) if (hs->f) fprintf(fd," %-40s : %llu \n",#f,hs->f) - #define DP_A2(f,m) for (i=0;i<m; i++) { if (hs->f[i]) fprintf(fd," %-40s[%d] : %llu \n",#f,i,hs->f[i]); } + #define DP_A1(f) if (hs->f) fprintf(fd," %-40s : %llu \n",#f, (unsigned long long)hs->f) + #define DP_A2(f,m) for (i=0;i<m; i++) { if (hs->f[i]) fprintf(fd," %-40s[%d] : %llu \n",#f,i, (unsigned long long)hs->f[i]); } int i; //for (i=0;i<8; i++) { if (hs->mpc[i]) fprintf(fd," %-40s[%d] : %llu \n","mpc",i,hs->mpc[i]); } @@ -1849,6 +1852,7 @@ public: virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, rte_mbuf_t *m); + virtual pkt_dir_t port_id_to_dir(uint8_t port_id); public: void GetCoreCounters(CVirtualIFPerSideStats *stats); @@ -1861,6 +1865,10 @@ public: return ( CGlobalInfo::m_socket.port_to_socket( m_ports[0].m_port->get_port_id() ) ); } + const CCorePerPort * get_ports() { + return m_ports; + } + protected: int send_burst(CCorePerPort * lp_port, @@ -2087,6 +2095,8 @@ int CCoreEthIF::send_burst(CCorePerPort * lp_port, rte_pktmbuf_free(m); } } + + return (0); } @@ -2107,6 +2117,8 @@ int CCoreEthIF::send_pkt(CCorePerPort * lp_port, len = 0; } lp_port->m_len = len; + + return (0); } @@ -2265,7 +2277,17 @@ int CCoreEthIF::update_mac_addr_from_global_cfg(pkt_dir_t dir, return (0); } +pkt_dir_t +CCoreEthIF::port_id_to_dir(uint8_t port_id) { + + for (pkt_dir_t dir = 0; dir < CS_NUM; dir++) { + if (m_ports[dir].m_port->get_port_id() == port_id) { + return dir; + } + } + return (CS_INVALID); +} class CLatencyHWPort : public CPortLatencyHWBase { public: @@ -2377,71 +2399,6 @@ private: }; -class CZMqPublisher { -public: - CZMqPublisher(){ - m_context=0; - m_publisher=0; - } - - bool Create(uint16_t port,bool disable); - void Delete(); - void publish_json(std::string & s); -private: - void show_zmq_last_error(char *s); -private: - void * m_context; - void * m_publisher; -}; - -void CZMqPublisher::show_zmq_last_error(char *s){ - printf(" ERROR %s \n",s); - printf(" ZMQ: %s",zmq_strerror (zmq_errno ())); - exit(-1); -} - - -bool CZMqPublisher::Create(uint16_t port,bool disable){ - - if (disable) { - return(true); - } - m_context = zmq_ctx_new (); - if ( m_context == 0 ) { - show_zmq_last_error((char *)"can't connect to ZMQ library"); - } - m_publisher = zmq_socket (m_context, ZMQ_PUB); - if ( m_context == 0 ) { - show_zmq_last_error((char *)"can't create ZMQ socket"); - } - char buffer[100]; - sprintf(buffer,"tcp://*:%d",port); - int rc=zmq_bind (m_publisher, buffer); - if (rc != 0 ) { - sprintf(buffer,"can't bind to ZMQ socket %d",port); - show_zmq_last_error(buffer); - } - printf("zmq publisher at: %s \n",buffer); - return (true); -} - - -void CZMqPublisher::Delete(){ - if (m_publisher) { - zmq_close (m_publisher); - } - if (m_context) { - zmq_ctx_destroy (m_context); - } -} - - -void CZMqPublisher::publish_json(std::string & s){ - if ( m_publisher ){ - int size = zmq_send (m_publisher, s.c_str(), s.length(), 0); - assert(size==s.length()); - } -} class CPerPortStats { public: @@ -2453,6 +2410,10 @@ public: uint64_t oerrors; float m_total_tx_bps; + float m_total_tx_pps; + + float m_total_rx_bps; + float m_total_rx_pps; }; class CGlobalStats { @@ -2489,6 +2450,7 @@ public: float m_tx_bps; float m_rx_bps; float m_tx_pps; + float m_rx_pps; float m_tx_cps; float m_tx_expected_cps; float m_tx_expected_pps; @@ -2521,7 +2483,7 @@ std::string CGlobalStats::get_field(std::string name,float &f){ std::string CGlobalStats::get_field(std::string name,uint64_t &f){ char buff[200]; - sprintf(buff,"\"%s\":%llu,",name.c_str(),f); + sprintf(buff,"\"%s\":%llu,",name.c_str(), (unsigned long long)f); return (std::string(buff)); } @@ -2533,7 +2495,7 @@ std::string CGlobalStats::get_field_port(int port,std::string name,float &f){ std::string CGlobalStats::get_field_port(int port,std::string name,uint64_t &f){ char buff[200]; - sprintf(buff,"\"%s-%d\":%llu,",name.c_str(),port,f); + sprintf(buff,"\"%s-%d\":%llu,",name.c_str(),port, (unsigned long long)f); return (std::string(buff)); } @@ -2549,6 +2511,7 @@ void CGlobalStats::dump_json(std::string & json){ json+=GET_FIELD(m_tx_bps); json+=GET_FIELD(m_rx_bps); json+=GET_FIELD(m_tx_pps); + json+=GET_FIELD(m_rx_pps); json+=GET_FIELD(m_tx_cps); json+=GET_FIELD(m_tx_expected_cps); json+=GET_FIELD(m_tx_expected_pps); @@ -2583,6 +2546,9 @@ void CGlobalStats::dump_json(std::string & json){ json+=GET_FIELD_PORT(i,ierrors) ; json+=GET_FIELD_PORT(i,oerrors) ; json+=GET_FIELD_PORT(i,m_total_tx_bps); + json+=GET_FIELD_PORT(i,m_total_tx_pps); + json+=GET_FIELD_PORT(i,m_total_rx_bps); + json+=GET_FIELD_PORT(i,m_total_rx_pps); } json+=m_template.dump_as_json("template"); json+="\"unknown\":0}}" ; @@ -2602,7 +2568,7 @@ void CGlobalStats::DumpAllPorts(FILE *fd){ fprintf (fd," Platform_factor : %2.1f \n",m_platform_factor); fprintf (fd," Total-Tx : %s ",double_to_human_str(m_tx_bps,"bps",KBYE_1000).c_str()); if ( CGlobalInfo::is_learn_mode() ) { - fprintf (fd," Nat_time_out : %8llu \n",m_total_nat_time_out); + fprintf (fd," Nat_time_out : %8llu \n", (unsigned long long)m_total_nat_time_out); }else{ fprintf (fd,"\n"); } @@ -2610,49 +2576,52 @@ void CGlobalStats::DumpAllPorts(FILE *fd){ fprintf (fd," Total-Rx : %s ",double_to_human_str(m_rx_bps,"bps",KBYE_1000).c_str()); if ( CGlobalInfo::is_learn_mode() ) { - fprintf (fd," Nat_no_fid : %8llu \n",m_total_nat_no_fid); + fprintf (fd," Nat_no_fid : %8llu \n", (unsigned long long)m_total_nat_no_fid); }else{ fprintf (fd,"\n"); } fprintf (fd," Total-PPS : %s ",double_to_human_str(m_tx_pps,"pps",KBYE_1000).c_str()); if ( CGlobalInfo::is_learn_mode() ) { - fprintf (fd," Total_nat_active: %8llu \n",m_total_nat_active); + fprintf (fd," Total_nat_active: %8llu \n", (unsigned long long)m_total_nat_active); }else{ fprintf (fd,"\n"); } fprintf (fd," Total-CPS : %s ",double_to_human_str(m_tx_cps,"cps",KBYE_1000).c_str()); if ( CGlobalInfo::is_learn_mode() ) { - fprintf (fd," Total_nat_open : %8llu \n",m_total_nat_open); + fprintf (fd," Total_nat_open : %8llu \n", (unsigned long long)m_total_nat_open); }else{ fprintf (fd,"\n"); } fprintf (fd,"\n"); fprintf (fd," Expected-PPS : %s ",double_to_human_str(m_tx_expected_pps,"pps",KBYE_1000).c_str()); if ( CGlobalInfo::is_learn_verify_mode() ) { - fprintf (fd," Nat_learn_errors: %8llu \n",m_total_nat_learn_error); + fprintf (fd," Nat_learn_errors: %8llu \n", (unsigned long long)m_total_nat_learn_error); }else{ fprintf (fd,"\n"); } fprintf (fd," Expected-CPS : %s \n",double_to_human_str(m_tx_expected_cps,"cps",KBYE_1000).c_str()); fprintf (fd," Expected-BPS : %s \n",double_to_human_str(m_tx_expected_bps,"bps",KBYE_1000).c_str()); fprintf (fd,"\n"); - fprintf (fd," Active-flows : %8llu Clients : %8llu Socket-util : %3.4f %% \n",(uint64_t)m_active_flows,m_total_clients,m_socket_util); + fprintf (fd," Active-flows : %8llu Clients : %8llu Socket-util : %3.4f %% \n", + (unsigned long long)m_active_flows, + (unsigned long long)m_total_clients, + m_socket_util); fprintf (fd," Open-flows : %8llu Servers : %8llu Socket : %8llu Socket/Clients : %.1f \n", - (uint64_t)m_open_flows, - m_total_servers, - m_active_sockets, + (unsigned long long)m_open_flows, + (unsigned long long)m_total_servers, + (unsigned long long)m_active_sockets, (float)m_active_sockets/(float)m_total_clients); if (m_total_alloc_error) { - fprintf (fd," Total_alloc_err : %llu \n",(uint64_t)m_total_alloc_error); + fprintf (fd," Total_alloc_err : %llu \n", (unsigned long long)m_total_alloc_error); } if ( m_total_queue_full ){ - fprintf (fd," Total_queue_full : %llu \n",(uint64_t)m_total_queue_full); + fprintf (fd," Total_queue_full : %llu \n", (unsigned long long)m_total_queue_full); } if (m_total_queue_drop) { - fprintf (fd," Total_queue_drop : %llu \n",(uint64_t)m_total_queue_drop); + fprintf (fd," Total_queue_drop : %llu \n", (unsigned long long)m_total_queue_drop); } //m_template.Dump(fd); @@ -2676,8 +2645,8 @@ void CGlobalStats::Dump(FILE *fd,DumpFormat mode){ CPerPortStats * lp=&m_port[i]; fprintf(fd,"port : %d \n",(int)i); fprintf(fd,"------------\n"); - #define GS_DP_A4(f) fprintf(fd," %-40s : %llu \n",#f,lp->f) - #define GS_DP_A(f) if (lp->f) fprintf(fd," %-40s : %llu \n",#f,lp->f) + #define GS_DP_A4(f) fprintf(fd," %-40s : %llu \n",#f, (unsigned long long)lp->f) + #define GS_DP_A(f) if (lp->f) fprintf(fd," %-40s : %llu \n",#f, (unsigned long long)lp->f) GS_DP_A4(opackets); GS_DP_A4(obytes); GS_DP_A4(ipackets); @@ -2785,8 +2754,19 @@ public: int reset_counters(); +public: + +private: + /* try to stop all datapath cores */ + void try_stop_all_dp(); + /* send message to all dp cores */ + int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg); + + void check_for_dp_message_from_core(int thread_id); + void check_for_dp_messages(); public: + int start_send_master(); int start_master_stateless(); @@ -2925,7 +2905,7 @@ private: CLatencyVmPort m_latency_vm_vports[BP_MAX_PORTS]; /* vm driver */ CLatencyPktInfo m_latency_pkt; - CZMqPublisher m_zmq_publisher; + TrexPublisher m_zmq_publisher; public: TrexStateless *m_trex_stateless; @@ -2966,13 +2946,13 @@ int CGlobalTRex::rcv_send_all(int queue_id){ int CGlobalTRex::test_send(){ int i; - CPhyEthIF * lp=&m_ports[0]; - //set_promisc_all(true); //create_sctp_pkt(); create_udp_pkt(); CRx_check_header rx_check_header; + (void)rx_check_header; + rx_check_header.m_time_stamp=0x1234567; rx_check_header.m_option_type=RX_CHECK_V4_OPT_TYPE; rx_check_header.m_option_len=RX_CHECK_V4_OPT_LEN; @@ -3046,7 +3026,7 @@ int CGlobalTRex::test_send(){ }*/ #endif - fprintf(stdout," drop : %llu \n",m_test_drop); + fprintf(stdout," drop : %llu \n", (unsigned long long)m_test_drop); return (0); } @@ -3177,6 +3157,8 @@ int CGlobalTRex::set_promisc_all(bool enable){ CPhyEthIF * _if=&m_ports[i]; _if->set_promiscuous(enable); } + + return (0); } @@ -3187,8 +3169,52 @@ int CGlobalTRex::reset_counters(){ CPhyEthIF * _if=&m_ports[i]; _if->stats_clear(); } + + return (0); } +/** + * check for a single core + * + * @author imarom (19-Nov-15) + * + * @param thread_id + */ +void +CGlobalTRex::check_for_dp_message_from_core(int thread_id) { + + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(thread_id); + + /* fast path check */ + if ( likely ( ring->isEmpty() ) ) { + return; + } + + while ( true ) { + CGenNode * node = NULL; + if (ring->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node; + msg->handle(); + delete msg; + } + +} + +/** + * check for messages that arrived from DP to CP + * + */ +void +CGlobalTRex::check_for_dp_messages() { + /* for all the cores - check for a new message */ + for (int i = 0; i < get_cores_tx(); i++) { + check_for_dp_message_from_core(i); + } +} bool CGlobalTRex::is_all_links_are_up(bool dump){ bool all_link_are=true; @@ -3208,6 +3234,40 @@ bool CGlobalTRex::is_all_links_are_up(bool dump){ } +void CGlobalTRex::try_stop_all_dp(){ + + TrexStatelessDpQuit * msg= new TrexStatelessDpQuit(); + send_message_all_dp(msg); + delete msg; + bool all_core_finished = false; + int i; + for (i=0; i<20; i++) { + if ( is_all_cores_finished() ){ + all_core_finished =true; + break; + } + delay(100); + } + if ( all_core_finished ){ + printf(" All cores stopped !! \n"); + }else{ + printf(" ERROR one of the DP core is stucked !\n"); + } +} + + +int CGlobalTRex::send_message_all_dp(TrexStatelessCpToDpMsgBase *msg){ + + int max_threads=(int)CMsgIns::Ins()->getCpDp()->get_num_threads(); + int i; + + for (i=0; i<max_threads; i++) { + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp((uint8_t)i); + ring->Enqueue((CGenNode*)msg->clone()); + } + return (0); +} + int CGlobalTRex::ixgbe_rx_queue_flush(){ int i; @@ -3259,6 +3319,8 @@ int CGlobalTRex::ixgbe_configure_mg(void){ m_mg.Create(&mg_cfg); m_mg.set_mask(CGlobalInfo::m_options.m_latency_mask); + + return (0); } @@ -3381,7 +3443,6 @@ int CGlobalTRex::ixgbe_start(void){ */ int port_offset=0; - int queue_offset=0; for (i=0; i<get_cores_tx(); i++) { int j=(i+1); int queue_id=((j-1)/get_base_num_cores() ); /* for the first min core queue 0 , then queue 1 etc */ @@ -3408,27 +3469,15 @@ int CGlobalTRex::ixgbe_start(void){ m_cores_vif[i+1]->DumpIfCfg(stdout); } fprintf(stdout," -------------------------------\n"); + + return (0); } bool CGlobalTRex::Create(){ CFlowsYamlInfo pre_yaml_info; - if (get_is_stateless()) { - - TrexStatelessCfg cfg; - - TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port); - - cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; - cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; - cfg.m_rpc_async_cfg = NULL; - cfg.m_rpc_server_verbose = false; - cfg.m_platform_api = new TrexDpdkPlatformApi(); - - m_trex_stateless = new TrexStateless(cfg); - - } else { + if (!get_is_stateless()) { pre_yaml_info.load_from_yaml_file(CGlobalInfo::m_options.cfg_file); } @@ -3450,12 +3499,12 @@ bool CGlobalTRex::Create(){ assert( CMsgIns::Ins()->Create(get_cores_tx()) ); if ( sizeof(CGenNodeNatInfo) != sizeof(CGenNode) ) { - printf("ERROR sizeof(CGenNodeNatInfo) %d != sizeof(CGenNode) %d must be the same size \n",sizeof(CGenNodeNatInfo),sizeof(CGenNode)); + printf("ERROR sizeof(CGenNodeNatInfo) %lu != sizeof(CGenNode) %lu must be the same size \n",sizeof(CGenNodeNatInfo),sizeof(CGenNode)); assert(0); } if ( sizeof(CGenNodeLatencyPktInfo) != sizeof(CGenNode) ) { - printf("ERROR sizeof(CGenNodeLatencyPktInfo) %d != sizeof(CGenNode) %d must be the same size \n",sizeof(CGenNodeLatencyPktInfo),sizeof(CGenNode)); + printf("ERROR sizeof(CGenNodeLatencyPktInfo) %lu != sizeof(CGenNode) %lu must be the same size \n",sizeof(CGenNodeLatencyPktInfo),sizeof(CGenNode)); assert(0); } @@ -3472,6 +3521,24 @@ bool CGlobalTRex::Create(){ CGlobalInfo::init_pools(rx_mbuf); ixgbe_start(); dump_config(stdout); + + /* start stateless */ + if (get_is_stateless()) { + + TrexStatelessCfg cfg; + + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port); + + cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; + cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; + cfg.m_rpc_async_cfg = NULL; + cfg.m_rpc_server_verbose = false; + cfg.m_platform_api = new TrexDpdkPlatformApi(); + cfg.m_publisher = &m_zmq_publisher; + + m_trex_stateless = new TrexStateless(cfg); + } + return (true); } @@ -3483,9 +3550,6 @@ void CGlobalTRex::Delete(){ int CGlobalTRex::ixgbe_prob_init(void){ - uint8_t nb_ports; - - m_max_ports = rte_eth_dev_count(); if (m_max_ports == 0) rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n"); @@ -3664,17 +3728,17 @@ void CGlobalTRex::dump_post_test_stats(FILE *fd){ fprintf (fd," summary stats \n"); fprintf (fd," -------------- \n"); - fprintf (fd," Total-pkt-drop : %d pkts \n",(int64_t)(pkt_out-pkt_in)); - fprintf (fd," Total-tx-bytes : %llu bytes \n",pkt_out_bytes); - fprintf (fd," Total-tx-sw-bytes : %llu bytes \n",sw_pkt_out_bytes); - fprintf (fd," Total-rx-bytes : %llu byte \n",pkt_in_bytes); + fprintf (fd," Total-pkt-drop : %llu pkts \n",(unsigned long long)(pkt_out-pkt_in)); + fprintf (fd," Total-tx-bytes : %llu bytes \n", (unsigned long long)pkt_out_bytes); + fprintf (fd," Total-tx-sw-bytes : %llu bytes \n", (unsigned long long)sw_pkt_out_bytes); + fprintf (fd," Total-rx-bytes : %llu byte \n", (unsigned long long)pkt_in_bytes); fprintf (fd," \n"); - fprintf (fd," Total-tx-pkt : %llu pkts \n",pkt_out); - fprintf (fd," Total-rx-pkt : %llu pkts \n",pkt_in); - fprintf (fd," Total-sw-tx-pkt : %llu pkts \n",sw_pkt_out); - fprintf (fd," Total-sw-err : %llu pkts \n",sw_pkt_out_err); + fprintf (fd," Total-tx-pkt : %llu pkts \n", (unsigned long long)pkt_out); + fprintf (fd," Total-rx-pkt : %llu pkts \n", (unsigned long long)pkt_in); + fprintf (fd," Total-sw-tx-pkt : %llu pkts \n", (unsigned long long)sw_pkt_out); + fprintf (fd," Total-sw-err : %llu pkts \n", (unsigned long long)sw_pkt_out_err); if ( !CGlobalInfo::m_options.is_latency_disabled() ){ @@ -3713,7 +3777,8 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){ int i; float total_tx=0.0; float total_rx=0.0; - float total_pps=0.0; + float total_tx_pps=0.0; + float total_rx_pps=0.0; stats.m_total_tx_pkts = 0; stats.m_total_rx_pkts = 0; @@ -3741,6 +3806,9 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){ stp->ierrors = st.ierrors; stp->oerrors = st.oerrors; stp->m_total_tx_bps = _if->get_last_tx_rate()*_1Mb_DOUBLE; + stp->m_total_tx_pps = _if->get_last_tx_pps_rate(); + stp->m_total_rx_bps = _if->get_last_rx_rate()*_1Mb_DOUBLE; + stp->m_total_rx_pps = _if->get_last_rx_pps_rate(); stats.m_total_tx_pkts += st.opackets; stats.m_total_rx_pkts += st.ipackets; @@ -3749,7 +3817,8 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){ total_tx +=_if->get_last_tx_rate(); total_rx +=_if->get_last_rx_rate(); - total_pps +=_if->get_last_pps_rate(); + total_tx_pps +=_if->get_last_tx_pps_rate(); + total_rx_pps +=_if->get_last_rx_pps_rate(); } @@ -3832,7 +3901,8 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){ stats.m_tx_bps = total_tx*pf*_1Mb_DOUBLE; stats.m_rx_bps = total_rx*pf*_1Mb_DOUBLE; - stats.m_tx_pps = total_pps*pf; + stats.m_tx_pps = total_tx_pps*pf; + stats.m_rx_pps = total_rx_pps*pf; stats.m_tx_cps = m_last_total_cps*pf; stats.m_tx_expected_cps = m_expected_cps*pf; @@ -3920,7 +3990,9 @@ int CGlobalTRex::run_in_master(){ std::string json; bool was_stopped=false; - m_trex_stateless->launch_control_plane(); + if ( get_is_stateless() ) { + m_trex_stateless->launch_control_plane(); + } while ( true ) { @@ -4033,6 +4105,9 @@ int CGlobalTRex::run_in_master(){ m_trex_stateless->generate_publish_snapshot(json); m_zmq_publisher.publish_json(json); + /* check from messages from DP */ + check_for_dp_messages(); + delay(500); if ( is_all_cores_finished() ) { @@ -4040,6 +4115,11 @@ int CGlobalTRex::run_in_master(){ } } + if (!is_all_cores_finished()) { + /* probably CLTR-C */ + try_stop_all_dp(); + } + m_mg.stop(); delay(1000); if ( was_stopped ){ @@ -4081,7 +4161,7 @@ int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){ lpt = m_fl.m_threads_info[virt_core_id-1]; if (get_is_stateless()) { - lpt->start_stateless_daemon(); + lpt->start_stateless_daemon(*lp); }else{ lpt->start_generate_stateful(CGlobalInfo::m_options.out_file,*lp); } @@ -4143,6 +4223,7 @@ int CGlobalTRex::stop_master(){ dump_post_test_stats(stdout); m_fl.Delete(); + return (0); } bool CGlobalTRex::is_all_cores_finished(){ @@ -4176,6 +4257,8 @@ int CGlobalTRex::start_master_stateless(){ lpt->m_node_gen.m_socket_id =m_cores_vif[i+1]->get_socket_id(); } m_fl_was_init=true; + + return (0); } @@ -4234,6 +4317,7 @@ int CGlobalTRex::start_send_master(){ } m_fl_was_init=true; + return (0); } @@ -4374,7 +4458,6 @@ int update_global_info_from_platform_file(){ int update_dpdk_args(void){ - uint32_t cores_number; CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket; CParserOption * lpop= &CGlobalInfo::m_options; @@ -4391,7 +4474,7 @@ int update_dpdk_args(void){ } - sprintf(global_cores_str,"0x%x",lpsock->get_cores_mask()); + sprintf(global_cores_str,"0x%llx",(unsigned long long)lpsock->get_cores_mask()); /* set the DPDK options */ global_dpdk_args_num =7; @@ -4442,6 +4525,7 @@ int update_dpdk_args(void){ printf(" %s \n",global_dpdk_args[i]); } } + return (0); } @@ -4550,11 +4634,10 @@ int main_test(int argc , char * argv[]){ && (CGlobalInfo::m_options.m_latency_prev>0) ){ uint32_t pkts = CGlobalInfo::m_options.m_latency_prev* CGlobalInfo::m_options.m_latency_rate; - printf("Start prev latency check - hack for Keren for %d sec \n",CGlobalInfo::m_options.m_latency_prev); + printf("Start prev latency check- for %d sec \n",CGlobalInfo::m_options.m_latency_prev); g_trex.m_mg.start(pkts); - printf("Delay now you can call command \n"); delay(CGlobalInfo::m_options.m_latency_prev* 1000); - printf("Finish wating \n"); + printf("Finished \n"); g_trex.m_mg.reset(); g_trex.reset_counters(); } @@ -4725,14 +4808,13 @@ int CTRexExtendedDriverBase1G::configure_rx_filter_rules(CPhyEthIF * _if){ /* enable all rules */ _if->pci_reg_write(E1000_WUFC, (mask<<16) | (1<<14) ); + + return (0); } void CTRexExtendedDriverBase1G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats){ - int i; - uint64_t t=0; - stats->ipackets += _if->pci_reg_read(E1000_GPRC) ; stats->ibytes += (_if->pci_reg_read(E1000_GORCL) ); @@ -4862,6 +4944,7 @@ int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){ rte_exit(EXIT_FAILURE, " ERROR rte_eth_dev_fdir_add_perfect_filter : %d\n",res); } } + return (0); } int CTRexExtendedDriverBase10G::configure_drop_queue(CPhyEthIF * _if){ @@ -4984,6 +5067,8 @@ int CTRexExtendedDriverBase40G::configure_rx_filter_rules(CPhyEthIF * _if){ add_rules(_if,RTE_ETH_FLOW_TYPE_UDPV6,ttl); add_rules(_if,RTE_ETH_FLOW_TYPE_TCPV6,ttl); } + + return (0); } @@ -5172,3 +5257,21 @@ TrexDpdkPlatformApi::get_dp_core_count() const { return CGlobalInfo::m_options.preview.getCores(); } + +void +TrexDpdkPlatformApi::port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const { + + cores_id_list.clear(); + + /* iterate over all DP cores */ + for (uint8_t core_id = 0; core_id < g_trex.get_cores_tx(); core_id++) { + + /* iterate over all the directions*/ + for (uint8_t dir = 0 ; dir < CS_NUM; dir++) { + if (g_trex.m_cores_vif[core_id + 1]->get_ports()[dir].m_port->get_port_id() == port_id) { + cores_id_list.push_back(std::make_pair(core_id, dir)); + } + } + } +} + diff --git a/src/msg_manager.cpp b/src/msg_manager.cpp index 9f41d08c..5fe44771 100755 --- a/src/msg_manager.cpp +++ b/src/msg_manager.cpp @@ -51,15 +51,20 @@ bool CMessagingManager::Create(uint8_t num_dp_threads,std::string a_name){ return (true); } void CMessagingManager::Delete(){ - if (m_dp_to_cp) { - m_dp_to_cp->Delete(); - delete []m_dp_to_cp; - } - if (m_cp_to_dp) { - m_cp_to_dp->Delete(); - delete []m_cp_to_dp; + + assert(m_cp_to_dp); + assert(m_dp_to_cp); + int i; + for (i=0; i<m_num_dp_threads; i++) { + CNodeRing * lp; + lp=getRingCpToDp(i); + lp->Delete(); + lp=getRingDpToCp(i); + lp->Delete(); } + delete []m_dp_to_cp; + delete []m_cp_to_dp; } CNodeRing * CMessagingManager::getRingCpToDp(uint8_t thread_id){ @@ -76,6 +81,7 @@ CNodeRing * CMessagingManager::getRingDpToCp(uint8_t thread_id){ void CMsgIns::Free(){ if (m_ins) { + m_ins->Delete(); delete m_ins; } } @@ -98,6 +104,12 @@ bool CMsgIns::Create(uint8_t num_threads){ } +void CMsgIns::Delete(){ + m_cp_dp.Delete(); + m_rx_dp.Delete(); +} + + CMsgIns * CMsgIns::m_ins=0; diff --git a/src/msg_manager.h b/src/msg_manager.h index 8958f826..0390ce10 100755 --- a/src/msg_manager.h +++ b/src/msg_manager.h @@ -98,6 +98,7 @@ public: static CMsgIns * Ins(); static void Free(); bool Create(uint8_t num_threads); + void Delete(); public: CMessagingManager * getRxDp(){ return (&m_rx_dp); diff --git a/src/nat_check.cpp b/src/nat_check.cpp index 676c1292..170d2de6 100755 --- a/src/nat_check.cpp +++ b/src/nat_check.cpp @@ -171,8 +171,8 @@ void CNatRxManager::handle_packet_ipv4(CNatOption * option, } -#define MYDP(f) if (f) fprintf(fd," %-40s: %llu \n",#f,f) -#define MYDP_A(f) fprintf(fd," %-40s: %llu \n",#f,f) +#define MYDP(f) if (f) fprintf(fd," %-40s: %llu \n",#f,(unsigned long long)f) +#define MYDP_A(f) fprintf(fd," %-40s: %llu \n",#f, (unsigned long long)f) diff --git a/src/pal/linux/mbuf.cpp b/src/pal/linux/mbuf.cpp index 7eca8fd5..26a54fe9 100755 --- a/src/pal/linux/mbuf.cpp +++ b/src/pal/linux/mbuf.cpp @@ -78,6 +78,13 @@ rte_mempool_t * utl_rte_mempool_create(const char *name, return p; } +void utl_rte_mempool_delete(rte_mempool_t * & pool){ + if (pool) { + delete pool; + pool=0; + } +} + uint16_t rte_mbuf_refcnt_update(rte_mbuf_t *m, int16_t value) { diff --git a/src/pal/linux/mbuf.h b/src/pal/linux/mbuf.h index 35a442bf..4132f842 100755 --- a/src/pal/linux/mbuf.h +++ b/src/pal/linux/mbuf.h @@ -65,6 +65,8 @@ typedef struct rte_mempool rte_mempool_t; #define RTE_PKTMBUF_HEADROOM 0 +void utl_rte_mempool_delete(rte_mempool_t * &pool); + rte_mempool_t * utl_rte_mempool_create(const char *name, unsigned n, unsigned elt_size, diff --git a/src/pal/linux_dpdk/mbuf.h b/src/pal/linux_dpdk/mbuf.h index cde01077..339c0909 100755 --- a/src/pal/linux_dpdk/mbuf.h +++ b/src/pal/linux_dpdk/mbuf.h @@ -30,6 +30,10 @@ typedef struct rte_mbuf rte_mbuf_t; typedef struct rte_mempool rte_mempool_t; +inline void utl_rte_mempool_delete(rte_mempool_t * & pool){ +} + + rte_mempool_t * utl_rte_mempool_create(const char *name, unsigned n, unsigned elt_size, diff --git a/src/platform_cfg.cpp b/src/platform_cfg.cpp index 92ffefbd..547cc3ad 100755 --- a/src/platform_cfg.cpp +++ b/src/platform_cfg.cpp @@ -127,7 +127,7 @@ void CPlatformMemoryYamlInfo::Dump(FILE *fd){ int i=0; for (i=0; i<MBUF_SIZE; i++) { - fprintf(fd," %-40s : %lu \n",names[i].c_str(),m_mbuf[i]); + fprintf(fd," %-40s : %lu \n",names[i].c_str(), (ulong)m_mbuf[i]); } } @@ -379,7 +379,7 @@ void CPlatformYamlInfo::Dump(FILE *fd){ }else{ fprintf(fd," port limit : not configured \n"); } - fprintf(fd," port_bandwidth_gb : %lu \n",m_port_bandwidth_gb); + fprintf(fd," port_bandwidth_gb : %lu \n", (ulong)m_port_bandwidth_gb); if ( m_if_mask_exist && m_if_mask.size() ) { fprintf(fd," if_mask : "); @@ -387,7 +387,7 @@ void CPlatformYamlInfo::Dump(FILE *fd){ for (i=0; i<(int)m_if_mask.size(); i++) { fprintf(fd," %s,",m_if_mask[i].c_str()); } - fprintf(fd,"\n",m_if_mask[i].c_str()); + fprintf(fd,"\n"); }else{ fprintf(fd," if_mask : None \n"); @@ -414,7 +414,9 @@ void CPlatformYamlInfo::Dump(FILE *fd){ } if ( m_telnet_exist ){ fprintf(fd," telnet_port : %d \n",m_telnet_port); + } + fprintf(fd," m_zmq_rpc_port : %d \n",m_zmq_rpc_port); if ( m_mac_info_exist ){ int i; diff --git a/src/platform_cfg.h b/src/platform_cfg.h index b4b03b10..4fc3c3dd 100755 --- a/src/platform_cfg.h +++ b/src/platform_cfg.h @@ -180,11 +180,11 @@ public: m_enable_zmq_pub_exist=false; m_enable_zmq_pub=true; m_zmq_pub_port=4500; + m_zmq_rpc_port = 4501; m_telnet_exist=false; m_telnet_port=4502 ; - m_zmq_rpc_port = 5050; m_mac_info_exist=false; m_port_bandwidth_gb = 10; diff --git a/src/publisher/trex_publisher.cpp b/src/publisher/trex_publisher.cpp new file mode 100644 index 00000000..35653069 --- /dev/null +++ b/src/publisher/trex_publisher.cpp @@ -0,0 +1,107 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "trex_publisher.h" +#include <zmq.h> +#include <assert.h> +#include <sstream> +#include <iostream> + +/** + * create the publisher + * + */ +bool +TrexPublisher::Create(uint16_t port, bool disable){ + + if (disable) { + return (true); + } + + m_context = zmq_ctx_new(); + if ( m_context == 0 ) { + show_zmq_last_error("can't connect to ZMQ library"); + } + + m_publisher = zmq_socket (m_context, ZMQ_PUB); + if ( m_context == 0 ) { + show_zmq_last_error("can't create ZMQ socket"); + } + + std::stringstream ss; + ss << "tcp://*:" << port; + + int rc = zmq_bind (m_publisher, ss.str().c_str()); + if (rc != 0 ) { + show_zmq_last_error("can't bind to ZMQ socket at " + ss.str()); + } + + std::cout << "zmq publisher at: " << ss.str() << "\n"; + return (true); +} + + +void +TrexPublisher::Delete(){ + if (m_publisher) { + zmq_close (m_publisher); + m_publisher = NULL; + } + if (m_context) { + zmq_ctx_destroy (m_context); + m_context = NULL; + } +} + + +void +TrexPublisher::publish_json(const std::string &s){ + if (m_publisher) { + int size = zmq_send (m_publisher, s.c_str(), s.length(), 0); + assert(size == s.length()); + } +} + +void +TrexPublisher::publish_event(event_type_e type, const Json::Value &data) { + Json::FastWriter writer; + Json::Value value; + std::string s; + + value["name"] = "trex-event"; + value["type"] = type; + value["data"] = data; + + s = writer.write(value); + publish_json(s); +} + +/** + * error handling + * + */ +void +TrexPublisher::show_zmq_last_error(const std::string &err){ + std::cout << " ERROR " << err << "\n"; + std::cout << " ZMQ: " << zmq_strerror (zmq_errno ()); + exit(-1); +} + diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h new file mode 100644 index 00000000..82603fda --- /dev/null +++ b/src/publisher/trex_publisher.h @@ -0,0 +1,54 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_PUBLISHER_H__ +#define __TREX_PUBLISHER_H__ + +#include <stdint.h> +#include <string> +#include <json/json.h> + +class TrexPublisher { + +public: + + TrexPublisher() { + m_context = NULL; + m_publisher = NULL; + } + + bool Create(uint16_t port, bool disable); + void Delete(); + void publish_json(const std::string &s); + + enum event_type_e { + EVENT_PORT_STOPPED = 0 + }; + + void publish_event(event_type_e type, const Json::Value &data); + +private: + void show_zmq_last_error(const std::string &err); +private: + void * m_context; + void * m_publisher; +}; + +#endif /* __TREX_PUBLISHER_H__ */ diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index b40e996f..1a7132ff 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -29,7 +29,7 @@ limitations under the License. #include <iostream> #include <unistd.h> -#ifndef TREX_RPC_MOCK_SERVER +#ifdef RTE_DPDK #include <../linux_dpdk/version.h> #endif @@ -73,7 +73,7 @@ TrexRpcCmdGetVersion::_run(const Json::Value ¶ms, Json::Value &result) { Json::Value §ion = result["result"]; - #ifndef TREX_RPC_MOCK_SERVER + #ifdef RTE_DPDK section["version"] = VERSION_BUILD_NUM; section["build_date"] = get_build_date(); @@ -222,12 +222,12 @@ TrexRpcCmdAcquire::_run(const Json::Value ¶ms, Json::Value &result) { /* if not free and not you and not force - fail */ TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - if ( (!port->is_free_to_aquire()) && (port->get_owner() != new_owner) && (!force)) { - generate_execute_err(result, "port is already taken by '" + port->get_owner() + "'"); + try { + port->acquire(new_owner, force); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); } - port->set_owner(new_owner); - result["result"] = port->get_owner_handler(); return (TREX_RPC_CMD_OK); @@ -244,12 +244,12 @@ TrexRpcCmdRelease::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - if (port->get_state() == TrexStatelessPort::PORT_STATE_TRANSMITTING) { - generate_execute_err(result, "cannot release a port during transmission"); + try { + port->release(); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); } - port->clear_owner(); - result["result"] = "ACK"; return (TREX_RPC_CMD_OK); @@ -266,13 +266,13 @@ TrexRpcCmdGetPortStats::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - if (port->get_state() == TrexStatelessPort::PORT_STATE_DOWN) { - generate_execute_err(result, "cannot get stats - port is down"); - } - result["result"]["status"] = port->get_state_as_string(); - port->encode_stats(result["result"]); + try { + port->encode_stats(result["result"]); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } return (TREX_RPC_CMD_OK); } @@ -303,7 +303,7 @@ TrexRpcCmdSyncUser::_run(const Json::Value ¶ms, Json::Value &result) { owned_port["streams"] = Json::arrayValue; std::vector <TrexStream *> streams; - port->get_stream_table()->get_object_list(streams); + port->get_object_list(streams); for (auto stream : streams) { owned_port["streams"].append(stream->get_stream_json()); diff --git a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp index 9854cad7..cdd13ed6 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_stream.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_stream.cpp @@ -115,7 +115,12 @@ TrexRpcCmdAddStream::_run(const Json::Value ¶ms, Json::Value &result) { validate_stream(stream, result); TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(stream->m_port_id); - port->get_stream_table()->add_stream(stream); + + try { + port->add_stream(stream); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } result["result"] = "ACK"; @@ -127,7 +132,7 @@ TrexRpcCmdAddStream::_run(const Json::Value ¶ms, Json::Value &result) { TrexStream * TrexRpcCmdAddStream::allocate_new_stream(const Json::Value §ion, uint8_t port_id, uint32_t stream_id, Json::Value &result) { - TrexStream *stream; + TrexStream *stream = NULL; const Json::Value &mode = parse_object(section, "mode", result); std::string type = parse_string(mode, "type", result); @@ -135,14 +140,22 @@ TrexRpcCmdAddStream::allocate_new_stream(const Json::Value §ion, uint8_t por if (type == "continuous") { double pps = parse_double(mode, "pps", result); - stream = new TrexStreamContinuous(port_id, stream_id, pps); + stream = new TrexStream( TrexStream::stCONTINUOUS, port_id, stream_id); + stream->set_pps(pps); + + if (stream->m_next_stream_id != -1) { + generate_parse_err(result, "continious stream cannot provide next stream id - only -1 is valid"); + } } else if (type == "single_burst") { uint32_t total_pkts = parse_int(mode, "total_pkts", result); double pps = parse_double(mode, "pps", result); - stream = new TrexStreamBurst(port_id, stream_id, total_pkts, pps); + stream = new TrexStream(TrexStream::stSINGLE_BURST,port_id, stream_id); + stream->set_pps(pps); + stream->set_single_burst(total_pkts); + } else if (type == "multi_burst") { @@ -151,8 +164,10 @@ TrexRpcCmdAddStream::allocate_new_stream(const Json::Value §ion, uint8_t por uint32_t num_bursts = parse_int(mode, "number_of_bursts", result); uint32_t pkts_per_burst = parse_int(mode, "pkts_per_burst", result); - stream = new TrexStreamMultiBurst(port_id, stream_id, pkts_per_burst, pps, num_bursts, ibg_usec); - + stream = new TrexStream(TrexStream::stMULTI_BURST,port_id, stream_id ); + stream->set_pps(pps); + stream->set_multi_burst(pkts_per_burst,num_bursts,ibg_usec); + } else { generate_parse_err(result, "bad stream type provided: '" + type + "'"); @@ -200,9 +215,9 @@ TrexRpcCmdAddStream::parse_vm_instr_flow_var(const Json::Value &inst, TrexStream std::string min_value_str = parse_string(inst, "min_value", result); std::string max_value_str = parse_string(inst, "max_value", result); - uint64_t init_value; - uint64_t min_value; - uint64_t max_value; + uint64_t init_value = 0; + uint64_t min_value = 0; + uint64_t max_value = 0; try { init_value = str2num(init_value_str); @@ -293,7 +308,7 @@ TrexRpcCmdAddStream::validate_stream(const TrexStream *stream, Json::Value &resu TrexStatelessPort * port = get_stateless_obj()->get_port_by_id(stream->m_port_id); /* does such a stream exists ? */ - if (port->get_stream_table()->get_stream_by_id(stream->m_stream_id)) { + if (port->get_stream_by_id(stream->m_stream_id)) { std::stringstream ss; ss << "stream " << stream->m_stream_id << " already exists"; delete stream; @@ -319,7 +334,7 @@ TrexRpcCmdRemoveStream::_run(const Json::Value ¶ms, Json::Value &result) { } TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - TrexStream *stream = port->get_stream_table()->get_stream_by_id(stream_id); + TrexStream *stream = port->get_stream_by_id(stream_id); if (!stream) { std::stringstream ss; @@ -327,7 +342,12 @@ TrexRpcCmdRemoveStream::_run(const Json::Value ¶ms, Json::Value &result) { generate_execute_err(result, ss.str()); } - port->get_stream_table()->remove_stream(stream); + try { + port->remove_stream(stream); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } + delete stream; result["result"] = "ACK"; @@ -350,12 +370,18 @@ TrexRpcCmdRemoveAllStreams::_run(const Json::Value ¶ms, Json::Value &result) generate_execute_err(result, ss.str()); } - TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - port->get_stream_table()->remove_and_delete_all_streams(); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + try { + port->remove_and_delete_all_streams(); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } - result["result"] = "ACK"; - return (TREX_RPC_CMD_OK); + result["result"] = "ACK"; + + return (TREX_RPC_CMD_OK); } /*************************** @@ -377,7 +403,7 @@ TrexRpcCmdGetStreamList::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - port->get_stream_table()->get_id_list(stream_list); + port->get_id_list(stream_list); Json::Value json_list = Json::arrayValue; @@ -397,8 +423,8 @@ TrexRpcCmdGetStreamList::_run(const Json::Value ¶ms, Json::Value &result) { **************************/ trex_rpc_cmd_rc_e TrexRpcCmdGetStream::_run(const Json::Value ¶ms, Json::Value &result) { - uint8_t port_id = parse_byte(params, "port_id", result); - + uint8_t port_id = parse_byte(params, "port_id", result); + bool get_pkt = parse_bool(params, "get_pkt", result); uint32_t stream_id = parse_int(params, "stream_id", result); if (port_id >= get_stateless_obj()->get_port_count()) { @@ -409,7 +435,7 @@ TrexRpcCmdGetStream::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - TrexStream *stream = port->get_stream_table()->get_stream_by_id(stream_id); + TrexStream *stream = port->get_stream_by_id(stream_id); if (!stream) { std::stringstream ss; @@ -418,7 +444,12 @@ TrexRpcCmdGetStream::_run(const Json::Value ¶ms, Json::Value &result) { } /* return the stored stream json (instead of decoding it all over again) */ - result["result"]["stream"] = stream->get_stream_json(); + Json::Value j = stream->get_stream_json(); + if (!get_pkt) { + j.removeMember("packet"); + } + + result["result"]["stream"] = j; return (TREX_RPC_CMD_OK); @@ -431,8 +462,9 @@ TrexRpcCmdGetStream::_run(const Json::Value ¶ms, Json::Value &result) { trex_rpc_cmd_rc_e TrexRpcCmdStartTraffic::_run(const Json::Value ¶ms, Json::Value &result) { - uint8_t port_id = parse_byte(params, "port_id", result); - double mul = parse_double(params, "mul", result); + uint8_t port_id = parse_byte(params, "port_id", result); + double mul = parse_double(params, "mul", result); + double duration = parse_double(params, "duration", result); if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; @@ -442,37 +474,121 @@ TrexRpcCmdStartTraffic::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - TrexStatelessPort::rc_e rc = port->start_traffic(mul); + try { + port->start_traffic(mul, duration); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } - if (rc == TrexStatelessPort::RC_OK) { - result["result"] = "ACK"; - } else { + result["result"] = "ACK"; + + return (TREX_RPC_CMD_OK); +} + +/*************************** + * stop traffic on port + * + **************************/ +trex_rpc_cmd_rc_e +TrexRpcCmdStopTraffic::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_byte(params, "port_id", result); + + if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; - switch (rc) { - case TrexStatelessPort::RC_ERR_BAD_STATE_FOR_OP: - ss << "bad state for operations: port is either transmitting traffic or down"; - break; - case TrexStatelessPort::RC_ERR_NO_STREAMS: - ss << "no active streams on that port"; - break; - default: - ss << "failed to start traffic"; - break; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; + generate_execute_err(result, ss.str()); + } + + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + try { + port->stop_traffic(); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } + + result["result"] = "ACK"; + + return (TREX_RPC_CMD_OK); +} + +/*************************** + * get all streams + * + **************************/ +trex_rpc_cmd_rc_e +TrexRpcCmdGetAllStreams::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_byte(params, "port_id", result); + bool get_pkt = parse_bool(params, "get_pkt", result); + + if (port_id >= get_stateless_obj()->get_port_count()) { + std::stringstream ss; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; + generate_execute_err(result, ss.str()); + } + + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + std::vector <TrexStream *> streams; + port->get_object_list(streams); + + Json::Value streams_json = Json::objectValue; + for (auto stream : streams) { + + Json::Value j = stream->get_stream_json(); + + /* should we include the packet as well ? */ + if (!get_pkt) { + j.removeMember("packet"); } + std::stringstream ss; + ss << stream->m_stream_id; + + streams_json[ss.str()] = j; + } + + result["result"]["streams"] = streams_json; + + return (TREX_RPC_CMD_OK); +} + +/*************************** + * pause traffic + * + **************************/ +trex_rpc_cmd_rc_e +TrexRpcCmdPauseTraffic::_run(const Json::Value ¶ms, Json::Value &result) { + + uint8_t port_id = parse_byte(params, "port_id", result); + + if (port_id >= get_stateless_obj()->get_port_count()) { + std::stringstream ss; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; generate_execute_err(result, ss.str()); } - return (TREX_RPC_CMD_OK); + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + try { + port->pause_traffic(); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } + + result["result"] = "ACK"; + + return (TREX_RPC_CMD_OK); } /*************************** - * start traffic on port + * resume traffic * **************************/ trex_rpc_cmd_rc_e -TrexRpcCmdStopTraffic::_run(const Json::Value ¶ms, Json::Value &result) { - uint8_t port_id = parse_byte(params, "port_id", result); +TrexRpcCmdResumeTraffic::_run(const Json::Value ¶ms, Json::Value &result) { + + uint8_t port_id = parse_byte(params, "port_id", result); if (port_id >= get_stateless_obj()->get_port_count()) { std::stringstream ss; @@ -482,7 +598,41 @@ TrexRpcCmdStopTraffic::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - port->stop_traffic(); + try { + port->resume_traffic(); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } + + result["result"] = "ACK"; + + return (TREX_RPC_CMD_OK); +} + +/*************************** + * update traffic + * + **************************/ +trex_rpc_cmd_rc_e +TrexRpcCmdUpdateTraffic::_run(const Json::Value ¶ms, Json::Value &result) { + + uint8_t port_id = parse_byte(params, "port_id", result); + double mul = parse_double(params, "mul", result); + + if (port_id >= get_stateless_obj()->get_port_count()) { + std::stringstream ss; + ss << "invalid port id - should be between 0 and " << (int)get_stateless_obj()->get_port_count() - 1; + generate_execute_err(result, ss.str()); + } + + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + + try { + port->update_traffic(mul); + } catch (const TrexRpcException &ex) { + generate_execute_err(result, ex.what()); + } + result["result"] = "ACK"; return (TREX_RPC_CMD_OK); diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 91c29548..b4f37e3b 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -99,11 +99,18 @@ void parse_vm_instr_write_flow_var(const Json::Value &inst, TrexStream *stream, TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStreamList, "get_stream_list", 1, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetAllStreams, "get_all_streams", 2, true); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 2, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetStream, "get_stream", 3, true); -TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 2, true); -TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true); + + +TREX_RPC_CMD_DEFINE(TrexRpcCmdStartTraffic, "start_traffic", 3, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdStopTraffic, "stop_traffic", 1, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdPauseTraffic, "pause_traffic", 1, true); +TREX_RPC_CMD_DEFINE(TrexRpcCmdResumeTraffic, "resume_traffic", 1, true); + +TREX_RPC_CMD_DEFINE(TrexRpcCmdUpdateTraffic, "update_traffic", 2, true); TREX_RPC_CMD_DEFINE(TrexRpcCmdSyncUser, "sync_user", 2, false); diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 46281aff..a65bbccf 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -50,8 +50,13 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdRemoveAllStreams()); register_command(new TrexRpcCmdGetStreamList()); register_command(new TrexRpcCmdGetStream()); + register_command(new TrexRpcCmdGetAllStreams()); + register_command(new TrexRpcCmdStartTraffic()); register_command(new TrexRpcCmdStopTraffic()); + register_command(new TrexRpcCmdPauseTraffic()); + register_command(new TrexRpcCmdResumeTraffic()); + register_command(new TrexRpcCmdUpdateTraffic()); } TrexRpcCommandsTable::~TrexRpcCommandsTable() { diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 1f638adf..bc38c0ef 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -43,7 +43,7 @@ private: void handle_request(const std::string &request); void handle_server_error(const std::string &specific_err); - static const int RPC_MAX_MSG_SIZE = (20 * 1024); + static const int RPC_MAX_MSG_SIZE = (200 * 1024); void *m_context; void *m_socket; uint8_t m_msg_buffer[RPC_MAX_MSG_SIZE]; diff --git a/src/rx_check.cpp b/src/rx_check.cpp index 3a67ca23..59b42e1a 100755 --- a/src/rx_check.cpp +++ b/src/rx_check.cpp @@ -45,8 +45,8 @@ void CRxCheckFlowTableStats::Clear(){ } -#define MYDP(f) if (f) fprintf(fd," %-40s: %llu \n",#f,f) -#define MYDP_A(f) fprintf(fd," %-40s: %llu \n",#f,f) +#define MYDP(f) if (f) fprintf(fd," %-40s: %llu \n",#f,(unsigned long long)f) +#define MYDP_A(f) fprintf(fd," %-40s: %llu \n",#f,(unsigned long long)f) #define MYDP_J(f) json+=add_json(#f,f); #define MYDP_J_LAST(f) json+=add_json(#f,f,true); @@ -146,7 +146,7 @@ void CRxCheckFlowTableMap::dump_all(FILE *fd){ rx_check_flow_map_iter_t it; for (it= m_map.begin(); it != m_map.end(); ++it) { CRxCheckFlow *lp = it->second; - printf ("flow_id: %d \n",lp->m_flow_id); + printf ("flow_id: %llu \n",(unsigned long long)lp->m_flow_id); } } @@ -208,7 +208,7 @@ std::string CPerTxthreadTemplateInfo::dump_as_json(std::string name){ int i; for (i=0;i<MAX_TEMPLATES_STATS;i++){ char buff[200]; - sprintf(buff,"%llu",m_template_info[i]); + sprintf(buff,"%llu", (unsigned long long)m_template_info[i]); json+=std::string(buff); if ( i < MAX_TEMPLATES_STATS-1) { json+=std::string(","); @@ -231,7 +231,7 @@ void CPerTxthreadTemplateInfo::Dump(FILE *fd){ int i; for (i=0; i<MAX_TEMPLATES_STATS; i++) { if (m_template_info[i]) { - fprintf (fd," template id: %llu %llu \n",i,m_template_info[i]); + fprintf (fd," template id: %d %llu \n",i, (unsigned long long)m_template_info[i]); } } } @@ -484,7 +484,7 @@ void RxCheckManager::DumpTemplate(FILE *fd,bool verbose){ if (cnt==0){ fprintf(fd,"\n"); } - fprintf(fd,"[id:%2d val:%8d,rx:%8d], ",i,lp->get_error_counter(),lp->get_rx_counter()); + fprintf(fd,"[id:%2d val:%8llu,rx:%8llu], ",i, (unsigned long long)lp->get_error_counter(), (unsigned long long)lp->get_rx_counter()); cnt++; if (cnt>5) { cnt=0; @@ -500,7 +500,11 @@ void RxCheckManager::DumpTemplateFull(FILE *fd){ int i; for (i=0; i<MAX_TEMPLATES_STATS;i++ ) { CPerTemplateInfo * lp=get_template(i); - fprintf(fd," template_id_%2d , errors:%8d, jitter: %lu rx : %lu \n",i,lp->get_error_counter(),lp->get_jitter_usec(),lp->get_rx_counter() ); + fprintf(fd," template_id_%2d , errors:%8llu, jitter: %llu rx : %llu \n", + i, + (unsigned long long)lp->get_error_counter(), + (unsigned long long)lp->get_jitter_usec(), + (unsigned long long)lp->get_rx_counter() ); } } @@ -514,7 +518,11 @@ void RxCheckManager::DumpShort(FILE *fd){ DumpTemplate(fd,false); fprintf(fd,"\n"); fprintf(fd,"---\n"); - fprintf(fd," active flows: %8d, fif: %8d, drop: %8d, errors: %8d \n",m_stats.m_active,m_stats.m_fif,m_stats.m_err_drop,m_stats.get_total_err()); + fprintf(fd," active flows: %8llu, fif: %8llu, drop: %8llu, errors: %8llu \n", + (unsigned long long)m_stats.m_active, + (unsigned long long)m_stats.m_fif, + (unsigned long long)m_stats.m_err_drop, + (unsigned long long)m_stats.get_total_err()); fprintf(fd,"------------------------------------------------------------------------------------------------------------\n"); } diff --git a/src/rx_check.h b/src/rx_check.h index 6f9763a2..07f5684c 100755 --- a/src/rx_check.h +++ b/src/rx_check.h @@ -30,9 +30,10 @@ limitations under the License. typedef enum { - CLIENT_SIDE=0, - SERVER_SIDE=1, - CS_NUM=2 + CLIENT_SIDE = 0, + SERVER_SIDE = 1, + CS_NUM = 2, + CS_INVALID = 255 } pkt_dir_enum_t; typedef uint8_t pkt_dir_t ; diff --git a/src/rx_check_header.cpp b/src/rx_check_header.cpp index 8ee580db..5934ee15 100755 --- a/src/rx_check_header.cpp +++ b/src/rx_check_header.cpp @@ -42,11 +42,11 @@ void CRx_check_header::dump(FILE *fd){ void CNatOption::dump(FILE *fd){ - fprintf(fd," op : %lx \n",get_option_type()); - fprintf(fd," ol : %lx \n",get_option_len()); - fprintf(fd," thread_id : %lx \n",get_thread_id()); - fprintf(fd," magic : %lx \n",get_magic()); - fprintf(fd," fid : %lx \n",get_fid()); + fprintf(fd," op : %x \n",get_option_type()); + fprintf(fd," ol : %x \n",get_option_len()); + fprintf(fd," thread_id : %x \n",get_thread_id()); + fprintf(fd," magic : %x \n",get_magic()); + fprintf(fd," fid : %x \n",get_fid()); utl_DumpBuffer(stdout,(void *)&u.m_data[0],8,0); } diff --git a/src/stateless/cp/trex_dp_port_events.cpp b/src/stateless/cp/trex_dp_port_events.cpp new file mode 100644 index 00000000..ba327e59 --- /dev/null +++ b/src/stateless/cp/trex_dp_port_events.cpp @@ -0,0 +1,220 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include <trex_dp_port_events.h> +#include <sstream> +#include <os_time.h> +#include <trex_stateless.h> + +/** + * port events + */ +void +TrexDpPortEvents::create(TrexStatelessPort *port) { + m_port = port; + + for (int i = 0; i < TrexDpPortEvent::EVENT_MAX; i++) { + m_events[i].create((TrexDpPortEvent::event_e) i, port); + } + + m_event_id_counter = EVENT_ID_INVALID; +} + +/** + * generate a new event ID + * + */ +int +TrexDpPortEvents::generate_event_id() { + return (++m_event_id_counter); +} + +/** + * mark the next allowed event + * all other events will be disabled + * + */ +void +TrexDpPortEvents::wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms) { + + /* first disable all events */ + for (TrexDpPortEvent & e : m_events) { + e.disable(); + } + + /* mark this event as allowed */ + m_events[ev].wait_for_event(event_id, timeout_ms); +} + +void +TrexDpPortEvents::disable(TrexDpPortEvent::event_e ev) { + m_events[ev].disable(); +} + +/** + * handle an event + * + */ +void +TrexDpPortEvents::handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id) { + m_events[ev].handle_event(thread_id, event_id); +} + +/*********** + * single event object + * + */ + +void +TrexDpPortEvent::create(event_e type, TrexStatelessPort *port) { + m_event_type = type; + m_port = port; + + /* add the core ids to the hash */ + m_signal.clear(); + for (int core_id : m_port->get_core_id_list()) { + m_signal[core_id] = false; + } + + /* event is disabled */ + disable(); +} + + +/** + * wait the event using event id and timeout + * + */ +void +TrexDpPortEvent::wait_for_event(int event_id, int timeout_ms) { + + /* set a new event id */ + m_event_id = event_id; + + /* do we have a timeout ? */ + if (timeout_ms > 0) { + m_expire_limit_ms = os_get_time_msec() + timeout_ms; + } else { + m_expire_limit_ms = -1; + } + + /* prepare the signal array */ + m_pending_cnt = 0; + for (auto & core_pair : m_signal) { + core_pair.second = false; + m_pending_cnt++; + } +} + +void +TrexDpPortEvent::disable() { + m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; +} + +/** + * get the event status + * + */ + +TrexDpPortEvent::event_status_e +TrexDpPortEvent::status() { + + /* is it even active ? */ + if (m_event_id == TrexDpPortEvents::EVENT_ID_INVALID) { + return (EVENT_DISABLE); + } + + /* did it occured ? */ + if (m_pending_cnt == 0) { + return (EVENT_OCCURED); + } + + /* so we are enabled and the event did not occur - maybe we timed out ? */ + if ( (m_expire_limit_ms > 0) && (os_get_time_msec() > m_expire_limit_ms) ) { + return (EVENT_TIMED_OUT); + } + + /* so we are still waiting... */ + return (EVENT_PENDING); + +} + +void +TrexDpPortEvent::err(int thread_id, int event_id, const std::string &err_msg) { + std::stringstream err; + err << "DP event '" << event_name(m_event_type) << "' on thread id '" << thread_id << "' with key '" << event_id <<"' - "; +} + +/** + * event occured + * + */ +void +TrexDpPortEvent::handle_event(int thread_id, int event_id) { + + /* if the event is disabled - we don't care */ + if (!is_active()) { + return; + } + + /* check the event id is matching the required event - if not maybe its an old signal */ + if (event_id != m_event_id) { + return; + } + + /* mark sure no double signal */ + if (m_signal.at(thread_id)) { + err(thread_id, event_id, "double signal"); + + } else { + /* mark */ + m_signal.at(thread_id) = true; + m_pending_cnt--; + } + + /* event occured */ + if (m_pending_cnt == 0) { + m_port->on_dp_event_occured(m_event_type); + m_event_id = TrexDpPortEvents::EVENT_ID_INVALID; + } +} + +bool +TrexDpPortEvent::is_active() { + return (status() != EVENT_DISABLE); +} + +bool +TrexDpPortEvent::has_timeout_expired() { + return (status() == EVENT_TIMED_OUT); +} + +const char * +TrexDpPortEvent::event_name(event_e type) { + switch (type) { + case EVENT_STOP: + return "DP STOP"; + + default: + throw TrexException("unknown event type"); + } + +} diff --git a/src/stateless/cp/trex_dp_port_events.h b/src/stateless/cp/trex_dp_port_events.h new file mode 100644 index 00000000..557e590b --- /dev/null +++ b/src/stateless/cp/trex_dp_port_events.h @@ -0,0 +1,171 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_DP_PORT_EVENTS_H__ +#define __TREX_DP_PORT_EVENTS_H__ + +#include <unordered_map> +#include <string> + +class TrexStatelessPort; + +/** + * describes a single DP event related to port + * + * @author imarom (18-Nov-15) + */ +class TrexDpPortEvent { +public: + + enum event_e { + EVENT_STOP = 1, + EVENT_MAX + }; + + /** + * status of the event for the port + */ + enum event_status_e { + EVENT_DISABLE, + EVENT_PENDING, + EVENT_TIMED_OUT, + EVENT_OCCURED + }; + + /** + * init for the event + * + */ + void create(event_e type, TrexStatelessPort *port); + + /** + * create a new pending event + * + */ + void wait_for_event(int event_id, int timeout_ms = -1); + + /** + * mark event as not allowed to happen + * + */ + void disable(); + + /** + * get the event status + * + */ + event_status_e status(); + + /** + * event occured + * + */ + void handle_event(int thread_id, int event_id); + + /** + * returns true if event is active + * + */ + bool is_active(); + + /** + * has timeout already expired ? + * + */ + bool has_timeout_expired(); + + /** + * generate error + * + */ + void err(int thread_id, int event_id, const std::string &err_msg); + + /** + * event to name + * + */ + static const char * event_name(event_e type); + + +private: + + event_e m_event_type; + std::unordered_map<int, bool> m_signal; + int m_pending_cnt; + + TrexStatelessPort *m_port; + int m_event_id; + int m_expire_limit_ms; + +}; + +/** + * all the events related to a port + * + */ +class TrexDpPortEvents { +public: + friend class TrexDpPortEvent; + + void create(TrexStatelessPort *port); + + /** + * generate a new event ID to be used with wait_for_event + * + */ + int generate_event_id(); + + /** + * wait a new DP event on the port + * returns a key which will be used to identify + * the event happened + * + * @author imarom (18-Nov-15) + * + * @param ev - type of event + * @param event_id - a unique identifier for the event + * @param timeout_ms - does it has a timeout ? + * + */ + void wait_for_event(TrexDpPortEvent::event_e ev, int event_id, int timeout_ms = -1); + + /** + * disable an event (don't care) + * + */ + void disable(TrexDpPortEvent::event_e ev); + + /** + * event has occured + * + */ + void handle_event(TrexDpPortEvent::event_e ev, int thread_id, int event_id); + +private: + static const int EVENT_ID_INVALID = -1; + + TrexDpPortEvent m_events[TrexDpPortEvent::EVENT_MAX]; + int m_event_id_counter; + + TrexStatelessPort *m_port; + +}; + +#endif /* __TREX_DP_PORT_EVENTS_H__ */ diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index e0e95450..a4522837 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -47,10 +47,12 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { m_port_count = cfg.m_port_count; for (int i = 0; i < m_port_count; i++) { - m_ports.push_back(new TrexStatelessPort(i)); + m_ports.push_back(new TrexStatelessPort(i, cfg.m_platform_api)); } m_platform_api = cfg.m_platform_api; + m_publisher = cfg.m_publisher; + } /** diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 57c6ef1d..5c11be1e 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -30,6 +30,7 @@ limitations under the License. #include <trex_stream.h> #include <trex_stateless_port.h> #include <trex_rpc_server_api.h> +#include <publisher/trex_publisher.h> #include <internal_api/trex_platform_api.h> @@ -93,6 +94,7 @@ public: m_rpc_async_cfg = NULL; m_rpc_server_verbose = false; m_platform_api = NULL; + m_publisher = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; @@ -100,6 +102,7 @@ public: const TrexPlatformApi *m_platform_api; bool m_rpc_server_verbose; uint8_t m_port_count; + TrexPublisher *m_publisher; }; /** @@ -150,6 +153,10 @@ public: return (m_platform_api); } + TrexPublisher * get_publisher() { + return m_publisher; + } + const std::vector <TrexStatelessPort *> get_port_list() { return m_ports; } @@ -170,6 +177,8 @@ protected: /* platform API */ const TrexPlatformApi *m_platform_api; + TrexPublisher *m_publisher; + std::mutex m_global_cp_lock; }; diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index a0b57b63..40392e68 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -27,45 +27,89 @@ limitations under the License. #include <string> #ifndef TREX_RPC_MOCK_SERVER + // DPDK c++ issue -#define UINT8_MAX 255 -#define UINT16_MAX 0xFFFF +#ifndef UINT8_MAX + #define UINT8_MAX 255 +#endif + +#ifndef UINT16_MAX + #define UINT16_MAX 0xFFFF +#endif + // DPDK c++ issue #endif #include <rte_ethdev.h> #include <os_time.h> +void +port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list); + using namespace std; /*************************** * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id) : m_port_id(port_id) { - m_port_state = PORT_STATE_UP_IDLE; +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) { + std::vector<std::pair<uint8_t, uint8_t>> core_pair_list; + + m_port_id = port_id; + + m_port_state = PORT_STATE_IDLE; clear_owner(); + + /* get the DP cores belonging to this port */ + api->port_id_to_cores(m_port_id, core_pair_list); + + for (auto core_pair : core_pair_list) { + + /* send the core id */ + m_cores_id_list.push_back(core_pair.first); + } + + /* init the events DP DB */ + m_dp_events.create(this); } /** - * starts the traffic on the port + * acquire the port + * + * @author imarom (09-Nov-15) * + * @param user + * @param force */ -TrexStatelessPort::rc_e -TrexStatelessPort::start_traffic(double mul) { - - if (m_port_state != PORT_STATE_UP_IDLE) { - return (RC_ERR_BAD_STATE_FOR_OP); +void +TrexStatelessPort::acquire(const std::string &user, bool force) { + if ( (!is_free_to_aquire()) && (get_owner() != user) && (!force)) { + throw TrexRpcException("port is already taken by '" + get_owner() + "'"); } - if (get_stream_table()->size() == 0) { - return (RC_ERR_NO_STREAMS); - } + set_owner(user); +} + +void +TrexStatelessPort::release(void) { + verify_state( ~(PORT_STATE_TX | PORT_STATE_PAUSE) ); + clear_owner(); +} + +/** + * starts the traffic on the port + * + */ +void +TrexStatelessPort::start_traffic(double mul, double duration) { + + /* command allowed only on state stream */ + verify_state(PORT_STATE_STREAMS); /* fetch all the streams from the table */ vector<TrexStream *> streams; - get_stream_table()->get_object_list(streams); + get_object_list(streams); /* compiler it */ TrexStreamsCompiler compiler; @@ -73,68 +117,121 @@ TrexStatelessPort::start_traffic(double mul) { bool rc = compiler.compile(streams, *compiled_obj); if (!rc) { - return (RC_ERR_FAILED_TO_COMPILE_STREAMS); + throw TrexRpcException("Failed to compile streams"); } /* generate a message to all the relevant DP cores to start transmitting */ - TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(compiled_obj); + int event_id = m_dp_events.generate_event_id(); + /* mark that DP event of stoppped is possible */ + m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id); - // FIXME (add the right core list) - CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0); + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration); - ring->Enqueue((CGenNode *)start_msg); + m_last_all_streams_continues = compiled_obj->get_all_streams_continues(); + m_last_duration =duration; - /* move the state to transmiting */ - m_port_state = PORT_STATE_TRANSMITTING; + change_state(PORT_STATE_TX); - return (RC_OK); + send_message_to_dp(start_msg); + } -TrexStatelessPort::rc_e +/** + * stop traffic on port + * + * @author imarom (09-Nov-15) + * + * @return TrexStatelessPort::rc_e + */ +void TrexStatelessPort::stop_traffic(void) { - /* real code goes here */ - if (m_port_state != PORT_STATE_TRANSMITTING) { - return (RC_ERR_BAD_STATE_FOR_OP); + if (!( (m_port_state == PORT_STATE_TX) + || (m_port_state ==PORT_STATE_PAUSE) )) { + return; } + /* mask out the DP stop event */ + m_dp_events.disable(TrexDpPortEvent::EVENT_STOP); + /* generate a message to all the relevant DP cores to start transmitting */ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); - // FIXME (add the right core list) - CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(0); + send_message_to_dp(stop_msg); - ring->Enqueue((CGenNode *)stop_msg); + change_state(PORT_STATE_STREAMS); + +} - m_port_state = PORT_STATE_UP_IDLE; +void +TrexStatelessPort::pause_traffic(void) { + + verify_state(PORT_STATE_TX); - return (RC_OK); + if (m_last_all_streams_continues == false) { + throw TrexRpcException(" pause is supported when all streams are in continues mode "); + } + + if ( m_last_duration>0.0 ) { + throw TrexRpcException(" pause is supported when duration is not enable is start command "); + } + + TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id); + + send_message_to_dp(stop_msg); + + change_state(PORT_STATE_PAUSE); } -/** -* access the stream table -* -*/ -TrexStreamTable * TrexStatelessPort::get_stream_table() { - return &m_stream_table; +void +TrexStatelessPort::resume_traffic(void) { + + verify_state(PORT_STATE_PAUSE); + + /* generate a message to all the relevant DP cores to start transmitting */ + TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id); + + send_message_to_dp(stop_msg); + + change_state(PORT_STATE_TX); } +void +TrexStatelessPort::update_traffic(double mul) { + + verify_state(PORT_STATE_STREAMS | PORT_STATE_TX | PORT_STATE_PAUSE); + + #if 0 + /* generate a message to all the relevant DP cores to start transmitting */ + TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); + + send_message_to_dp(stop_msg); + + m_port_state = PORT_STATE_UP_IDLE; + #endif +} std::string -TrexStatelessPort::get_state_as_string() { +TrexStatelessPort::get_state_as_string() const { switch (get_state()) { case PORT_STATE_DOWN: - return "down"; + return "DOWN"; + + case PORT_STATE_IDLE: + return "IDLE"; - case PORT_STATE_UP_IDLE: - return "idle"; + case PORT_STATE_STREAMS: + return "STREAMS"; - case PORT_STATE_TRANSMITTING: - return "transmitting"; + case PORT_STATE_TX: + return "TX"; + + case PORT_STATE_PAUSE: + return "PAUSE"; } - return "unknown"; + return "UNKNOWN"; } void @@ -145,6 +242,24 @@ TrexStatelessPort::get_properties(string &driver, string &speed) { speed = "1 Gbps"; } +bool +TrexStatelessPort::verify_state(int state, bool should_throw) const { + if ( (state & m_port_state) == 0 ) { + if (should_throw) { + throw TrexRpcException("command cannot be executed on current state: '" + get_state_as_string() + "'"); + } else { + return false; + } + } + + return true; +} + +void +TrexStatelessPort::change_state(port_state_e new_state) { + + m_port_state = new_state; +} /** * generate a random connection handler @@ -191,3 +306,39 @@ TrexStatelessPort::encode_stats(Json::Value &port) { port["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); } +void +TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) { + + for (auto core_id : m_cores_id_list) { + + /* send the message to the core */ + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id); + ring->Enqueue((CGenNode *)msg->clone()); + } + +} + +/** + * when a DP (async) event occurs - handle it + * + */ +void +TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { + Json::Value data; + + switch (event_type) { + + case TrexDpPortEvent::EVENT_STOP: + /* set a stop event */ + change_state(PORT_STATE_STREAMS); + /* send a ZMQ event */ + + data["port_id"] = m_port_id; + get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STOPPED, data); + break; + + default: + assert(0); + + } +} diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 3e071954..006ec97c 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -22,6 +22,10 @@ limitations under the License. #define __TREX_STATELESS_PORT_H__ #include <trex_stream.h> +#include <trex_dp_port_events.h> + +class TrexPlatformApi; +class TrexStatelessCpToDpMsgBase; /** * describes a stateless port @@ -29,15 +33,19 @@ limitations under the License. * @author imarom (31-Aug-15) */ class TrexStatelessPort { + friend class TrexDpPortEvent; + public: /** * port state */ enum port_state_e { - PORT_STATE_DOWN, - PORT_STATE_UP_IDLE, - PORT_STATE_TRANSMITTING + PORT_STATE_DOWN = 0x1, + PORT_STATE_IDLE = 0x2, + PORT_STATE_STREAMS = 0x4, + PORT_STATE_TX = 0x8, + PORT_STATE_PAUSE = 0x10, }; /** @@ -50,31 +58,55 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - TrexStatelessPort(uint8_t port_id); + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); + + /** + * acquire port + * throws TrexException in case of an error + */ + void acquire(const std::string &user, bool force = false); + + /** + * release the port from the current user + * throws TrexException in case of an error + */ + void release(void); /** * start traffic - * + * throws TrexException in case of an error */ - rc_e start_traffic(double mul); + void start_traffic(double mul, double duration = -1); /** * stop traffic - * + * throws TrexException in case of an error + */ + void stop_traffic(void); + + /** + * pause traffic + * throws TrexException in case of an error + */ + void pause_traffic(void); + + /** + * resume traffic + * throws TrexException in case of an error */ - rc_e stop_traffic(void); + void resume_traffic(void); /** - * access the stream table + * update current traffic on port * */ - TrexStreamTable *get_stream_table(); + void update_traffic(double mul); /** * get the port state * */ - port_state_e get_state() { + port_state_e get_state() const { return m_port_state; } @@ -82,7 +114,7 @@ public: * port state as string * */ - std::string get_state_as_string(); + std::string get_state_as_string() const; /** * fill up properties of the port @@ -94,6 +126,7 @@ public: */ void get_properties(std::string &driver, std::string &speed); + /** * query for ownership * @@ -111,10 +144,75 @@ public: return m_owner_handler; } - bool is_free_to_aquire() { - return (m_owner == "none"); + + bool verify_owner_handler(const std::string &handler) { + + return ( (m_owner != "none") && (m_owner_handler == handler) ); + + } + + /** + * encode stats as JSON + */ + void encode_stats(Json::Value &port); + + uint8_t get_port_id() { + return m_port_id; + } + + /** + * delegators + * + */ + + void add_stream(TrexStream *stream) { + verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS); + + m_stream_table.add_stream(stream); + + change_state(PORT_STATE_STREAMS); } + void remove_stream(TrexStream *stream) { + verify_state(PORT_STATE_STREAMS); + + m_stream_table.remove_stream(stream); + + if (m_stream_table.size() == 0) { + change_state(PORT_STATE_IDLE); + } + } + + void remove_and_delete_all_streams() { + verify_state(PORT_STATE_IDLE | PORT_STATE_STREAMS); + + m_stream_table.remove_and_delete_all_streams(); + + change_state(PORT_STATE_IDLE); + } + + TrexStream * get_stream_by_id(uint32_t stream_id) { + return m_stream_table.get_stream_by_id(stream_id); + } + + void get_id_list(std::vector<uint32_t> &id_list) { + m_stream_table.get_id_list(id_list); + } + + void get_object_list(std::vector<TrexStream *> &object_list) { + m_stream_table.get_object_list(object_list); + } + + TrexDpPortEvents & get_dp_events() { + return m_dp_events; + } + + + +private: + + + /** * take ownership of the server array * this is static @@ -131,30 +229,43 @@ public: m_owner_handler = ""; } - bool verify_owner_handler(const std::string &handler) { + bool is_free_to_aquire() { + return (m_owner == "none"); + } - return ( (m_owner != "none") && (m_owner_handler == handler) ); + const std::vector<int> get_core_id_list () { + return m_cores_id_list; } + bool verify_state(int state, bool should_throw = true) const; + + void change_state(port_state_e new_state); + + std::string generate_handler(); + + void send_message_to_dp(TrexStatelessCpToDpMsgBase *msg); + /** - * encode stats as JSON + * triggered when event occurs + * */ - void encode_stats(Json::Value &port); + void on_dp_event_occured(TrexDpPortEvent::event_e event_type); - uint8_t get_port_id() { - return m_port_id; - } -private: + TrexStreamTable m_stream_table; + uint8_t m_port_id; + port_state_e m_port_state; + std::string m_owner; + std::string m_owner_handler; - std::string generate_handler(); + /* holds the DP cores associated with this port */ + std::vector<int> m_cores_id_list; + + bool m_last_all_streams_continues; + double m_last_duration; - TrexStreamTable m_stream_table; - uint8_t m_port_id; - port_state_e m_port_state; - std::string m_owner; - std::string m_owner_handler; + TrexDpPortEvents m_dp_events; }; #endif /* __TREX_STATELESS_PORT_H__ */ diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index ba306137..5203b2a2 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -25,9 +25,76 @@ limitations under the License. /************************************** * stream *************************************/ -TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) { + + +std::string TrexStream::get_stream_type_str(stream_type_t stream_type){ + + std::string res; + + + switch (stream_type) { + + case stCONTINUOUS : + res="stCONTINUOUS "; + break; + + case stSINGLE_BURST : + res="stSINGLE_BURST "; + break; + + case stMULTI_BURST : + res="stMULTI_BURST "; + break; + default: + res="Unknow "; + }; + return(res); +} + + +void TrexStream::Dump(FILE *fd){ + + fprintf(fd,"\n"); + fprintf(fd,"==> Stream_id : %lu \n",(ulong)m_stream_id); + fprintf(fd," Enabled : %lu \n",(ulong)(m_enabled?1:0)); + fprintf(fd," Self_start : %lu \n",(ulong)(m_self_start?1:0)); + + if (m_next_stream_id>=0) { + fprintf(fd," Nex_stream_id : %lu \n",(ulong)m_next_stream_id); + }else { + fprintf(fd," Nex_stream_id : %d \n",m_next_stream_id); + } + + fprintf(fd," Port_id : %lu \n",(ulong)m_port_id); + + if (m_isg_usec>0.0) { + fprintf(fd," isg : %6.2f \n",m_isg_usec); + } + fprintf(fd," type : %s \n",get_stream_type_str(m_type).c_str()); + + if ( m_type == TrexStream::stCONTINUOUS ) { + fprintf(fd," pps : %f \n",m_pps); + } + if (m_type == TrexStream::stSINGLE_BURST) { + fprintf(fd," pps : %f \n",m_pps); + fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts); + } + if (m_type == TrexStream::stMULTI_BURST) { + fprintf(fd," pps : %f \n",m_pps); + fprintf(fd," burst : %lu \n",(ulong)m_burst_total_pkts); + fprintf(fd," mburst : %lu \n",(ulong)m_num_bursts); + if (m_ibg_usec>0.0) { + fprintf(fd," m_ibg_usec : %f \n",m_ibg_usec); + } + } +} + + +TrexStream::TrexStream(uint8_t type, + uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) { /* default values */ + m_type = type; m_isg_usec = 0; m_next_stream_id = -1; m_enabled = false; @@ -38,6 +105,11 @@ TrexStream::TrexStream(uint8_t port_id, uint32_t stream_id) : m_port_id(port_id) m_rx_check.m_enable = false; + + m_pps=-1.0; + m_burst_total_pkts=0; + m_num_bursts=1; + m_ibg_usec=0.0; } TrexStream::~TrexStream() { diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index c8a15240..0634829e 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -29,9 +30,28 @@ limitations under the License. #include <json/json.h> #include <trex_stream_vm.h> +#include <stdio.h> +#include <string.h> class TrexRpcCmdAddStream; + +struct CStreamPktData { + uint8_t *binary; + uint16_t len; + + std::string meta; + +public: + inline void clone(uint8_t * in_binary, + uint32_t in_pkt_size){ + binary = new uint8_t[in_pkt_size]; + len = in_pkt_size; + memcpy(binary,in_binary,in_pkt_size); + } +}; + + /** * Stateless Stream * @@ -39,8 +59,20 @@ class TrexRpcCmdAddStream; class TrexStream { public: - TrexStream(uint8_t port_id, uint32_t stream_id); - virtual ~TrexStream() = 0; + enum STREAM_TYPE { + stNONE = 0, + stCONTINUOUS = 4, + stSINGLE_BURST = 5, + stMULTI_BURST = 6 + }; + + typedef uint8_t stream_type_t ; + + static std::string get_stream_type_str(stream_type_t stream_type); + +public: + TrexStream(uint8_t type,uint8_t port_id, uint32_t stream_id); + virtual ~TrexStream(); /* defines the min max per packet supported */ static const uint32_t MIN_PKT_SIZE_BYTES = 1; @@ -52,10 +84,78 @@ public: /* access the stream json */ const Json::Value & get_stream_json(); + /* compress the stream id to be zero based */ + void fix_dp_stream_id(uint32_t my_stream_id,int next_stream_id){ + m_stream_id = my_stream_id; + m_next_stream_id = next_stream_id; + } + + double get_pps() { + return m_pps; + } + + void set_pps(double pps){ + m_pps = pps; + } + + void set_type(uint8_t type){ + m_type = type; + } + + uint8_t get_type(void) const { + return ( m_type ); + } + + bool is_dp_next_stream(){ + if (m_next_stream_id<0) { + return (false); + }else{ + return (true); + } + } + + + + void set_multi_burst(uint32_t burst_total_pkts, + uint32_t num_bursts, + double ibg_usec) { + m_burst_total_pkts = burst_total_pkts; + m_num_bursts = num_bursts; + m_ibg_usec = ibg_usec; + } + + void set_single_burst(uint32_t burst_total_pkts){ + set_multi_burst(burst_total_pkts,1,0.0); + } + + /* create new stream */ + TrexStream * clone_as_dp(){ + TrexStream * dp=new TrexStream(m_type,m_port_id,m_stream_id); + + + dp->m_isg_usec = m_isg_usec; + dp->m_next_stream_id = m_next_stream_id; + + dp->m_enabled = m_enabled; + dp->m_self_start = m_self_start; + + /* deep copy */ + dp->m_pkt.clone(m_pkt.binary,m_pkt.len); + + dp->m_rx_check = m_rx_check; + dp->m_pps = m_pps; + dp->m_burst_total_pkts = m_burst_total_pkts; + dp->m_num_bursts = m_num_bursts; + dp->m_ibg_usec = m_ibg_usec ; + return (dp); + } + + void Dump(FILE *fd); public: /* basic */ + uint8_t m_type; uint8_t m_port_id; - uint32_t m_stream_id; + uint32_t m_stream_id; /* id from RPC can be anything */ /* config fields */ @@ -65,13 +165,9 @@ public: /* indicators */ bool m_enabled; bool m_self_start; - + + CStreamPktData m_pkt; /* pkt */ - struct { - uint8_t *binary; - uint16_t len; - std::string meta; - } m_pkt; /* VM */ StreamVm m_vm; @@ -85,64 +181,19 @@ public: } m_rx_check; + double m_pps; - /* original template provided by requester */ - Json::Value m_stream_json; -}; + uint32_t m_burst_total_pkts; /* valid in case of burst stSINGLE_BURST,stMULTI_BURST*/ -/** - * continuous stream - * - */ -class TrexStreamContinuous : public TrexStream { -public: - TrexStreamContinuous(uint8_t port_id, uint32_t stream_id, double pps) : TrexStream(port_id, stream_id), m_pps(pps) { - } + uint32_t m_num_bursts; /* valid in case of stMULTI_BURST */ - double get_pps() { - return m_pps; - } + double m_ibg_usec; /* valid in case of stMULTI_BURST */ -protected: - double m_pps; -}; - -/** - * single burst - * - */ -class TrexStreamBurst : public TrexStream { -public: - TrexStreamBurst(uint8_t port_id, uint32_t stream_id, uint32_t total_pkts, double pps) : - TrexStream(port_id, stream_id), - m_total_pkts(total_pkts), - m_pps(pps) { - } + /* original template provided by requester */ + Json::Value m_stream_json; -protected: - uint32_t m_total_pkts; - double m_pps; }; -/** - * multi burst - * - */ -class TrexStreamMultiBurst : public TrexStreamBurst { -public: - TrexStreamMultiBurst(uint8_t port_id, - uint32_t stream_id, - uint32_t pkts_per_burst, - double pps, - uint32_t num_bursts, - double ibg_usec) : TrexStreamBurst(port_id, stream_id, pkts_per_burst, pps), m_num_bursts(num_bursts), m_ibg_usec(ibg_usec) { - - } -protected: - uint32_t m_num_bursts; - double m_ibg_usec; - -}; /** * holds all the streams diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index 5e2602ec..302863ae 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -19,42 +19,390 @@ See the License for the specific language governing permissions and limitations under the License. */ -#include <string.h> +#include <string> +#include <sstream> #include <trex_streams_compiler.h> #include <trex_stream.h> +#include <assert.h> +#include <trex_stateless.h> +#include <iostream> + +/** + * describes a graph node in the pre compile check + * + * @author imarom (16-Nov-15) + */ +class GraphNode { +public: + GraphNode(TrexStream *stream, GraphNode *next) : m_stream(stream), m_next(next) { + marked = false; + m_compressed_stream_id=-1; + } + + uint32_t get_stream_id() const { + return m_stream->m_stream_id; + } + + const TrexStream *m_stream; + GraphNode *m_next; + std::vector<const GraphNode *> m_parents; + bool marked; + int m_compressed_stream_id; +}; + +/** + * node map + * + */ +class GraphNodeMap { +public: + + GraphNodeMap() : m_dead_end(NULL, NULL) { + + } + + bool add(GraphNode *node) { + if (has(node->get_stream_id())) { + return false; + } + + m_nodes[node->get_stream_id()] = node; + + if (node->m_stream->m_self_start) { + m_roots.push_back(node); + } + + return true; + } + + bool has(uint32_t stream_id) { + + return (get(stream_id) != NULL); + } + + GraphNode * get(uint32_t stream_id) { + + if (stream_id == -1) { + return &m_dead_end; + } + + auto search = m_nodes.find(stream_id); + + if (search != m_nodes.end()) { + return search->second; + } else { + return NULL; + } + } + + void clear_marks() { + for (auto node : m_nodes) { + node.second->marked = false; + } + } + + void get_unmarked(std::vector <GraphNode *> &unmarked) { + for (auto node : m_nodes) { + if (!node.second->marked) { + unmarked.push_back(node.second); + } + } + } + + + ~GraphNodeMap() { + for (auto node : m_nodes) { + delete node.second; + } + m_nodes.clear(); + } + + std::vector <GraphNode *> & get_roots() { + return m_roots; + } + + + std::unordered_map<uint32_t, GraphNode *> get_nodes() { + return m_nodes; + } + +private: + std::unordered_map<uint32_t, GraphNode *> m_nodes; + std::vector <GraphNode *> m_roots; + GraphNode m_dead_end; +}; /************************************** * stream compiled object *************************************/ TrexStreamsCompiledObj::TrexStreamsCompiledObj(uint8_t port_id, double mul) : m_port_id(port_id), m_mul(mul) { + m_all_continues=false; } TrexStreamsCompiledObj::~TrexStreamsCompiledObj() { - for (auto &obj : m_objs) { - delete obj.m_pkt; + for (auto obj : m_objs) { + delete obj.m_stream; } m_objs.clear(); } + void -TrexStreamsCompiledObj::add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len) { +TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream){ + obj_st obj; - obj.m_port_id = m_port_id; - obj.m_pps = pps * m_mul; - obj.m_pkt_len = pkt_len; + obj.m_stream = stream->clone_as_dp(); + + m_objs.push_back(obj); +} + +void +TrexStreamsCompiledObj::add_compiled_stream(TrexStream * stream, + uint32_t my_dp_id, int next_dp_id) { + obj_st obj; - obj.m_pkt = new uint8_t[pkt_len]; - memcpy(obj.m_pkt, pkt, pkt_len); + obj.m_stream = stream->clone_as_dp(); + /* compress the id's*/ + obj.m_stream->fix_dp_stream_id(my_dp_id,next_dp_id); m_objs.push_back(obj); } +void TrexStreamsCompiledObj::Dump(FILE *fd){ + for (auto obj : m_objs) { + obj.m_stream->Dump(fd); + } +} + + + +TrexStreamsCompiledObj * +TrexStreamsCompiledObj::clone() { + + /* use multiplier of 1 to avoid double mult */ + TrexStreamsCompiledObj *new_compiled_obj = new TrexStreamsCompiledObj(m_port_id, 1); + + /** + * clone each element + */ + for (auto obj : m_objs) { + new_compiled_obj->add_compiled_stream(obj.m_stream); + } + + new_compiled_obj->m_mul = m_mul; + + return new_compiled_obj; +} + +void +TrexStreamsCompiler::add_warning(const std::string &warning) { + m_warnings.push_back("*** warning: " + warning); +} + +void +TrexStreamsCompiler::err(const std::string &err) { + throw TrexException("*** error: " + err); +} + +void +TrexStreamsCompiler::check_stream(const TrexStream *stream) { + std::stringstream ss; + + /* cont. stream can point only on itself */ + if (stream->get_type() == TrexStream::stCONTINUOUS) { + if (stream->m_next_stream_id != -1) { + ss << "continous stream '" << stream->m_stream_id << "' cannot point on another stream"; + err(ss.str()); + } + } +} + +void +TrexStreamsCompiler::allocate_pass(const std::vector<TrexStream *> &streams, + GraphNodeMap *nodes) { + std::stringstream ss; + uint32_t compressed_stream_id=0; + + + /* first pass - allocate all nodes and check for duplicates */ + for (auto stream : streams) { + + /* skip non enabled streams */ + if (!stream->m_enabled) { + continue; + } + + /* sanity check on the stream itself */ + check_stream(stream); + + /* duplicate stream id ? */ + if (nodes->has(stream->m_stream_id)) { + ss << "duplicate instance of stream id " << stream->m_stream_id; + err(ss.str()); + } + + GraphNode *node = new GraphNode(stream, NULL); + /* allocate new compressed id */ + node->m_compressed_stream_id = compressed_stream_id; + + compressed_stream_id++; + + /* add to the map */ + assert(nodes->add(node)); + } + +} + +/** + * on this pass we direct the graph to point to the right nodes + * + */ +void +TrexStreamsCompiler::direct_pass(GraphNodeMap *nodes) { + + /* second pass - direct the graph */ + for (auto p : nodes->get_nodes()) { + + GraphNode *node = p.second; + const TrexStream *stream = node->m_stream; + + /* check the stream points on an existing stream */ + GraphNode *next_node = nodes->get(stream->m_next_stream_id); + if (!next_node) { + std::stringstream ss; + ss << "stream " << node->get_stream_id() << " is pointing on non existent stream " << stream->m_next_stream_id; + err(ss.str()); + } + + node->m_next = next_node; + + /* do we have more than one parent ? */ + next_node->m_parents.push_back(node); + } + + + /* check for multiple parents */ + for (auto p : nodes->get_nodes()) { + GraphNode *node = p.second; + + if (node->m_parents.size() > 0 ) { + std::stringstream ss; + + ss << "stream " << node->get_stream_id() << " is triggered by multiple streams: "; + for (auto x : node->m_parents) { + ss << x->get_stream_id() << " "; + } + + add_warning(ss.str()); + } + } +} + +/** + * mark sure all the streams are reachable + * + */ +void +TrexStreamsCompiler::check_for_unreachable_streams(GraphNodeMap *nodes) { + /* start with the roots */ + std::vector <GraphNode *> next_nodes = nodes->get_roots(); + + + nodes->clear_marks(); + + /* run BFS from all the roots */ + while (!next_nodes.empty()) { + + /* pull one */ + GraphNode *node = next_nodes.back(); + next_nodes.pop_back(); + if (node->marked) { + continue; + } + + node->marked = true; + + if (node->m_next != NULL) { + next_nodes.push_back(node->m_next); + } + + } + + std::vector <GraphNode *> unmarked; + nodes->get_unmarked(unmarked); + + if (!unmarked.empty()) { + std::stringstream ss; + for (auto node : unmarked) { + ss << "stream " << node->get_stream_id() << " is unreachable from any other stream\n"; + } + err(ss.str()); + } + + +} + +/** + * check validation of streams for compile + * + * @author imarom (16-Nov-15) + * + * @param streams + * @param fail_msg + * + * @return bool + */ +void +TrexStreamsCompiler::pre_compile_check(const std::vector<TrexStream *> &streams, + GraphNodeMap & nodes) { + + m_warnings.clear(); + + /* allocate nodes */ + allocate_pass(streams, &nodes); + + /* direct the graph */ + direct_pass(&nodes); + + /* check for non reachable streams inside the graph */ + check_for_unreachable_streams(&nodes); + +} + /************************************** * stream compiler *************************************/ bool -TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj) { +TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, + TrexStreamsCompiledObj &obj, + std::string *fail_msg) { + +#if 0 + fprintf(stdout,"------------pre compile \n"); + for (auto stream : streams) { + stream->Dump(stdout); + } + fprintf(stdout,"------------pre compile \n"); +#endif + + GraphNodeMap nodes; + + + /* compile checks */ + try { + pre_compile_check(streams,nodes); + } catch (const TrexException &ex) { + if (fail_msg) { + *fail_msg = ex.what(); + } else { + std::cout << ex.what(); + } + return false; + } + + + bool all_continues=true; /* for now we do something trivial, */ for (auto stream : streams) { @@ -62,24 +410,26 @@ TrexStreamsCompiler::compile(const std::vector<TrexStream *> &streams, TrexStrea if (!stream->m_enabled) { continue; } - - /* for now skip also non self started streams */ - if (!stream->m_self_start) { - continue; + if (stream->get_type() != TrexStream::stCONTINUOUS ) { + all_continues=false; } - /* for now support only continous ... */ - TrexStreamContinuous *cont_stream = dynamic_cast<TrexStreamContinuous *>(stream); - if (!cont_stream) { - continue; + int new_id= nodes.get(stream->m_stream_id)->m_compressed_stream_id; + assert(new_id>=0); + uint32_t my_stream_id = (uint32_t)new_id; + int my_next_stream_id=-1; + if (stream->m_next_stream_id>=0) { + my_next_stream_id=nodes.get(stream->m_next_stream_id)->m_compressed_stream_id; } /* add it */ - obj.add_compiled_stream(cont_stream->get_pps(), - cont_stream->m_pkt.binary, - cont_stream->m_pkt.len); + obj.add_compiled_stream(stream, + my_stream_id, + my_next_stream_id + ); } - + obj.m_all_continues =all_continues; return true; } + diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h index 06f992ed..17ca3c74 100644 --- a/src/stateless/cp/trex_streams_compiler.h +++ b/src/stateless/cp/trex_streams_compiler.h @@ -23,9 +23,11 @@ limitations under the License. #include <stdint.h> #include <vector> +#include <string> class TrexStreamsCompiler; class TrexStream; +class GraphNodeMap; /** * compiled object for a table of streams @@ -40,33 +42,78 @@ public: ~TrexStreamsCompiledObj(); struct obj_st { - double m_pps; - uint8_t *m_pkt; - uint16_t m_pkt_len; - uint8_t m_port_id; + + TrexStream * m_stream; }; const std::vector<obj_st> & get_objects() { return m_objs; } + uint8_t get_port_id(){ + return (m_port_id); + } + + /** + * clone the compiled object + * + */ + TrexStreamsCompiledObj * clone(); + + double get_multiplier(){ + return (m_mul); + } + + bool get_all_streams_continues(){ + return (m_all_continues); + } + + void Dump(FILE *fd); + private: - void add_compiled_stream(double pps, uint8_t *pkt, uint16_t pkt_len); + void add_compiled_stream(TrexStream * stream, + uint32_t my_dp_id, int next_dp_id); + void add_compiled_stream(TrexStream * stream); + std::vector<obj_st> m_objs; + bool m_all_continues; uint8_t m_port_id; double m_mul; }; class TrexStreamsCompiler { public: + /** * compiles a vector of streams to an object passable to the DP * * @author imarom (28-Oct-15) * */ - bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj); + bool compile(const std::vector<TrexStream *> &streams, TrexStreamsCompiledObj &obj, std::string *fail_msg = NULL); + + /** + * + * returns a reference pointer to the last compile warnings + * if no warnings were produced - the vector is empty + */ + const std::vector<std::string> & get_last_compile_warnings() { + return m_warnings; + } + +private: + + void pre_compile_check(const std::vector<TrexStream *> &streams, + GraphNodeMap & nodes); + void allocate_pass(const std::vector<TrexStream *> &streams, GraphNodeMap *nodes); + void direct_pass(GraphNodeMap *nodes); + void check_for_unreachable_streams(GraphNodeMap *nodes); + void check_stream(const TrexStream *stream); + void add_warning(const std::string &warning); + void err(const std::string &err); + + std::vector<std::string> m_warnings; }; #endif /* __TREX_STREAMS_COMPILER_H__ */ diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 306b23d0..9b4a6ad9 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -22,12 +23,193 @@ limitations under the License. #include <trex_stateless_messaging.h> #include <trex_streams_compiler.h> #include <trex_stream_node.h> +#include <trex_stream.h> #include <bp_sim.h> -TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core) { + +void CDpOneStream::Delete(CFlowGenListPerThread * core){ + assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE); + core->free_node((CGenNode *)m_node); + delete m_dp_stream; + m_node=0; + m_dp_stream=0; +} + +void CDpOneStream::DeleteOnlyStream(){ + assert(m_dp_stream); + delete m_dp_stream; + m_dp_stream=0; +} + +int CGenNodeStateless::get_stream_id(){ + if (m_state ==CGenNodeStateless::ss_FREE_RESUSE) { + return (-1); // not valid + } + assert(m_ref_stream_info); + return ((int)m_ref_stream_info->m_stream_id); +} + + +void CGenNodeStateless::DumpHeader(FILE *fd){ + fprintf(fd," pkt_id, time, port , action , state, stream_id , stype , m-burst# , burst# \n"); + +} +void CGenNodeStateless::Dump(FILE *fd){ + fprintf(fd," %2.4f, %3lu, %s,%s, %3d, %s, %3lu, %3lu \n", + m_time, + (ulong)m_port_id, + "s-pkt", //action + get_stream_state_str(m_state ).c_str(), + get_stream_id(), //stream_id + TrexStream::get_stream_type_str(m_stream_type).c_str(), //stype + (ulong)m_multi_bursts, + (ulong)m_single_burst + ); +} + + +void CGenNodeStateless::refresh(){ + + /* refill the stream info */ + m_single_burst = m_single_burst_refill; + m_multi_bursts = m_ref_stream_info->m_num_bursts; + m_state = CGenNodeStateless::ss_ACTIVE; +} + + +void CGenNodeCommand::free_command(){ + + assert(m_cmd); + m_cmd->on_node_remove(); + delete m_cmd; +} + + +std::string CGenNodeStateless::get_stream_state_str(stream_state_t stream_state){ + std::string res; + + switch (stream_state) { + case CGenNodeStateless::ss_FREE_RESUSE : + res="FREE "; + break; + case CGenNodeStateless::ss_INACTIVE : + res="INACTIVE "; + break; + case CGenNodeStateless::ss_ACTIVE : + res="ACTIVE "; + break; + default: + res="Unknow "; + }; + return(res); +} + + +void CGenNodeStateless::free_stl_node(){ + /* if we have cache mbuf free it */ + rte_mbuf_t * m=get_cache_mbuf(); + if (m) { + rte_pktmbuf_free(m); + m_cache_mbuf=0; + } +} + + +bool TrexStatelessDpPerPort::update_number_of_active_streams(uint32_t d){ + m_active_streams-=d; /* reduce the number of streams */ + if (m_active_streams == 0) { + return (true); + } + return (false); +} + +bool TrexStatelessDpPerPort::resume_traffic(uint8_t port_id){ + + /* we are working with continues streams so we must be in transmit mode */ + assert(m_state == TrexStatelessDpPerPort::ppSTATE_PAUSE); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + assert(node->is_pause() == true); + node->set_pause(false); + } + m_state = TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + return (true); +} + + +bool TrexStatelessDpPerPort::pause_traffic(uint8_t port_id){ + + /* we are working with continues streams so we must be in transmit mode */ + assert(m_state == TrexStatelessDpPerPort::ppSTATE_TRANSMITTING); + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + assert(node->is_pause() == false); + node->set_pause(true); + } + m_state = TrexStatelessDpPerPort::ppSTATE_PAUSE; + return (true); +} + + +bool TrexStatelessDpPerPort::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id){ + + + if (m_state == TrexStatelessDpPerPort::ppSTATE_IDLE) { + assert(m_active_streams==0); + return false; + } + + /* there could be race of stop after stop */ + if ( stop_on_id ) { + if (event_id != m_event_id){ + /* we can't stop it is an old message */ + return false; + } + } + + for (auto dp_stream : m_active_nodes) { + CGenNodeStateless * node =dp_stream.m_node; + assert(node->get_port_id() == port_id); + if ( node->get_state() == CGenNodeStateless::ss_ACTIVE) { + node->mark_for_free(); + m_active_streams--; + dp_stream.DeleteOnlyStream(); + + }else{ + dp_stream.Delete(m_core); + } + } + + /* active stream should be zero */ + assert(m_active_streams==0); + m_active_nodes.clear(); + m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; + return (true); +} + + +void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){ + m_core=core; + m_state=TrexStatelessDpPerPort::ppSTATE_IDLE; + m_port_id=0; + m_active_streams=0; + m_active_nodes.clear(); +} + + + +void +TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) { m_thread_id = thread_id; m_core = core; + m_local_port_offset = 2*core->getDualPortId(); CMessagingManager * cp_dp = CMsgIns::Ins()->getCpDp(); @@ -35,8 +217,54 @@ TrexStatelessDpCore::TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThrea m_ring_to_cp = cp_dp->getRingDpToCp(thread_id); m_state = STATE_IDLE; + + int i; + for (i=0; i<NUM_PORTS_PER_CORE; i++) { + m_ports[i].create(core); + } } + +/* move to the next stream, old stream move to INACTIVE */ +bool TrexStatelessDpCore::set_stateless_next_node(CGenNodeStateless * cur_node, + CGenNodeStateless * next_node){ + + assert(cur_node); + TrexStatelessDpPerPort * lp_port = get_port_db(cur_node->m_port_id); + bool schedule =false; + + bool to_stop_port=false; + + if (next_node == NULL) { + /* there is no next stream , reduce the number of active streams*/ + to_stop_port = lp_port->update_number_of_active_streams(1); + + }else{ + uint8_t state=next_node->get_state(); + + /* can't be FREE_RESUSE */ + assert(state != CGenNodeStateless::ss_FREE_RESUSE); + if (next_node->get_state() == CGenNodeStateless::ss_INACTIVE ) { + + /* refill start info and scedule, no update in active streams */ + next_node->refresh(); + schedule = true; + + }else{ + to_stop_port = lp_port->update_number_of_active_streams(1); + } + } + + if ( to_stop_port ) { + /* call stop port explictly to move the state */ + stop_traffic(cur_node->m_port_id,false,0); + } + + return ( schedule ); +} + + + /** * in idle state loop, the processor most of the time sleeps * and periodically checks for messages @@ -52,6 +280,15 @@ TrexStatelessDpCore::idle_state_loop() { } } + + +void TrexStatelessDpCore::quit_main_loop(){ + m_core->set_terminate_mode(true); /* mark it as terminated */ + m_state = STATE_TERMINATE; + add_global_duration(0.0001); +} + + /** * scehduler runs when traffic exists * it will return when no more transmitting is done on this @@ -68,37 +305,172 @@ TrexStatelessDpCore::start_scheduler() { m_core->m_node_gen.add_node(node_sync); double old_offset = 0.0; - m_core->m_node_gen.flush_file(100000000, 0.0, false, m_core, old_offset); + m_core->m_node_gen.flush_file(-1, 0.0, false, m_core, old_offset); + /* bail out in case of terminate */ + if (m_state != TrexStatelessDpCore::STATE_TERMINATE) { + m_core->m_node_gen.close_file(m_core); + } } + +void +TrexStatelessDpCore::run_once(){ + + idle_state_loop(); + + if ( m_state == STATE_TERMINATE ){ + return; + } + + start_scheduler(); +} + + + + void TrexStatelessDpCore::start() { while (true) { - idle_state_loop(); + run_once(); + + if ( m_core->is_terminated_by_master() ) { + break; + } + } +} + +/* only if both port are idle we can exit */ +void +TrexStatelessDpCore::schedule_exit(){ + + CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; + + node->m_type = CGenNode::COMMAND; - start_scheduler(); + node->m_cmd = new TrexStatelessDpCanQuit(); + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec ; + + m_core->m_node_gen.add_node((CGenNode *)node); +} + + +void +TrexStatelessDpCore::add_global_duration(double duration){ + if (duration > 0.0) { + CGenNode *node = m_core->create_node() ; + + node->m_type = CGenNode::EXIT_SCHED; + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec + duration ; + + m_core->m_node_gen.add_node(node); } } +/* add per port exit */ void -TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len) { +TrexStatelessDpCore::add_port_duration(double duration, + uint8_t port_id, + int event_id){ + if (duration > 0.0) { + CGenNodeCommand *node = (CGenNodeCommand *)m_core->create_node() ; + + node->m_type = CGenNode::COMMAND; + + /* make sure it will be scheduled after the current node */ + node->m_time = m_core->m_cur_time_sec + duration ; + + TrexStatelessDpStop * cmd=new TrexStatelessDpStop(port_id); + + + /* test this */ + m_core->m_non_active_nodes++; + cmd->set_core_ptr(m_core); + cmd->set_event_id(event_id); + cmd->set_wait_for_event_id(true); + + node->m_cmd = cmd; + + m_core->m_node_gen.add_node((CGenNode *)node); + } +} + + +void +TrexStatelessDpCore::add_cont_stream(TrexStatelessDpPerPort * lp_port, + TrexStream * stream, + TrexStreamsCompiledObj *comp) { + CGenNodeStateless *node = m_core->create_node_sl(); /* add periodic */ node->m_type = CGenNode::STATELESS_PKT; - node->m_time = m_core->m_cur_time_sec + 0.0 /* STREAM ISG */; + + node->m_ref_stream_info = stream->clone_as_dp(); + + node->m_next_stream=0; /* will be fixed later */ + + + if ( stream->m_self_start ){ + /* if self start it is in active mode */ + node->m_state =CGenNodeStateless::ss_ACTIVE; + lp_port->m_active_streams++; + }else{ + node->m_state =CGenNodeStateless::ss_INACTIVE; + } + + node->m_time = m_core->m_cur_time_sec + usec_to_sec(stream->m_isg_usec); + + pkt_dir_t dir = m_core->m_node_gen.m_v_if->port_id_to_dir(stream->m_port_id); node->m_flags = 0; /* set socket id */ node->set_socket_id(m_core->m_node_gen.m_socket_id); /* build a mbuf from a packet */ - uint16_t pkt_size = pkt_len; - const uint8_t *stream_pkt = pkt; + + uint16_t pkt_size = stream->m_pkt.len; + const uint8_t *stream_pkt = stream->m_pkt.binary; + + node->m_pause =0; + node->m_stream_type = stream->m_type; + node->m_next_time_offset = 1.0 / (stream->get_pps() * comp->get_multiplier()) ; + + + /* stateless specific fields */ + switch ( stream->m_type ) { + + case TrexStream::stCONTINUOUS : + node->m_single_burst=0; + node->m_single_burst_refill=0; + node->m_multi_bursts=0; + node->m_ibg_sec = 0.0; + break; - node->m_next_time_offset = 1.0 / pps; - node->m_is_stream_active = 1; + case TrexStream::stSINGLE_BURST : + node->m_stream_type = TrexStream::stMULTI_BURST; + node->m_single_burst = stream->m_burst_total_pkts; + node->m_single_burst_refill = stream->m_burst_total_pkts; + node->m_multi_bursts = 1; /* single burst in multi burst of 1 */ + node->m_ibg_sec = 0.0; + break; + + case TrexStream::stMULTI_BURST : + node->m_single_burst = stream->m_burst_total_pkts; + node->m_single_burst_refill = stream->m_burst_total_pkts; + node->m_multi_bursts = stream->m_num_bursts; + node->m_ibg_sec = usec_to_sec( stream->m_ibg_usec ); + break; + default: + + assert(0); + }; + + node->m_port_id = stream->m_port_id; /* allocate const mbuf */ rte_mbuf_t *m = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), pkt_size); @@ -110,7 +482,6 @@ TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pk memcpy(p,stream_pkt,pkt_size); /* set dir 0 or 1 client or server */ - pkt_dir_t dir = 0; node->set_mbuf_cache_dir(dir); /* TBD repace the mac if req we should add flag */ @@ -119,55 +490,130 @@ TrexStatelessDpCore::add_cont_stream(double pps, const uint8_t *pkt, uint16_t pk /* set the packet as a readonly */ node->set_cache_mbuf(m); - /* keep track */ - m_active_nodes.push_back(node); + CDpOneStream one_stream; - /* schedule */ - m_core->m_node_gen.add_node((CGenNode *)node); + one_stream.m_dp_stream = node->m_ref_stream_info; + one_stream.m_node =node; - m_state = TrexStatelessDpCore::STATE_TRANSMITTING; + lp_port->m_active_nodes.push_back(one_stream); + /* schedule only if active */ + if (node->m_state == CGenNodeStateless::ss_ACTIVE) { + m_core->m_node_gen.add_node((CGenNode *)node); + } } void -TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj) { +TrexStatelessDpCore::start_traffic(TrexStreamsCompiledObj *obj, + double duration, + int event_id) { + + + TrexStatelessDpPerPort * lp_port=get_port_db(obj->get_port_id()); + lp_port->m_active_streams = 0; + lp_port->set_event_id(event_id); + + /* no nodes in the list */ + assert(lp_port->m_active_nodes.size()==0); + + for (auto single_stream : obj->get_objects()) { + /* all commands should be for the same port */ + assert(obj->get_port_id() == single_stream.m_stream->m_port_id); + add_cont_stream(lp_port,single_stream.m_stream,obj); + } + + uint32_t nodes = lp_port->m_active_nodes.size(); + /* find next stream */ + assert(nodes == obj->get_objects().size()); + + int cnt=0; + + /* set the next_stream pointer */ for (auto single_stream : obj->get_objects()) { - add_cont_stream(single_stream.m_pps, single_stream.m_pkt, single_stream.m_pkt_len); + + if (single_stream.m_stream->is_dp_next_stream() ) { + int stream_id = single_stream.m_stream->m_next_stream_id; + assert(stream_id<nodes); + /* point to the next stream , stream_id is fixed */ + lp_port->m_active_nodes[cnt].m_node->m_next_stream = lp_port->m_active_nodes[stream_id].m_node ; + } + cnt++; + } + + lp_port->m_state =TrexStatelessDpPerPort::ppSTATE_TRANSMITTING; + m_state = TrexStatelessDpCore::STATE_TRANSMITTING; + + + if ( duration > 0.0 ){ + add_port_duration( duration ,obj->get_port_id(),event_id ); } + } -void -TrexStatelessDpCore::stop_traffic(uint8_t port_id) { - /* we cannot remove nodes not from the top of the queue so - for every active node - make sure next time - the scheduler invokes it, it will be free */ - for (auto node : m_active_nodes) { - if (node->m_port_id == port_id) { - node->m_is_stream_active = 0; + +bool TrexStatelessDpCore::are_all_ports_idle(){ + + bool res=true; + int i; + for (i=0; i<NUM_PORTS_PER_CORE; i++) { + if ( m_ports[i].m_state != TrexStatelessDpPerPort::ppSTATE_IDLE ){ + res=false; } } + return (res); +} - /* remove all the non active nodes */ - auto pred = std::remove_if(m_active_nodes.begin(), - m_active_nodes.end(), - [](CGenNodeStateless *node) { return (!node->m_is_stream_active); }); - m_active_nodes.erase(pred, m_active_nodes.end()); +void +TrexStatelessDpCore::resume_traffic(uint8_t port_id){ + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + lp_port->resume_traffic(port_id); +} - if (m_active_nodes.size() == 0) { - m_state = STATE_IDLE; - /* stop the scheduler */ - CGenNode *node = m_core->create_node() ; +void +TrexStatelessDpCore::pause_traffic(uint8_t port_id){ - node->m_type = CGenNode::EXIT_SCHED; + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); - /* make sure it will be scheduled after the current node */ - node->m_time = m_core->m_node_gen.m_p_queue.top()->m_time; + lp_port->pause_traffic(port_id); +} - m_core->m_node_gen.add_node(node); + +void +TrexStatelessDpCore::stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id) { + /* we cannot remove nodes not from the top of the queue so + for every active node - make sure next time + the scheduler invokes it, it will be free */ + + TrexStatelessDpPerPort * lp_port = get_port_db(port_id); + + if ( lp_port->stop_traffic(port_id,stop_on_id,event_id) == false){ + /* nothing to do ! already stopped */ + //printf(" skip .. %f\n",m_core->m_cur_time_sec); + return; } - + +#if 0 + if ( are_all_ports_idle() ) { + /* just a place holder if we will need to do somthing in that case */ + } +#endif + + /* inform the control plane we stopped - this might be a async stop + (streams ended) + */ + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(m_core->m_thread_id); + TrexStatelessDpToCpMsgBase *event_msg = new TrexDpPortEventMsg(m_core->m_thread_id, + port_id, + TrexDpPortEvent::EVENT_STOP, + lp_port->get_event_id()); + ring->Enqueue((CGenNode *)event_msg); + } /** diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 698cac2f..eda1ae59 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -31,6 +32,74 @@ class TrexStatelessDpStart; class CFlowGenListPerThread; class CGenNodeStateless; class TrexStreamsCompiledObj; +class TrexStream; + + +class CDpOneStream { +public: + void Create(){ + } + + void Delete(CFlowGenListPerThread * core); + void DeleteOnlyStream(); + + CGenNodeStateless * m_node; // schedule node + TrexStream * m_dp_stream; // stream info +}; + +class TrexStatelessDpPerPort { + +public: + /* states */ + enum state_e { + ppSTATE_IDLE, + ppSTATE_TRANSMITTING, + ppSTATE_PAUSE + + }; + +public: + TrexStatelessDpPerPort(){ + } + + void create(CFlowGenListPerThread * core); + + bool pause_traffic(uint8_t port_id); + + bool resume_traffic(uint8_t port_id); + + bool stop_traffic(uint8_t port_id, + bool stop_on_id, + int event_id); + + bool update_number_of_active_streams(uint32_t d); + + state_e get_state() { + return m_state; + } + + void set_event_id(int event_id) { + m_event_id = event_id; + } + + int get_event_id() { + return m_event_id; + } + +public: + + state_e m_state; + uint8_t m_port_id; + + uint32_t m_active_streams; /* how many active streams on this port */ + + std::vector<CDpOneStream> m_active_nodes; /* holds the current active nodes */ + CFlowGenListPerThread * m_core ; + int m_event_id; +}; + +/* for now */ +#define NUM_PORTS_PER_CORE 2 class TrexStatelessDpCore { @@ -39,10 +108,24 @@ public: /* states */ enum state_e { STATE_IDLE, - STATE_TRANSMITTING + STATE_TRANSMITTING, + STATE_TERMINATE + }; - TrexStatelessDpCore(uint8_t thread_id, CFlowGenListPerThread *core); + TrexStatelessDpCore() { + m_thread_id = 0; + m_core = NULL; + m_duration = -1; + } + + /** + * "static constructor" + * + * @param thread_id + * @param core + */ + void create(uint8_t thread_id, CFlowGenListPerThread *core); /** * launch the stateless DP core code @@ -50,6 +133,10 @@ public: */ void start(); + + /* exit after batch of commands */ + void run_once(); + /** * dummy traffic creator * @@ -58,13 +145,30 @@ public: * @param pkt * @param pkt_len */ - void start_traffic(TrexStreamsCompiledObj *obj); + void start_traffic(TrexStreamsCompiledObj *obj, + double duration, + int m_event_id); + + + /* pause the streams, work only if all are continues */ + void pause_traffic(uint8_t port_id); + + + + void resume_traffic(uint8_t port_id); + /** + * * stop all traffic for this core * */ - void stop_traffic(uint8_t port_id); + void stop_traffic(uint8_t port_id,bool stop_on_id, int event_id); + + + /* return if all ports are idel */ + bool are_all_ports_idle(); + /** * check for and handle messages from CP @@ -92,7 +196,31 @@ public: } + /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */ + void quit_main_loop(); + + state_e get_state() { + return m_state; + } + + bool set_stateless_next_node(CGenNodeStateless * cur_node, + CGenNodeStateless * next_node); + + + TrexStatelessDpPerPort * get_port_db(uint8_t port_id){ + assert((m_local_port_offset==port_id) ||(m_local_port_offset+1==port_id)); + uint8_t local_port_id = port_id -m_local_port_offset; + assert(local_port_id<NUM_PORTS_PER_CORE); + return (&m_ports[local_port_id]); + } + + + private: + + void schedule_exit(); + + /** * in idle state loop, the processor most of the time sleeps * and periodically checks for messages @@ -115,18 +243,30 @@ private: */ void handle_cp_msg(TrexStatelessCpToDpMsgBase *msg); - void add_cont_stream(double pps, const uint8_t *pkt, uint16_t pkt_len); + + void add_port_duration(double duration, + uint8_t port_id, + int event_id); + + void add_global_duration(double duration); + + void add_cont_stream(TrexStatelessDpPerPort * lp_port, + TrexStream * stream, + TrexStreamsCompiledObj *comp); uint8_t m_thread_id; - state_e m_state; + uint8_t m_local_port_offset; + + state_e m_state; /* state of all ports */ CNodeRing *m_ring_from_cp; CNodeRing *m_ring_to_cp; - /* holds the current active nodes */ - std::vector<CGenNodeStateless *> m_active_nodes; + TrexStatelessDpPerPort m_ports[NUM_PORTS_PER_CORE]; /* pointer to the main object */ - CFlowGenListPerThread *m_core; + CFlowGenListPerThread * m_core; + + double m_duration; }; #endif /* __TREX_STATELESS_DP_CORE_H__ */ diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index 92b428ab..ccf99eaa 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoh Haim Cisco Systems, Inc. */ @@ -22,44 +23,201 @@ limitations under the License. #define __TREX_STREAM_NODE_H__ #include <bp_sim.h> +#include <stdio.h> class TrexStatelessDpCore; +#include <trex_stream.h> + +class TrexStatelessCpToDpMsgBase; +class CFlowGenListPerThread; + +struct CGenNodeCommand : public CGenNodeBase { + +friend class TrexStatelessDpCore; + +public: + TrexStatelessCpToDpMsgBase * m_cmd; + + uint8_t m_pad_end[104]; + +public: + void free_command(); + +} __rte_cache_aligned;; + + +static_assert(sizeof(CGenNodeCommand) == sizeof(CGenNode), "sizeof(CGenNodeCommand) != sizeof(CGenNode)" ); + /* this is a event for stateless */ struct CGenNodeStateless : public CGenNodeBase { friend class TrexStatelessDpCore; +public: + enum { + ss_FREE_RESUSE =1, /* should be free by scheduler */ + ss_INACTIVE =2, /* will be active by other stream or stopped */ + ss_ACTIVE =3 /* the stream is active */ + }; + typedef uint8_t stream_state_t ; + + static std::string get_stream_state_str(stream_state_t stream_state); + private: + /* cache line 0 */ + /* important stuff here */ void * m_cache_mbuf; - double m_next_time_offset; - uint8_t m_is_stream_active; + double m_next_time_offset; /* in sec */ + double m_ibg_sec; /* inter burst time in sec */ + + + stream_state_t m_state; uint8_t m_port_id; + uint8_t m_stream_type; /* see TrexStream::STREAM_TYPE ,stream_type_t */ + uint8_t m_pause; + + uint32_t m_single_burst; /* the number of bursts in case of burst */ + uint32_t m_single_burst_refill; + + uint32_t m_multi_bursts; /* in case of multi_burst how many bursts */ + + /* cache line 1 */ + TrexStream * m_ref_stream_info; /* the stream info */ + CGenNodeStateless * m_next_stream; /* pad to match the size of CGenNode */ - uint8_t m_pad_end[87]; + uint8_t m_pad_end[56]; + + public: - inline bool is_active() { - return m_is_stream_active; + uint8_t get_port_id(){ + return (m_port_id); + } + + + /* we restart the stream, schedule it using stream isg */ + inline void update_refresh_time(double cur_time){ + m_time = cur_time + usec_to_sec(m_ref_stream_info->m_isg_usec); + } + + inline bool is_mask_for_free(){ + return (get_state() == CGenNodeStateless::ss_FREE_RESUSE ?true:false); + + } + inline void mark_for_free(){ + set_state(CGenNodeStateless::ss_FREE_RESUSE); + /* only to be safe */ + m_ref_stream_info= NULL; + m_next_stream= NULL; + } + + bool is_pause(){ + return (m_pause==1?true:false); + } + + void set_pause(bool enable){ + if ( enable ){ + m_pause=1; + }else{ + m_pause=0; + } + } + + inline uint8_t get_stream_type(){ + return (m_stream_type); + } + + inline uint32_t get_single_burst_cnt(){ + return (m_single_burst); + } + + inline double get_multi_ibg_sec(){ + return (m_ibg_sec); + } + + inline uint32_t get_multi_burst_cnt(){ + return (m_multi_bursts); + } + + inline void set_state(stream_state_t new_state){ + m_state=new_state; + } + + + inline stream_state_t get_state() { + return m_state; + } + + void refresh(); + + inline void handle_continues(CFlowGenListPerThread *thread) { + + if (unlikely (is_pause()==false)) { + thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + } + + /* in case of continues */ + m_time += m_next_time_offset; + + /* insert a new event */ + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); + } + + inline void handle_multi_burst(CFlowGenListPerThread *thread) { + thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + + m_single_burst--; + if (m_single_burst > 0 ) { + /* in case of continues */ + m_time += m_next_time_offset; + + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); + }else{ + m_multi_bursts--; + if ( m_multi_bursts == 0 ) { + set_state(CGenNodeStateless::ss_INACTIVE); + if ( thread->set_stateless_next_node(this,m_next_stream) ){ + /* update the next stream time using isg */ + m_next_stream->update_refresh_time(m_time); + + thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream); + }else{ + // in case of zero we will schedule a command to stop + // will be called from set_stateless_next_node + } + + }else{ + m_time += m_ibg_sec; + m_single_burst = m_single_burst_refill; + thread->m_node_gen.m_p_queue.push( (CGenNode *)this); + } + } } /** * main function to handle an event of a packet tx * + * + * */ - inline void handle(CFlowGenListPerThread *thread) { - thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + inline void handle(CFlowGenListPerThread *thread) { - /* in case of continues */ - m_time += m_next_time_offset; + if (m_stream_type == TrexStream::stCONTINUOUS ) { + handle_continues(thread) ; + }else{ + if (m_stream_type == TrexStream::stMULTI_BURST) { + handle_multi_burst(thread); + }else{ + assert(0); + } + } - /* insert a new event */ - thread->m_node_gen.m_p_queue.push( (CGenNode *)this); } void set_socket_id(socket_id_t socket){ @@ -82,8 +240,6 @@ public: return ((pkt_dir_t)( m_flags &1)); } - - inline void set_cache_mbuf(rte_mbuf_t * m){ m_cache_mbuf=(void *)m; m_flags |= NODE_FLAGS_MBUF_CACHE; @@ -97,9 +253,22 @@ public: } } + void free_stl_node(); + +public: + /* debug functions */ + + int get_stream_id(); + + static void DumpHeader(FILE *fd); + + void Dump(FILE *fd); } __rte_cache_aligned; -static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)"); +static_assert(sizeof(CGenNodeStateless) == sizeof(CGenNode), "sizeof(CGenNodeStateless) != sizeof(CGenNode)" ); + + + #endif /* __TREX_STREAM_NODE_H__ */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 3e754649..ec8b7839 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -21,12 +22,34 @@ limitations under the License. #include <trex_stateless_messaging.h> #include <trex_stateless_dp_core.h> #include <trex_streams_compiler.h> +#include <trex_stateless.h> +#include <bp_sim.h> + #include <string.h> /************************* start traffic message ************************/ -TrexStatelessDpStart::TrexStatelessDpStart(TrexStreamsCompiledObj *obj) : m_obj(obj) { +TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) { + m_port_id = port_id; + m_event_id = event_id; + m_obj = obj; + m_duration = duration; +} + + +/** + * clone for DP start message + * + */ +TrexStatelessCpToDpMsgBase * +TrexStatelessDpStart::clone() { + + TrexStreamsCompiledObj *new_obj = m_obj->clone(); + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpStart(m_port_id, m_event_id, new_obj, m_duration); + + return new_msg; } TrexStatelessDpStart::~TrexStatelessDpStart() { @@ -38,7 +61,9 @@ TrexStatelessDpStart::~TrexStatelessDpStart() { bool TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { - dp_core->start_traffic(m_obj); + /* staet traffic */ + dp_core->start_traffic(m_obj, m_duration,m_event_id); + return true; } @@ -47,7 +72,104 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { ************************/ bool TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { - dp_core->stop_traffic(m_port_id); + + + dp_core->stop_traffic(m_port_id,m_stop_only_for_event_id,m_event_id); return true; } + +void TrexStatelessDpStop::on_node_remove(){ + if ( m_core ) { + assert(m_core->m_non_active_nodes>0); + m_core->m_non_active_nodes--; + } +} + + +TrexStatelessCpToDpMsgBase * TrexStatelessDpPause::clone(){ + + TrexStatelessDpPause *new_msg = new TrexStatelessDpPause(m_port_id); + return new_msg; +} + + +bool TrexStatelessDpPause::handle(TrexStatelessDpCore *dp_core){ + dp_core->pause_traffic(m_port_id); + return (true); +} + + + +TrexStatelessCpToDpMsgBase * TrexStatelessDpResume::clone(){ + TrexStatelessDpResume *new_msg = new TrexStatelessDpResume(m_port_id); + return new_msg; +} + +bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){ + dp_core->resume_traffic(m_port_id); + return (true); +} + + +/** + * clone for DP stop message + * + */ +TrexStatelessCpToDpMsgBase * +TrexStatelessDpStop::clone() { + TrexStatelessDpStop *new_msg = new TrexStatelessDpStop(m_port_id); + + new_msg->set_event_id(m_event_id); + new_msg->set_wait_for_event_id(m_stop_only_for_event_id); + /* set back pointer to master */ + new_msg->set_core_ptr(m_core); + + return new_msg; +} + + + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpQuit::clone(){ + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit(); + + return new_msg; +} + + +bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){ + + /* quit */ + dp_core->quit_main_loop(); + return (true); +} + +bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){ + + if ( dp_core->are_all_ports_idle() ){ + /* if all ports are idle quit now */ + set_quit(true); + } + return (true); +} + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpCanQuit::clone(){ + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit(); + + return new_msg; +} + + +/************************* messages from DP to CP **********************/ +bool +TrexDpPortEventMsg::handle() { + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(m_port_id); + port->get_dp_events().handle_event(m_event_type, m_thread_id, m_event_id); + + return (true); +} + diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 381e146d..6bd0dbe3 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -1,5 +1,6 @@ /* Itay Marom + Hanoch Haim Cisco Systems, Inc. */ @@ -22,9 +23,11 @@ limitations under the License. #define __TREX_STATELESS_MESSAGING_H__ #include <msg_manager.h> +#include <trex_dp_port_events.h> class TrexStatelessDpCore; class TrexStreamsCompiledObj; +class CFlowGenListPerThread; /** * defines the base class for CP to DP messages @@ -35,16 +38,40 @@ class TrexStatelessCpToDpMsgBase { public: TrexStatelessCpToDpMsgBase() { + m_quit_scheduler=false; } virtual ~TrexStatelessCpToDpMsgBase() { } + + virtual bool handle(TrexStatelessDpCore *dp_core) = 0; + /** - * virtual function to handle a message + * clone the current message * */ - virtual bool handle(TrexStatelessDpCore *dp_core) = 0; + virtual TrexStatelessCpToDpMsgBase * clone() = 0; + + /* do we want to quit scheduler, can be set by handle function */ + void set_quit(bool enable){ + m_quit_scheduler=enable; + } + + bool is_quit(){ + return ( m_quit_scheduler); + } + + /* this node is called from scheduler in case the node is free */ + virtual void on_node_remove(){ + } + + /* no copy constructor */ + TrexStatelessCpToDpMsgBase(TrexStatelessCpToDpMsgBase &) = delete; + +protected: + int m_event_id; + bool m_quit_scheduler; }; /** @@ -55,16 +82,59 @@ public: class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { public: - TrexStatelessDpStart(TrexStreamsCompiledObj *obj); + TrexStatelessDpStart(uint8_t m_port_id, int m_event_id, TrexStreamsCompiledObj *obj, double duration); ~TrexStatelessDpStart(); + virtual TrexStatelessCpToDpMsgBase * clone(); + virtual bool handle(TrexStatelessDpCore *dp_core); private: + + uint8_t m_port_id; + int m_event_id; TrexStreamsCompiledObj *m_obj; + double m_duration; + }; +class TrexStatelessDpPause : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpPause(uint8_t port_id) : m_port_id(port_id) { + } + + + virtual TrexStatelessCpToDpMsgBase * clone(); + + + virtual bool handle(TrexStatelessDpCore *dp_core); + + +private: + uint8_t m_port_id; +}; + + +class TrexStatelessDpResume : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpResume(uint8_t port_id) : m_port_id(port_id) { + } + + + virtual TrexStatelessCpToDpMsgBase * clone(); + + + virtual bool handle(TrexStatelessDpCore *dp_core); + + +private: + uint8_t m_port_id; +}; + + /** * a message to stop traffic * @@ -74,13 +144,156 @@ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase { public: TrexStatelessDpStop(uint8_t port_id) : m_port_id(port_id) { + m_stop_only_for_event_id=false; + m_event_id=0; + m_core = NULL; } + virtual TrexStatelessCpToDpMsgBase * clone(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + void set_core_ptr(CFlowGenListPerThread * core){ + m_core = core; + } + + CFlowGenListPerThread * get_core_ptr(){ + return ( m_core); + } + + + void set_event_id(int event_id){ + m_event_id = event_id; + } + + void set_wait_for_event_id(bool wait){ + m_stop_only_for_event_id = wait; + } + + virtual void on_node_remove(); + + + bool get_is_stop_by_event_id(){ + return (m_stop_only_for_event_id); + } + + int get_event_id(){ + return (m_event_id); + } + private: uint8_t m_port_id; + bool m_stop_only_for_event_id; + int m_event_id; + CFlowGenListPerThread * m_core ; + +}; + +/** + * a message to Quit the datapath traffic. support only stateless for now + * + * @author hhaim + */ +class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpQuit() { + } + + + virtual TrexStatelessCpToDpMsgBase * clone(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + }; +/** + * a message to check if both port are idel and exit + * + * @author hhaim + */ +class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpCanQuit() { + } + + virtual bool handle(TrexStatelessDpCore *dp_core); + + virtual TrexStatelessCpToDpMsgBase * clone(); +}; + + + +/************************* messages from DP to CP **********************/ + +/** + * defines the base class for CP to DP messages + * + * @author imarom (27-Oct-15) + */ +class TrexStatelessDpToCpMsgBase { +public: + + TrexStatelessDpToCpMsgBase() { + } + + virtual ~TrexStatelessDpToCpMsgBase() { + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle() = 0; + + /* no copy constructor */ + TrexStatelessDpToCpMsgBase(TrexStatelessDpToCpMsgBase &) = delete; + +}; + + +/** + * a message indicating an event has happened on a port at the + * DP + * + */ +class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase { +public: + + TrexDpPortEventMsg(int thread_id, uint8_t port_id, TrexDpPortEvent::event_e type, int event_id) { + m_thread_id = thread_id; + m_port_id = port_id; + m_event_type = type; + m_event_id = event_id; + } + + virtual bool handle(); + + int get_thread_id() { + return m_thread_id; + } + + uint8_t get_port_id() { + return m_port_id; + } + + TrexDpPortEvent::event_e get_event_type() { + return m_event_type; + } + + int get_event_id() { + return m_event_id; + } + +private: + int m_thread_id; + uint8_t m_port_id; + TrexDpPortEvent::event_e m_event_type; + int m_event_id; + +}; #endif /* __TREX_STATELESS_MESSAGING_H__ */ + diff --git a/src/stub/trex_stateless_stub.cpp b/src/stub/trex_stateless_stub.cpp index de56e57a..199356d8 100644 --- a/src/stub/trex_stateless_stub.cpp +++ b/src/stub/trex_stateless_stub.cpp @@ -4,7 +4,8 @@ class CFlowGenListPerThread; class TrexStatelessCpToDpMsgBase; -TrexStatelessDpCore::TrexStatelessDpCore(unsigned char, CFlowGenListPerThread*) { +void +TrexStatelessDpCore::create(unsigned char, CFlowGenListPerThread*) { m_thread_id = 0; m_core = NULL; diff --git a/src/time_histogram.cpp b/src/time_histogram.cpp index f1b47e59..96796bfc 100755 --- a/src/time_histogram.cpp +++ b/src/time_histogram.cpp @@ -182,10 +182,10 @@ void CTimeHistogram::DumpWinMax(FILE *fd){ } void CTimeHistogram::Dump(FILE *fd){ - fprintf (fd," min_delta : %lu usec \n",get_usec(m_min_delta)); + fprintf (fd," min_delta : %lu usec \n", (ulong)get_usec(m_min_delta)); fprintf (fd," cnt : %lu \n",m_cnt); fprintf (fd," high_cnt : %lu \n",m_high_cnt); - fprintf (fd," max_d_time : %lu usec\n",get_usec(m_max_dt)); + fprintf (fd," max_d_time : %lu usec\n", (ulong)get_usec(m_max_dt)); //fprintf (fd," average : %.0f usec\n", get_total_average()); fprintf (fd," sliding_average : %.0f usec\n", get_average_latency()); fprintf (fd," precent : %.1f %%\n",(100.0*(double)m_high_cnt/(double)m_cnt)); @@ -198,7 +198,7 @@ void CTimeHistogram::Dump(FILE *fd){ for (j=0; j<HISTOGRAM_SIZE_LOG; j++) { for (i=0; i<HISTOGRAM_SIZE; i++) { if (m_hcnt[j][i] >0 ) { - fprintf (fd," h[%lu] : %lu \n",(base*(i+1)),m_hcnt[j][i]); + fprintf (fd," h[%u] : %llu \n",(base*(i+1)),(unsigned long long)m_hcnt[j][i]); } } base=base*10; diff --git a/src/tuple_gen.h b/src/tuple_gen.h index 29adbd69..d34e27bc 100755 --- a/src/tuple_gen.h +++ b/src/tuple_gen.h @@ -553,6 +553,9 @@ public: class CServerPoolBase { public: + + virtual ~CServerPoolBase() {} + virtual void GenerateTuple(CTupleBase& tuple) = 0; virtual uint16_t GenerateOnePort(uint32_t idx) = 0; virtual void Delete() = 0; diff --git a/src/utl_json.cpp b/src/utl_json.cpp index 990346f5..fb55be0a 100755 --- a/src/utl_json.cpp +++ b/src/utl_json.cpp @@ -25,7 +25,7 @@ limitations under the License. std::string add_json(std::string name, uint32_t counter,bool last){ char buff[200]; - sprintf(buff,"\"%s\":%lu",name.c_str(),counter); + sprintf(buff,"\"%s\":%lu",name.c_str(), (ulong)counter); std::string s= std::string(buff); if (!last) { s+=","; @@ -35,7 +35,7 @@ std::string add_json(std::string name, uint32_t counter,bool last){ std::string add_json(std::string name, uint64_t counter,bool last){ char buff[200]; - sprintf(buff,"\"%s\":%llu",name.c_str(),counter); + sprintf(buff,"\"%s\":%llu",name.c_str(), (unsigned long long)counter); std::string s= std::string(buff); if (!last) { s+=","; diff --git a/src/utl_yaml.cpp b/src/utl_yaml.cpp index 5f3ca735..828817e4 100755 --- a/src/utl_yaml.cpp +++ b/src/utl_yaml.cpp @@ -104,6 +104,8 @@ bool utl_yaml_read_uint16(const YAML::Node& node, val = (uint16_t)val_tmp; res=true; } + + return (res); } bool utl_yaml_read_bool(const YAML::Node& node, |