diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/bp_sim.cpp | 36 | ||||
-rwxr-xr-x | src/bp_sim.h | 11 | ||||
-rw-r--r-- | src/internal_api/trex_platform_api.h | 2 | ||||
-rwxr-xr-x | src/main.cpp | 739 | ||||
-rwxr-xr-x | src/main_dpdk.cpp | 146 | ||||
-rw-r--r-- | src/mock/trex_platform_api_mock.cpp | 4 | ||||
-rw-r--r-- | src/mock/trex_rpc_server_mock.cpp | 4 | ||||
-rw-r--r-- | src/publisher/trex_publisher.h | 10 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.cpp | 4 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_async_server.h | 1 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_cmd.cpp | 6 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_cmd_api.h | 12 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.cpp | 56 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_req_resp_server.h | 26 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server.cpp | 23 | ||||
-rw-r--r-- | src/rpc-server/trex_rpc_server_api.h | 25 | ||||
-rw-r--r-- | src/sim/trex_sim.h | 148 | ||||
-rw-r--r-- | src/sim/trex_sim_stateful.cpp | 600 | ||||
-rw-r--r-- | src/sim/trex_sim_stateless.cpp | 346 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.h | 6 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.cpp | 7 | ||||
-rw-r--r-- | src/stateless/dp/trex_stateless_dp_core.h | 6 |
22 files changed, 1501 insertions, 717 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 63a3c144..fcef049c 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -3164,7 +3164,8 @@ int CNodeGenerator::open_file(std::string file_name, /* ser preview mode */ m_v_if->set_review_mode(preview_mode); m_v_if->open_file(file_name); - m_cnt = 1; + m_cnt = 0; + m_limit = 0; return (0); } @@ -3177,11 +3178,13 @@ int CNodeGenerator::close_file(CFlowGenListPerThread * thread){ } int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){ + m_cnt++; + if ( m_preview_mode.getVMode() >2 ){ fprintf(stdout," %4lu ,", (ulong)m_cnt); node_sl->Dump(stdout); - m_cnt++; } + return (0); } @@ -3195,6 +3198,10 @@ int CNodeGenerator::update_stats(CGenNode * node){ return (0); } +bool CNodeGenerator::has_limit_reached() { + /* do we have a limit and has it passed ? */ + return ( (m_limit > 0) && (m_cnt >= m_limit) ); +} bool CFlowGenListPerThread::Create(uint32_t thread_id, uint32_t core_id, @@ -3217,7 +3224,9 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, char name[100]; sprintf(name,"nodes-%d",m_core_id); - printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id); + + //printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id); + m_node_pool = utl_rte_mempool_create_non_pkt(name, CGlobalInfo::m_memory_cfg.get_each_core_dp_flows(), sizeof(CGenNode), @@ -3225,7 +3234,8 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, 0 , socket_id); - printf(" pool %p \n",m_node_pool); + //printf(" pool %p \n",m_node_pool); + m_node_gen.Create(this); m_flow_id_to_node_lookup.Create(); @@ -3537,16 +3547,21 @@ int CNodeGenerator::flush_file(dsec_t max_time, m_p_queue.pop(); CGenNodeStateless *node_sl = (CGenNodeStateless *)node; - #ifdef _DEBUG - update_stl_stats(node_sl); - #endif - /* if the stream has been deactivated - end */ if ( unlikely( node_sl->is_mask_for_free() ) ) { thread->free_node(node); } else { node_sl->handle(thread); + + #ifdef _DEBUG + update_stl_stats(node_sl); + if (has_limit_reached()) { + thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0); + } + #endif + } + }else{ if ( likely( type == CGenNode::FLOW_PKT ) ) { @@ -3987,9 +4002,11 @@ void CFlowGenListPerThread::check_msgs(void) { void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name, - CPreviewMode &preview){ + CPreviewMode &preview, + uint64_t limit){ m_preview_mode = preview; m_node_gen.open_file(erf_file_name,&m_preview_mode); + m_node_gen.set_packet_limit(limit); } void CFlowGenListPerThread::stop_stateless_simulation_file(){ @@ -4000,7 +4017,6 @@ void CFlowGenListPerThread::start_stateless_daemon_simulation(){ m_cur_time_sec = 0; m_stateless_dp_info.run_once(); - } diff --git a/src/bp_sim.h b/src/bp_sim.h index 471d7639..4b906912 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -1932,6 +1932,12 @@ public: add_node(node); } + /** + * set packet limit for the generator + */ + void set_packet_limit(uint64_t limit) { + m_limit = limit; + } void DumpHist(FILE *fd){ fprintf(fd,"\n"); @@ -1950,7 +1956,7 @@ private: } int update_stats(CGenNode * node); int update_stl_stats(CGenNodeStateless *node_sl); - + bool has_limit_reached(); FORCE_NO_INLINE bool handle_slow_messages(uint8_t type, CGenNode * node, @@ -1966,6 +1972,7 @@ public: CFlowGenListPerThread * m_parent; CPreviewMode m_preview_mode; uint64_t m_cnt; + uint64_t m_limit; CTimeHistogram m_realtime_his; }; @@ -3484,7 +3491,7 @@ public: void start_stateless_daemon_simulation(); /* open a file for simulation */ - void start_stateless_simulation_file(std::string erf_file_name,CPreviewMode &preview); + void start_stateless_simulation_file(std::string erf_file_name,CPreviewMode &preview, uint64_t limit = 0); /* close a file for simulation */ void stop_stateless_simulation_file(); diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index 343b8004..3ae49da8 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -138,7 +138,7 @@ public: */ class TrexMockPlatformApi : public TrexPlatformApi { public: - void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {} + void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const; void get_global_stats(TrexPlatformGlobalStats &stats) const; void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const; void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const { diff --git a/src/main.cpp b/src/main.cpp index 1b219a8c..ea8e1e44 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -23,16 +23,35 @@ limitations under the License. #include "bp_sim.h" #include "os_time.h" +#include <unordered_map> +#include <string> #include <common/arg/SimpleGlob.h> #include <common/arg/SimpleOpt.h> #include <stateless/cp/trex_stateless.h> +#include <sim/trex_sim.h> +using namespace std; // An enum for all the option types enum { OPT_HELP, OPT_CFG, OPT_NODE_DUMP, OP_STATS, - OPT_FILE_OUT, OPT_UT, OPT_PCAP, OPT_IPV6, OPT_MAC_FILE}; - + OPT_FILE_OUT, OPT_UT, OPT_PCAP, OPT_IPV6, OPT_MAC_FILE, + OPT_SL, OPT_DP_CORE_COUNT, OPT_DP_CORE_INDEX, OPT_LIMIT}; + + + +/** + * type of run + * GTEST + * Stateful + * Stateless + */ +typedef enum { + OPT_TYPE_GTEST = 7, + OPT_TYPE_SF, + OPT_TYPE_SL +} opt_type_e; + /* these are the argument types: SO_NONE -- no argument needed @@ -41,24 +60,30 @@ enum { OPT_HELP, OPT_CFG, OPT_NODE_DUMP, OP_STATS, */ static CSimpleOpt::SOption parser_options[] = { - { OPT_HELP, "-?", SO_NONE }, - { OPT_HELP, "-h", SO_NONE }, - { OPT_HELP, "--help", SO_NONE }, - { OPT_UT, "--ut", SO_NONE }, - { OP_STATS, "-s", SO_NONE }, - { OPT_CFG, "-f", SO_REQ_SEP}, - { OPT_MAC_FILE, "--mac", SO_REQ_SEP}, - { OPT_FILE_OUT , "-o", SO_REQ_SEP }, - { OPT_NODE_DUMP , "-v", SO_REQ_SEP }, - { OPT_PCAP, "--pcap", SO_NONE }, - { OPT_IPV6, "--ipv6", SO_NONE }, + { OPT_HELP, "-?", SO_NONE }, + { OPT_HELP, "-h", SO_NONE }, + { OPT_HELP, "--help", SO_NONE }, + { OPT_UT, "--ut", SO_NONE }, + { OP_STATS, "-s", SO_NONE }, + { OPT_CFG, "-f", SO_REQ_SEP }, + { OPT_MAC_FILE, "--mac", SO_REQ_SEP }, + { OPT_FILE_OUT , "-o", SO_REQ_SEP }, + { OPT_NODE_DUMP , "-v", SO_REQ_SEP }, + { OPT_PCAP, "--pcap", SO_NONE }, + { OPT_IPV6, "--ipv6", SO_NONE }, + { OPT_SL, "--sl", SO_NONE }, + { OPT_DP_CORE_COUNT, "--cores", SO_REQ_SEP }, + { OPT_DP_CORE_INDEX, "--core_index", SO_REQ_SEP }, + { OPT_LIMIT, "--limit", SO_REQ_SEP }, SO_END_OF_OPTIONS }; - +static bool in_range(int x, int low, int high) { + return ( (x >= low) && (x <= high) ); +} static int usage(){ @@ -94,9 +119,12 @@ static int usage(){ return (0); } -int gtest_main(int argc, char **argv) ; -static int parse_options(int argc, char *argv[], CParserOption* po, bool & is_gtest ) { +static int parse_options(int argc, + char *argv[], + CParserOption* po, + std::unordered_map<std::string, int> ¶ms) { + CSimpleOpt args(argc, argv, parser_options); int a=0; @@ -104,36 +132,63 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool & is_gt po->preview.clean(); po->preview.setFileWrite(true); + /* by default - type is stateful */ + params["type"] = OPT_TYPE_SF; + while ( args.Next() ){ if (args.LastError() == SO_SUCCESS) { switch (args.OptionId()) { case OPT_UT : - is_gtest=true; + params["type"] = OPT_TYPE_GTEST; return (0); break; + case OPT_HELP: usage(); return -1; + + case OPT_SL: + params["type"] = OPT_TYPE_SL; + break; + case OPT_CFG: po->cfg_file = args.OptionArg(); break; + case OPT_MAC_FILE: po->mac_file = args.OptionArg(); break; + case OPT_FILE_OUT: po->out_file = args.OptionArg(); break; + case OPT_IPV6: po->preview.set_ipv6_mode_enable(true); break; + case OPT_NODE_DUMP: a=atoi(args.OptionArg()); node_dump=1; po->preview.setFileWrite(false); break; + case OPT_PCAP: po->preview.set_pcap_mode_enable(true); break; + + case OPT_DP_CORE_COUNT: + params["dp_core_count"] = atoi(args.OptionArg()); + break; + + case OPT_DP_CORE_INDEX: + params["dp_core_index"] = atoi(args.OptionArg()); + break; + + case OPT_LIMIT: + params["limit"] = atoi(args.OptionArg()); + break; + default: usage(); return -1; @@ -162,640 +217,74 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool & is_gt return -1; } } - return 0; -} - -int cores=1; - -/* - -int curent_time(){ - - time_init(); - - int i; - for (i=0; i<100000000; i++){ - now=now_sec(); - } - return (0); -}*/ - -#ifdef LINUX - - - -#include <pthread.h> - -struct per_thread_t { - pthread_t tid; -}; - -#define MAX_THREADS 200 -static per_thread_t tr_info[MAX_THREADS]; - - -////////////// - -struct test_t_info1 { - CPreviewMode * preview_info; - CFlowGenListPerThread * thread_info; - uint32_t thread_id; -}; - -void * thread_task(void *info){ - - test_t_info1 * obj =(test_t_info1 *)info; - - CFlowGenListPerThread * lpt=obj->thread_info; - - printf("start thread %d \n",obj->thread_id); - //delay(obj->thread_id *3000); - printf("-->start thread %d \n",obj->thread_id); - if (1/*obj->thread_id ==3*/) { - - char buf[100]; - sprintf(buf,"my%d.erf",obj->thread_id); - lpt->start_generate_stateful(buf,*obj->preview_info); - lpt->m_node_gen.DumpHist(stdout); - printf("end thread %d \n",obj->thread_id); - } - - return (NULL); -} - - -void test_load_list_of_cap_files_linux(CParserOption * op){ - - CFlowGenList fl; - //CNullIF erf_vif; - //CErfIF erf_vif; - - fl.Create(); - - fl.load_from_yaml(op->cfg_file,cores); - fl.DumpPktSize(); - - - fl.generate_p_thread_info(cores); - CFlowGenListPerThread * lpt; - - /* set the ERF file */ - //fl.set_vif_all(&erf_vif); - - int i; - for (i=0; i<cores; i++) { - lpt=fl.m_threads_info[i]; - test_t_info1 * obj = new test_t_info1(); - obj->preview_info =&op->preview; - obj->thread_info = fl.m_threads_info[i]; - obj->thread_id = i; - CNullIF * erf_vif = new CNullIF(); - //CErfIF * erf_vif = new CErfIF(); - - lpt->set_vif(erf_vif); - - assert(pthread_create( &tr_info[i].tid, NULL, thread_task, obj)==0); - } - for (i=0; i<cores; i++) { - /* wait for all of them to stop */ - assert(pthread_join((pthread_t)tr_info[i].tid,NULL )==0); - } + /* did the user configure dp core count or dp core index ? */ - printf("compare files \n"); - for (i=1; i<cores; i++) { - - CErfCmp cmp; - char buf[100]; - sprintf(buf,"my%d.erf",i); - char buf1[100]; - sprintf(buf1,"my%d.erf",0); - if ( cmp.compare(std::string(buf),std::string(buf1)) != true ) { - printf(" ERROR cap file is not ex !! \n"); - assert(0); + if (params.count("dp_core_count") > 0) { + if (!in_range(params["dp_core_count"], 1, 8)) { + printf("dp core count must be a value between 1 and 8\n"); + return (-1); } - printf(" thread %d is ok \n",i); } - fl.Delete(); -} - - -#endif - -/*************************************************************/ -void test_load_list_of_cap_files(CParserOption * op){ - - CFlowGenList fl; - CNullIF erf_vif; - - fl.Create(); - - #define NUM 1 - - fl.load_from_yaml(op->cfg_file,NUM); - fl.DumpPktSize(); - - - fl.generate_p_thread_info(NUM); - CFlowGenListPerThread * lpt; - - /* set the ERF file */ - //fl.set_vif_all(&erf_vif); - - int i; - for (i=0; i<NUM; i++) { - lpt=fl.m_threads_info[i]; - char buf[100]; - sprintf(buf,"my%d.erf",i); - lpt->start_generate_stateful(buf,op->preview); - lpt->m_node_gen.DumpHist(stdout); - } - //sprintf(buf,"my%d.erf",7); - //lpt=fl.m_threads_info[7]; - - //fl.Dump(stdout); - fl.Delete(); -} - -int load_list_of_cap_files(CParserOption * op){ - CFlowGenList fl; - fl.Create(); - fl.load_from_yaml(op->cfg_file,1); - if ( op->preview.getVMode() >0 ) { - fl.DumpCsv(stdout); - } - uint32_t start= os_get_time_msec(); - - CErfIF erf_vif; - //CNullIF erf_vif; - - fl.generate_p_thread_info(1); - CFlowGenListPerThread * lpt; - lpt=fl.m_threads_info[0]; - lpt->set_vif(&erf_vif); - - if ( (op->preview.getVMode() >1) || op->preview.getFileWrite() ) { - lpt->start_generate_stateful(op->out_file,op->preview); + if (params.count("dp_core_index") > 0) { + if (!in_range(params["dp_core_index"], 0, params["dp_core_count"] - 1)) { + printf("dp core index must be a value between 0 and cores - 1\n"); + return (-1); + } } - lpt->m_node_gen.DumpHist(stdout); - - uint32_t stop= os_get_time_msec(); - printf(" d time = %ul %ul \n",stop-start,os_get_time_freq()); - fl.Delete(); - return (0); -} - - -int test_dns(){ - - time_init(); - CGlobalInfo::init_pools(1000); - - CParserOption po ; - - //po.cfg_file = "cap2/dns.yaml"; - //po.cfg_file = "cap2/sfr3.yaml"; - po.cfg_file = "cap2/sfr.yaml"; - - po.preview.setVMode(0); - po.preview.setFileWrite(true); - #ifdef LINUX - test_load_list_of_cap_files_linux(&po); - #else - test_load_list_of_cap_files(&po); - #endif - return (0); + return 0; } -void test_pkt_mbuf(void); - -void test_compare_files(void); -#if 0 -static int b=0; -static int c=0; -static int d=0; - -int test_instructions(){ - int i; - for (i=0; i<100000;i++) { - b+=b+1; - c+=+b+c+1; - d+=+(b*2+1); - } - return (b+c+d); -} - -#include <valgrind/callgrind.h> -#endif +int main(int argc , char * argv[]){ + std::unordered_map<std::string, int> params; -void update_tcp_seq_num(CCapFileFlowInfo * obj, - int pkt_id, - int size_change){ - CFlowPktInfo * pkt=obj->GetPacket(pkt_id); - if ( pkt->m_pkt_indication.m_desc.IsUdp() ){ - /* nothing to do */ - return; + if ( parse_options(argc, argv, &CGlobalInfo::m_options , params) != 0) { + exit(-1); } - bool o_init=pkt->m_pkt_indication.m_desc.IsInitSide(); - TCPHeader * tcp ; - int s= (int)obj->Size(); - int i; - - for (i=pkt_id+1; i<s; i++) { + opt_type_e type = (opt_type_e) params["type"]; - pkt=obj->GetPacket(i); - tcp=pkt->m_pkt_indication.l4.m_tcp; - bool init=pkt->m_pkt_indication.m_desc.IsInitSide(); - if (init == o_init) { - /* same dir update the seq number */ - tcp->setSeqNumber (tcp->getSeqNumber ()+size_change); - - }else{ - /* update the ack number */ - tcp->setAckNumber (tcp->getAckNumber ()+size_change); + switch (type) { + case OPT_TYPE_GTEST: + { + SimGtest test; + return test.run(argc, argv); } - } -} - - - -void change_pkt_len(CCapFileFlowInfo * obj,int pkt_id, int size ){ - CFlowPktInfo * pkt=obj->GetPacket(pkt_id); - - /* enlarge the packet size by 9 */ - - char * p=pkt->m_packet->append(size); - /* set it to 0xaa*/ - memmove(p+size-4,p-4,4); /* CRCbytes */ - memset(p-4,0x0a,size); - - /* refresh the pointers */ - pkt->m_pkt_indication.RefreshPointers(); - - IPHeader * ipv4 = pkt->m_pkt_indication.l3.m_ipv4; - ipv4->updateTotalLength (ipv4->getTotalLength()+size ); - - /* update seq numbers if needed */ - update_tcp_seq_num(obj,pkt_id,size); -} -void dump_tcp_seq_num_(CCapFileFlowInfo * obj){ - int s= (int)obj->Size(); - int i; - uint32_t i_seq; - uint32_t r_seq; - - CFlowPktInfo * pkt=obj->GetPacket(0); - TCPHeader * tcp = pkt->m_pkt_indication.l4.m_tcp; - i_seq=tcp->getSeqNumber (); - - pkt=obj->GetPacket(1); - tcp = pkt->m_pkt_indication.l4.m_tcp; - r_seq=tcp->getSeqNumber (); - - for (i=2; i<s; i++) { - uint32_t seq; - uint32_t ack; - - pkt=obj->GetPacket(i); - tcp=pkt->m_pkt_indication.l4.m_tcp; - bool init=pkt->m_pkt_indication.m_desc.IsInitSide(); - seq=tcp->getSeqNumber (); - ack=tcp->getAckNumber (); - if (init) { - seq=seq-i_seq; - ack=ack-r_seq; - }else{ - seq=seq-r_seq; - ack=ack-i_seq; + case OPT_TYPE_SF: + { + SimStateful sf; + return sf.run(); } - printf(" %4d ",i); - if (!init) { - printf(" "); - } - printf(" %s seq: %4d ack : %4d \n",init?"I":"R",seq,ack); - } -} - - -int manipolate_capfile() { - time_init(); - CGlobalInfo::init_pools(1000); - - CCapFileFlowInfo flow_info; - flow_info.Create(); - - flow_info.load_cap_file("avl/delay_10_rtsp_0.pcap",0,0); - - change_pkt_len(&flow_info,4-1 ,6); - change_pkt_len(&flow_info,5-1 ,6); - change_pkt_len(&flow_info,6-1 ,6+2); - change_pkt_len(&flow_info,7-1 ,4); - change_pkt_len(&flow_info,8-1 ,6+2); - change_pkt_len(&flow_info,9-1 ,4); - change_pkt_len(&flow_info,10-1,6); - change_pkt_len(&flow_info,13-1,6); - change_pkt_len(&flow_info,16-1,6); - change_pkt_len(&flow_info,19-1,6); - - flow_info.save_to_erf("exp/c.pcap",1); - - return (1); -} - -int manipolate_capfile_sip() { - time_init(); - CGlobalInfo::init_pools(1000); - - CCapFileFlowInfo flow_info; - flow_info.Create(); - - flow_info.load_cap_file("avl/delay_10_sip_0.pcap",0,0); - - change_pkt_len(&flow_info,1-1 ,6+6); - change_pkt_len(&flow_info,2-1 ,6+6); - - flow_info.save_to_erf("exp/delay_10_sip_0_fixed.pcap",1); - - return (1); -} - -int manipolate_capfile_sip1() { - time_init(); - CGlobalInfo::init_pools(1000); - - CCapFileFlowInfo flow_info; - flow_info.Create(); - - flow_info.load_cap_file("avl/delay_sip_0.pcap",0,0); - flow_info.GetPacket(1); - - change_pkt_len(&flow_info,1-1 ,6+6+10); - - change_pkt_len(&flow_info,2-1 ,6+6+10); - - flow_info.save_to_erf("exp/delay_sip_0_fixed_1.pcap",1); - - return (1); -} - - -class CMergeCapFileRec { -public: - - CCapFileFlowInfo m_cap; - - int m_index; - int m_limit_number_of_packets; /* limit number of packets */ - bool m_stop; /* Do we have more packets */ - - double m_offset; /* offset should be positive */ - double m_start_time; - -public: - bool Create(std::string cap_file,double offset); - void Delete(); - void IncPacket(); - bool GetCurPacket(double & time); - CPacketIndication * GetUpdatedPacket(); - - void Dump(FILE *fd,int _id); -}; - - -void CMergeCapFileRec::Dump(FILE *fd,int _id){ - double time = 0.0; - bool stop=GetCurPacket(time); - fprintf (fd," id:%2d stop : %d index:%4d %3.4f \n",_id,stop?1:0,m_index,time); -} - - -CPacketIndication * CMergeCapFileRec::GetUpdatedPacket(){ - double t1; - assert(GetCurPacket(t1)==false); - CFlowPktInfo * pkt = m_cap.GetPacket(m_index); - pkt->m_pkt_indication.m_packet->set_new_time(t1); - return (&pkt->m_pkt_indication); -} - - -bool CMergeCapFileRec::GetCurPacket(double & time){ - if (m_stop) { - return(true); - } - CFlowPktInfo * pkt = m_cap.GetPacket(m_index); - time= (pkt->m_packet->get_time() -m_start_time + m_offset); - return (false); -} - -void CMergeCapFileRec::IncPacket(){ - m_index++; - if ( (m_limit_number_of_packets) && (m_index > m_limit_number_of_packets ) ) { - m_stop=true; - return; - } - - if ( m_index == (int)m_cap.Size() ) { - m_stop=true; - } -} - -void CMergeCapFileRec::Delete(){ - m_cap.Delete(); -} - -bool CMergeCapFileRec::Create(std::string cap_file, - double offset){ - m_cap.Create(); - m_cap.load_cap_file(cap_file,0,0); - CFlowPktInfo * pkt = m_cap.GetPacket(0); - - m_index=0; - m_stop=false; - m_limit_number_of_packets =0; - m_start_time = pkt->m_packet->get_time() ; - m_offset = offset; - - return (true); -} - - -#define MERGE_CAP_FILES (2) + case OPT_TYPE_SL: + { + SimStateless &st = SimStateless::get_instance(); -class CMergeCapFile { -public: - bool Create(); - void Delete(); - bool run_merge(std::string to_cap_file); -private: - void append(int _cap_id); - -public: - CMergeCapFileRec m[MERGE_CAP_FILES]; - CCapFileFlowInfo m_results; -}; - -bool CMergeCapFile::Create(){ - m_results.Create(); - return(true); -} - -void CMergeCapFile::Delete(){ - m_results.Delete(); -} - -void CMergeCapFile::append(int _cap_id){ - CPacketIndication * lp=m[_cap_id].GetUpdatedPacket(); - lp->m_packet->Dump(stdout,0); - m_results.Append(lp); -} - - -bool CMergeCapFile::run_merge(std::string to_cap_file){ - - int i=0; - int cnt=0; - while ( true ) { - int min_index=0; - double min_time; + if (params.count("dp_core_count") == 0) { + params["dp_core_count"] = 1; + } - fprintf(stdout," --------------\n"); - fprintf(stdout," pkt : %d \n",cnt); - for (i=0; i<MERGE_CAP_FILES; i++) { - m[i].Dump(stdout,i); - } - fprintf(stdout," --------------\n"); - - bool valid = false; - for (i=0; i<MERGE_CAP_FILES; i++) { - double t1; - if ( m[i].GetCurPacket(t1) == false ){ - /* not in stop */ - if (!valid) { - min_time = t1; - min_index = i; - valid=true; - }else{ - if (t1 < min_time) { - min_time=t1; - min_index = i; - } - } + if (params.count("dp_core_index") == 0) { + params["dp_core_index"] = -1; + } + if (params.count("limit") == 0) { + params["limit"] = 5000; } - } - /* nothing to do */ - if (valid==false) { - fprintf(stdout,"nothing to do \n"); - break; + return st.run(CGlobalInfo::m_options.cfg_file, + CGlobalInfo::m_options.out_file, + 2, + params["dp_core_count"], + params["dp_core_index"], + params["limit"]); } - - cnt++; - fprintf(stdout," choose id %d \n",min_index); - append(min_index); - m[min_index].IncPacket(); - }; - - m_results.save_to_erf(to_cap_file,1); - - return (true); -} - - - -int merge_3_cap_files() { - time_init(); - CGlobalInfo::init_pools(1000); - - CMergeCapFile merger; - merger.Create(); - merger.m[0].Create("exp/c.pcap",0.001); - merger.m[1].Create("avl/delay_10_rtp_160k_0.pcap",0.31); - merger.m[2].Create("avl/delay_10_rtp_160k_1.pcap",0.311); - - //merger.m[1].Create("avl/delay_10_rtp_250k_0_0.pcap",0.31); - //merger.m[1].m_limit_number_of_packets =6; - //merger.m[2].Create("avl/delay_10_rtp_250k_1_0.pcap",0.311); - //merger.m[2].m_limit_number_of_packets =6; - - merger.run_merge("exp/delay_10_rtp_160k_full.pcap"); - - return (0); -} - -int merge_2_cap_files_sip() { - time_init(); - CGlobalInfo::init_pools(1000); - - CMergeCapFile merger; - merger.Create(); - merger.m[0].Create("exp/delay_sip_0_fixed_1.pcap",0.001); - merger.m[1].Create("avl/delay_video_call_rtp_0.pcap",0.51); - //merger.m[1].m_limit_number_of_packets=7; - - //merger.m[1].Create("avl/delay_10_rtp_250k_0_0.pcap",0.31); - //merger.m[1].m_limit_number_of_packets =6; - //merger.m[2].Create("avl/delay_10_rtp_250k_1_0.pcap",0.311); - //merger.m[2].m_limit_number_of_packets =6; - - merger.run_merge("avl/delay_10_sip_video_call_full.pcap"); - - return (0); -} - -static TrexStateless *g_trex_stateless; - - -TrexStateless * get_stateless_obj() { - return g_trex_stateless; -} - -extern "C" const char * get_build_date(void){ - return (__DATE__); -} - -extern "C" const char * get_build_time(void){ - return (__TIME__ ); -} - - - - -int main(int argc , char * argv[]){ - - int res=0; - time_init(); - CGlobalInfo::m_socket.Create(0); - CGlobalInfo::init_pools(1000); - assert( CMsgIns::Ins()->Create(4) ); - - - bool is_gtest=false; - - if ( parse_options(argc, argv, &CGlobalInfo::m_options , is_gtest) != 0){ - exit(-1); } - - if ( is_gtest ) { - res = gtest_main(argc, argv); - }else{ - res = load_list_of_cap_files(&CGlobalInfo::m_options); - } - - CMsgIns::Ins()->Free(); - CGlobalInfo::free_pools(); - CGlobalInfo::m_socket.Delete(); - - - return (res); - } diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 16e36a61..3a31945f 100755 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -140,7 +140,7 @@ public: return(false); } - virtual int configure_drop_queue(CPhyEthIF * _if)=0; + virtual int configure_drop_queue(CPhyEthIF * _if); virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats)=0; virtual void clear_extended_stats(CPhyEthIF * _if)=0; virtual int wait_for_stable_link()=0; @@ -174,14 +174,13 @@ public: return (true); } + virtual int configure_drop_queue(CPhyEthIF * _if); virtual int configure_rx_filter_rules(CPhyEthIF * _if); virtual bool is_hardware_support_drop_queue(){ return(true); } - virtual int configure_drop_queue(CPhyEthIF * _if); - virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats); virtual void clear_extended_stats(CPhyEthIF * _if); @@ -265,8 +264,6 @@ public: virtual bool is_hardware_support_drop_queue(){ return(true); } - virtual int configure_drop_queue(CPhyEthIF * _if); - virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats); virtual void clear_extended_stats(CPhyEthIF * _if); virtual int wait_for_stable_link(); @@ -299,10 +296,6 @@ public: virtual bool is_hardware_support_drop_queue(){ return(true); } - virtual int configure_drop_queue(CPhyEthIF * _if); - - - virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats); virtual void clear_extended_stats(CPhyEthIF * _if); virtual int wait_for_stable_link(); @@ -4549,6 +4542,53 @@ int update_global_info_from_platform_file(){ return (0); } +extern "C" int eal_cpu_detected(unsigned lcore_id); +// return mask representing available cores +int core_mask_calc() { + uint32_t mask = 0; + int lcore_id; + + for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) { + if (eal_cpu_detected(lcore_id)) { + mask |= (1 << lcore_id); + } + } + + return mask; +} + +// Return number of set bits in i +uint32_t num_set_bits(uint32_t i) +{ + i = i - ((i >> 1) & 0x55555555); + i = (i & 0x33333333) + ((i >> 2) & 0x33333333); + return (((i + (i >> 4)) & 0x0F0F0F0F) * 0x01010101) >> 24; +} + +// sanity check if the cores we want to use really exist +int core_mask_sanity(uint32_t wanted_core_mask) { + uint32_t calc_core_mask = core_mask_calc(); + uint32_t wanted_core_num, calc_core_num; + + wanted_core_num = num_set_bits(wanted_core_mask); + calc_core_num = num_set_bits(calc_core_mask); + + if (wanted_core_num > calc_core_num) { + printf("Error: You have %d threads available, but you asked for %d threads.\n", calc_core_num, wanted_core_num); + printf(" Calculation is: -c <num>(%d) * dual ports (%d) + 1 master thread %s" + , CGlobalInfo::m_options.preview.getCores(), CGlobalInfo::m_options.get_expected_dual_ports() + , get_is_latency_thread_enable() ? "+1 latency thread (because of -l flag)\n" : "\n"); + printf(" Maybe try smaller -c <num>.\n"); + return -1; + } + + if (wanted_core_mask != (wanted_core_mask & calc_core_mask)) { + printf ("Serious error: Something is wrong with the hardware. Wanted core mask is %x. Existing core mask is %x\n", wanted_core_mask, calc_core_mask); + return -1; + } + + return 0; +} int update_dpdk_args(void){ @@ -4567,8 +4607,10 @@ int update_dpdk_args(void){ lpsock->dump(stdout); } - sprintf(global_cores_str,"0x%llx",(unsigned long long)lpsock->get_cores_mask()); + if (core_mask_sanity(strtol(global_cores_str, NULL, 16)) < 0) { + return -1; + } /* set the DPDK options */ global_dpdk_args_num =7; @@ -4678,7 +4720,10 @@ int main_test(int argc , char * argv[]){ CGlobalInfo::m_memory_cfg.Dump(stdout); } - update_dpdk_args(); + + if (update_dpdk_args() < 0) { + return -1; + } CParserOption * po=&CGlobalInfo::m_options; @@ -4786,6 +4831,11 @@ int main_test(int argc , char * argv[]){ ////////////////////////////////////////////////////////////////////////////////////////////// // driver section ////////////////////////////////////////////////////////////////////////////////////////////// +int CTRexExtendedDriverBase::configure_drop_queue(CPhyEthIF * _if) { + uint8_t port_id=_if->get_rte_port_id(); + return (rte_eth_dev_rx_queue_stop(port_id, 0)); +} + void wait_x_sec(int sec) { int i; printf(" wait %d sec ", sec); @@ -4809,26 +4859,6 @@ int CTRexExtendedDriverBase1G::wait_for_stable_link(){ return(0); } -int CTRexExtendedDriverBase1G::configure_drop_queue(CPhyEthIF * _if){ - uint8_t protocol; - if (CGlobalInfo::m_options.m_l_pkt_mode == 0) { - protocol = IPPROTO_SCTP; - } else { - protocol = IPPROTO_ICMP; - } - - _if->pci_reg_write( E1000_RXDCTL(0) , 0); - - /* enable filter to pass packet to rx queue 1 */ - _if->pci_reg_write( E1000_IMIR(0), 0x00020000); - _if->pci_reg_write( E1000_IMIREXT(0), 0x00081000); - _if->pci_reg_write( E1000_TTQF(0), protocol - | 0x00008100 /* enable */ - | 0xE0010000 /* RX queue is 1 */ - ); - return (0); -} - void CTRexExtendedDriverBase1G::update_configuration(port_cfg_t * cfg){ cfg->m_tx_conf.tx_thresh.pthresh = TX_PTHRESH_1G; @@ -4840,10 +4870,32 @@ void CTRexExtendedDriverBase1G::update_global_config_fdir(port_cfg_t * cfg){ // Configuration is done in configure_rx_filter_rules by writing to registers } +// e1000 driver does not support the generic stop queue API, so we need to implement ourselves +int CTRexExtendedDriverBase1G::configure_drop_queue(CPhyEthIF * _if) { + // Drop packets coming to RX queue 0 + _if->pci_reg_write( E1000_RXDCTL(0) , 0); + return 0; +} + int CTRexExtendedDriverBase1G::configure_rx_filter_rules(CPhyEthIF * _if){ uint16_t hops = get_rx_check_hops(); uint16_t v4_hops = (hops << 8)&0xff00; + uint8_t protocol; + + if (CGlobalInfo::m_options.m_l_pkt_mode == 0) { + protocol = IPPROTO_SCTP; + } else { + protocol = IPPROTO_ICMP; + } + /* enable filter to pass packet to rx queue 1 */ + _if->pci_reg_write( E1000_IMIR(0), 0x00020000); + _if->pci_reg_write( E1000_IMIREXT(0), 0x00081000); + _if->pci_reg_write( E1000_TTQF(0), protocol + | 0x00008100 /* enable */ + | 0xE0010000 /* RX queue is 1 */ + ); + /* 16 : 12 MAC , (2)0x0800,2 | DW0 , DW1 6 bytes , TTL , PROTO | DW2=0 , DW3=0x0000FF06 @@ -4981,6 +5033,14 @@ int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){ uint16_t hops = get_rx_check_hops(); uint16_t v4_hops = (hops << 8)&0xff00; + /* enable rule 0 SCTP -> queue 1 for latency */ + /* 1<<21 means that queue 1 is for SCTP */ + _if->pci_reg_write(IXGBE_L34T_IMIR(0),(1<<21)); + _if->pci_reg_write(IXGBE_FTQF(0), + IXGBE_FTQF_PROTOCOL_SCTP| + (IXGBE_FTQF_PRIORITY_MASK<<IXGBE_FTQF_PRIORITY_SHIFT)| + ((0x0f)<<IXGBE_FTQF_5TUPLE_MASK_SHIFT)|IXGBE_FTQF_QUEUE_ENABLE); + // IPv4: bytes being compared are {TTL, Protocol} uint16_t ff_rules_v4[6]={ (uint16_t)(0xFF11 - v4_hops), @@ -5036,21 +5096,6 @@ int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){ return (0); } -int CTRexExtendedDriverBase10G::configure_drop_queue(CPhyEthIF * _if){ - /* enable rule 0 SCTP -> queue 1 for latency */ - /* 1<<21 means that queue 1 is for SCTP */ - _if->pci_reg_write(IXGBE_L34T_IMIR(0),(1<<21)); - - _if->pci_reg_write(IXGBE_FTQF(0), - IXGBE_FTQF_PROTOCOL_SCTP| - (IXGBE_FTQF_PRIORITY_MASK<<IXGBE_FTQF_PRIORITY_SHIFT)| - ((0x0f)<<IXGBE_FTQF_5TUPLE_MASK_SHIFT)|IXGBE_FTQF_QUEUE_ENABLE); - - /* disable queue zero - default all traffic will go to here and will be dropped */ - _if->pci_reg_write( IXGBE_RXDCTL(0) , 0); - return (0); -} - void CTRexExtendedDriverBase10G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats){ int i; @@ -5173,15 +5218,10 @@ int CTRexExtendedDriverBase40G::configure_rx_filter_rules(CPhyEthIF * _if){ add_rules(_if,RTE_ETH_FLOW_NONFRAG_IPV6_TCP, ttl); } - return (0); -} - - -int CTRexExtendedDriverBase40G::configure_drop_queue(CPhyEthIF * _if){ - /* Configure queue for latency packets */ add_rules(_if,RTE_ETH_FLOW_NONFRAG_IPV4_OTHER, 255); add_rules(_if,RTE_ETH_FLOW_NONFRAG_IPV4_SCTP, 255); + return (0); } @@ -5250,8 +5290,6 @@ void CTRexExtendedDriverBase1GVm::clear_extended_stats(CPhyEthIF * _if){ } int CTRexExtendedDriverBase1GVm::configure_drop_queue(CPhyEthIF * _if){ - - return (0); } diff --git a/src/mock/trex_platform_api_mock.cpp b/src/mock/trex_platform_api_mock.cpp index 54f71e10..416c4b69 100644 --- a/src/mock/trex_platform_api_mock.cpp +++ b/src/mock/trex_platform_api_mock.cpp @@ -47,3 +47,7 @@ TrexMockPlatformApi::get_dp_core_count() const { return (1); } +void +TrexMockPlatformApi::port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const { + cores_id_list.push_back(std::make_pair(0, 0)); +} diff --git a/src/mock/trex_rpc_server_mock.cpp b/src/mock/trex_rpc_server_mock.cpp index 0bdf6cf1..ecfa308d 100644 --- a/src/mock/trex_rpc_server_mock.cpp +++ b/src/mock/trex_rpc_server_mock.cpp @@ -78,10 +78,6 @@ find_free_tcp_port(uint16_t start_port = 5050) { return port; } -TrexStateless * get_stateless_obj() { - return g_trex_stateless; -} - uint16_t gtest_get_mock_server_port() { return g_rpc_port; } diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h index bd4392f7..52978476 100644 --- a/src/publisher/trex_publisher.h +++ b/src/publisher/trex_publisher.h @@ -34,9 +34,11 @@ public: m_publisher = NULL; } - bool Create(uint16_t port, bool disable); - void Delete(); - void publish_json(const std::string &s); + virtual ~TrexPublisher() {} + + virtual bool Create(uint16_t port, bool disable); + virtual void Delete(); + virtual void publish_json(const std::string &s); enum event_type_e { EVENT_PORT_STARTED = 0, @@ -51,7 +53,7 @@ public: }; - void publish_event(event_type_e type, const Json::Value &data = Json::nullValue); + virtual void publish_event(event_type_e type, const Json::Value &data = Json::nullValue); private: void show_zmq_last_error(const std::string &err); diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index 46fe499b..6e5fbfc6 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -41,6 +41,10 @@ TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mute m_context = zmq_ctx_new(); } +void +TrexRpcServerAsync::_prepare() { +} + /** * publisher thread * diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h index 02d1490e..80d92c2f 100644 --- a/src/rpc-server/trex_rpc_async_server.h +++ b/src/rpc-server/trex_rpc_async_server.h @@ -36,6 +36,7 @@ public: TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); protected: + void _prepare(); void _rpc_thread_cb(); void _stop_rpc_thread(); diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp index b5dd121c..aea7980b 100644 --- a/src/rpc-server/trex_rpc_cmd.cpp +++ b/src/rpc-server/trex_rpc_cmd.cpp @@ -32,7 +32,7 @@ TrexRpcCommand::run(const Json::Value ¶ms, Json::Value &result) { check_param_count(params, m_param_count, result); - if (m_needs_ownership) { + if (m_needs_ownership && !g_test_override_ownership) { verify_ownership(params, result); } @@ -372,3 +372,7 @@ TrexRpcCommand::generate_execute_err(Json::Value &result, const std::string &msg throw (TrexRpcCommandException(TREX_RPC_CMD_EXECUTE_ERR)); } +/** + * by default this is off + */ +bool TrexRpcCommand::g_test_override_ownership = false; diff --git a/src/rpc-server/trex_rpc_cmd_api.h b/src/rpc-server/trex_rpc_cmd_api.h index 7cbdf4ff..675d2900 100644 --- a/src/rpc-server/trex_rpc_cmd_api.h +++ b/src/rpc-server/trex_rpc_cmd_api.h @@ -89,6 +89,16 @@ public: return m_name; } + /** + * on test we enable this override + * + * + * @param enable + */ + static void test_set_override_ownership(bool enable) { + g_test_override_ownership = enable; + } + virtual ~TrexRpcCommand() {} protected: @@ -241,6 +251,8 @@ protected: std::string m_name; int m_param_count; bool m_needs_ownership; + + static bool g_test_override_ownership; }; #endif /* __TREX_RPC_CMD_API_H__ */ diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index eb7825ac..1e8e177d 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -36,7 +36,10 @@ limitations under the License. * */ TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) { - /* ZMQ is not thread safe - this should be outside */ + +} + +void TrexRpcServerReqRes::_prepare() { m_context = zmq_ctx_new(); } @@ -123,15 +126,28 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { */ void TrexRpcServerReqRes::_stop_rpc_thread() { /* by calling zmq_term we signal the blocked thread to exit */ - zmq_term(m_context); + if (m_context) { + zmq_term(m_context); + } } + /** * handles a request given to the server * respondes to the request */ void TrexRpcServerReqRes::handle_request(const std::string &request) { + std::string response_str = process_request(request); + zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); +} + +/** + * main processing of the request + * + */ +std::string TrexRpcServerReqRes::process_request(const std::string &request) { + std::vector<TrexJsonRpcV2ParsedObject *> commands; Json::FastWriter writer; @@ -175,8 +191,7 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) { verbose_json("Server Replied: ", response_str); - zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); - + return response_str; } /** @@ -198,3 +213,36 @@ TrexRpcServerReqRes::handle_server_error(const std::string &specific_err) { zmq_send(m_socket, response_str.c_str(), response_str.size(), 0); } + + + +std::string +TrexRpcServerReqRes::test_inject_request(const std::string &req) { + return process_request(req); +} + + +/** + * MOCK req resp server + */ +TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) { +} + +/** + * override start + * + */ +void +TrexRpcServerReqResMock::start() { + +} + + +/** + * override stop + */ +void +TrexRpcServerReqResMock::stop() { + +} + diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 2876206c..97efbe08 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -34,13 +34,19 @@ public: TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); + /* for test purposes - bypass the ZMQ and inject a message */ + std::string test_inject_request(const std::string &req); + protected: + + void _prepare(); void _rpc_thread_cb(); void _stop_rpc_thread(); -private: bool fetch_one_request(std::string &msg); void handle_request(const std::string &request); + std::string process_request(const std::string &request); + void handle_server_error(const std::string &specific_err); void *m_context; @@ -48,4 +54,22 @@ private: }; +/** + * a mock req resp server (for tests) + * + * @author imarom (03-Jan-16) + */ +class TrexRpcServerReqResMock : public TrexRpcServerReqRes { + +public: + TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg); + + /* override the interface functions */ + virtual void start(); + virtual void stop(); + + +}; + #endif /* __TREX_RPC_REQ_RESP_API_H__ */ + diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index a14e6f97..1dfc4494 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -63,6 +63,9 @@ void TrexRpcServerInterface::start() { verbose_msg("Starting RPC Server"); + /* prepare for run */ + _prepare(); + m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this); if (!m_thread) { throw TrexRpcException("unable to create RPC thread"); @@ -117,9 +120,18 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, const TrexRpcServerConfig *async_cfg, std::mutex *lock) { + m_req_resp = NULL; + /* add the request response server */ if (req_resp_cfg) { - m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg, lock)); + + if (req_resp_cfg->get_protocol() == TrexRpcServerConfig::RPC_PROT_MOCK) { + m_req_resp = new TrexRpcServerReqResMock(*req_resp_cfg); + } else { + m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg, lock); + } + + m_servers.push_back(m_req_resp); } /* add async publisher */ @@ -166,3 +178,12 @@ void TrexRpcServer::set_verbose(bool verbose) { } } + +std::string +TrexRpcServer::test_inject_request(const std::string &req_str) { + if (m_req_resp) { + return m_req_resp->test_inject_request(req_str); + } else { + return ""; + } +} diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index ff876ac4..1ab5dce9 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -29,8 +29,10 @@ limitations under the License. #include <string> #include <stdexcept> #include <trex_rpc_exception_api.h> +#include <json/json.h> class TrexRpcServerInterface; +class TrexRpcServerReqRes; /** * defines a configuration of generic RPC server @@ -41,18 +43,19 @@ class TrexRpcServerConfig { public: enum rpc_prot_e { - RPC_PROT_TCP + RPC_PROT_TCP, + RPC_PROT_MOCK }; TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) { } - uint16_t get_port() { + uint16_t get_port() const { return m_port; } - rpc_prot_e get_protocol() { + rpc_prot_e get_protocol() const { return m_protocol; } @@ -76,13 +79,13 @@ public: * starts the server * */ - void start(); + virtual void start(); /** * stops the server * */ - void stop(); + virtual void stop(); /** * set verbose on or off @@ -107,6 +110,7 @@ protected: * instances implement this * */ + virtual void _prepare() = 0; virtual void _rpc_thread_cb() = 0; virtual void _stop_rpc_thread() = 0; @@ -169,12 +173,23 @@ public: } + /** + * allow injecting of a JSON and get a response + * + * @author imarom (27-Dec-15) + * + * @return std::string + */ + std::string test_inject_request(const std::string &request_str); private: static std::string generate_handler(); std::vector<TrexRpcServerInterface *> m_servers; + // an alias to the req resp server + TrexRpcServerReqRes *m_req_resp; + bool m_verbose; static const std::string s_server_uptime; diff --git a/src/sim/trex_sim.h b/src/sim/trex_sim.h new file mode 100644 index 00000000..a541ce01 --- /dev/null +++ b/src/sim/trex_sim.h @@ -0,0 +1,148 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_SIM_H__ +#define __TREX_SIM_H__ + +#include <string> +#include <os_time.h> +#include <bp_sim.h> +#include <json/json.h> + +int gtest_main(int argc, char **argv); + +class TrexStateless; +class TrexPublisher; +class DpToCpHandler; + +/** + * interface for a sim target + * + */ +class SimInterface { +public: + + SimInterface() { + + time_init(); + CGlobalInfo::m_socket.Create(0); + CGlobalInfo::init_pools(1000); + } + + virtual ~SimInterface() { + + CMsgIns::Ins()->Free(); + CGlobalInfo::free_pools(); + CGlobalInfo::m_socket.Delete(); + } + + +}; + +/** + * gtest target + * + * @author imarom (28-Dec-15) + */ +class SimGtest : public SimInterface { +public: + + int run(int argc, char **argv) { + assert( CMsgIns::Ins()->Create(4) ); + return gtest_main(argc, argv); + } +}; + + + +/** + * stateful target + * + */ +class SimStateful : public SimInterface { + +public: + int run(); +}; + + + +/** + * target for sim stateless + * + * @author imarom (28-Dec-15) + */ +class SimStateless : public SimInterface { + +public: + static SimStateless& get_instance() { + static SimStateless instance; + return instance; + } + + + int run(const std::string &json_filename, + const std::string &out_filename, + int port_count, + int dp_core_count, + int dp_core_index, + int limit); + + TrexStateless * get_stateless_obj() { + return m_trex_stateless; + } + + void set_verbose(bool enable) { + m_verbose = enable; + } + +private: + SimStateless(); + ~SimStateless(); + + void prepare_control_plane(); + void prepare_dataplane(); + void execute_json(const std::string &json_filename); + + void run_dp(const std::string &out_filename); + uint64_t run_dp_core(int core_index, const std::string &out_filename); + + void flush_dp_to_cp_messages_core(int core_index); + + void validate_response(const Json::Value &resp); + + bool is_verbose() { + return m_verbose; + } + + TrexStateless *m_trex_stateless; + DpToCpHandler *m_dp_to_cp_handler; + TrexPublisher *m_publisher; + CFlowGenList m_fl; + CErfIFStl m_erf_vif; + bool m_verbose; + + int m_port_count; + int m_dp_core_count; + int m_dp_core_index; + uint64_t m_limit; +}; + +#endif /* __TREX_SIM_H__ */ diff --git a/src/sim/trex_sim_stateful.cpp b/src/sim/trex_sim_stateful.cpp new file mode 100644 index 00000000..88698cd1 --- /dev/null +++ b/src/sim/trex_sim_stateful.cpp @@ -0,0 +1,600 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "trex_sim.h" + +static int cores = 1; + +#ifdef LINUX + + + +#include <pthread.h> + +struct per_thread_t { + pthread_t tid; +}; + +#define MAX_THREADS 200 +static per_thread_t tr_info[MAX_THREADS]; + + +////////////// + +struct test_t_info1 { + CPreviewMode * preview_info; + CFlowGenListPerThread * thread_info; + uint32_t thread_id; +}; + +void * thread_task(void *info){ + + test_t_info1 * obj =(test_t_info1 *)info; + + CFlowGenListPerThread * lpt=obj->thread_info; + + printf("start thread %d \n",obj->thread_id); + //delay(obj->thread_id *3000); + printf("-->start thread %d \n",obj->thread_id); + if (1/*obj->thread_id ==3*/) { + + char buf[100]; + sprintf(buf,"my%d.erf",obj->thread_id); + lpt->start_generate_stateful(buf,*obj->preview_info); + lpt->m_node_gen.DumpHist(stdout); + printf("end thread %d \n",obj->thread_id); + } + + return (NULL); +} + + +void test_load_list_of_cap_files_linux(CParserOption * op){ + + CFlowGenList fl; + //CNullIF erf_vif; + //CErfIF erf_vif; + + fl.Create(); + + fl.load_from_yaml(op->cfg_file,cores); + fl.DumpPktSize(); + + + fl.generate_p_thread_info(cores); + CFlowGenListPerThread * lpt; + + /* set the ERF file */ + //fl.set_vif_all(&erf_vif); + + int i; + for (i=0; i<cores; i++) { + lpt=fl.m_threads_info[i]; + test_t_info1 * obj = new test_t_info1(); + obj->preview_info =&op->preview; + obj->thread_info = fl.m_threads_info[i]; + obj->thread_id = i; + CNullIF * erf_vif = new CNullIF(); + //CErfIF * erf_vif = new CErfIF(); + + lpt->set_vif(erf_vif); + + assert(pthread_create( &tr_info[i].tid, NULL, thread_task, obj)==0); + } + + for (i=0; i<cores; i++) { + /* wait for all of them to stop */ + assert(pthread_join((pthread_t)tr_info[i].tid,NULL )==0); + } + + printf("compare files \n"); + for (i=1; i<cores; i++) { + + CErfCmp cmp; + char buf[100]; + sprintf(buf,"my%d.erf",i); + char buf1[100]; + sprintf(buf1,"my%d.erf",0); + if ( cmp.compare(std::string(buf),std::string(buf1)) != true ) { + printf(" ERROR cap file is not ex !! \n"); + assert(0); + } + printf(" thread %d is ok \n",i); + } + + fl.Delete(); +} + + +#endif + +/*************************************************************/ +void test_load_list_of_cap_files(CParserOption * op){ + + CFlowGenList fl; + CNullIF erf_vif; + + fl.Create(); + + #define NUM 1 + + fl.load_from_yaml(op->cfg_file,NUM); + fl.DumpPktSize(); + + + fl.generate_p_thread_info(NUM); + CFlowGenListPerThread * lpt; + + /* set the ERF file */ + //fl.set_vif_all(&erf_vif); + + int i; + for (i=0; i<NUM; i++) { + lpt=fl.m_threads_info[i]; + char buf[100]; + sprintf(buf,"my%d.erf",i); + lpt->start_generate_stateful(buf,op->preview); + lpt->m_node_gen.DumpHist(stdout); + } + //sprintf(buf,"my%d.erf",7); + //lpt=fl.m_threads_info[7]; + + //fl.Dump(stdout); + fl.Delete(); +} + +int load_list_of_cap_files(CParserOption * op){ + CFlowGenList fl; + fl.Create(); + fl.load_from_yaml(op->cfg_file,1); + if ( op->preview.getVMode() >0 ) { + fl.DumpCsv(stdout); + } + uint32_t start= os_get_time_msec(); + + CErfIF erf_vif; + //CNullIF erf_vif; + + fl.generate_p_thread_info(1); + CFlowGenListPerThread * lpt; + lpt=fl.m_threads_info[0]; + lpt->set_vif(&erf_vif); + + if ( (op->preview.getVMode() >1) || op->preview.getFileWrite() ) { + lpt->start_generate_stateful(op->out_file,op->preview); + } + + lpt->m_node_gen.DumpHist(stdout); + + uint32_t stop= os_get_time_msec(); + printf(" d time = %ul %ul \n",stop-start,os_get_time_freq()); + fl.Delete(); + return (0); +} + + +int test_dns(){ + + time_init(); + CGlobalInfo::init_pools(1000); + + CParserOption po ; + + //po.cfg_file = "cap2/dns.yaml"; + //po.cfg_file = "cap2/sfr3.yaml"; + po.cfg_file = "cap2/sfr.yaml"; + + po.preview.setVMode(0); + po.preview.setFileWrite(true); + #ifdef LINUX + test_load_list_of_cap_files_linux(&po); + #else + test_load_list_of_cap_files(&po); + #endif + return (0); +} + +void test_pkt_mbuf(void); + +void test_compare_files(void); + +#if 0 +static int b=0; +static int c=0; +static int d=0; + +int test_instructions(){ + int i; + for (i=0; i<100000;i++) { + b+=b+1; + c+=+b+c+1; + d+=+(b*2+1); + } + return (b+c+d); +} + +#include <valgrind/callgrind.h> +#endif + + +void update_tcp_seq_num(CCapFileFlowInfo * obj, + int pkt_id, + int size_change){ + CFlowPktInfo * pkt=obj->GetPacket(pkt_id); + if ( pkt->m_pkt_indication.m_desc.IsUdp() ){ + /* nothing to do */ + return; + } + + bool o_init=pkt->m_pkt_indication.m_desc.IsInitSide(); + TCPHeader * tcp ; + int s= (int)obj->Size(); + int i; + + for (i=pkt_id+1; i<s; i++) { + + pkt=obj->GetPacket(i); + tcp=pkt->m_pkt_indication.l4.m_tcp; + bool init=pkt->m_pkt_indication.m_desc.IsInitSide(); + if (init == o_init) { + /* same dir update the seq number */ + tcp->setSeqNumber (tcp->getSeqNumber ()+size_change); + + }else{ + /* update the ack number */ + tcp->setAckNumber (tcp->getAckNumber ()+size_change); + } + } +} + + + +void change_pkt_len(CCapFileFlowInfo * obj,int pkt_id, int size ){ + CFlowPktInfo * pkt=obj->GetPacket(pkt_id); + + /* enlarge the packet size by 9 */ + + char * p=pkt->m_packet->append(size); + /* set it to 0xaa*/ + memmove(p+size-4,p-4,4); /* CRCbytes */ + memset(p-4,0x0a,size); + + /* refresh the pointers */ + pkt->m_pkt_indication.RefreshPointers(); + + IPHeader * ipv4 = pkt->m_pkt_indication.l3.m_ipv4; + ipv4->updateTotalLength (ipv4->getTotalLength()+size ); + + /* update seq numbers if needed */ + update_tcp_seq_num(obj,pkt_id,size); +} + +void dump_tcp_seq_num_(CCapFileFlowInfo * obj){ + int s= (int)obj->Size(); + int i; + uint32_t i_seq; + uint32_t r_seq; + + CFlowPktInfo * pkt=obj->GetPacket(0); + TCPHeader * tcp = pkt->m_pkt_indication.l4.m_tcp; + i_seq=tcp->getSeqNumber (); + + pkt=obj->GetPacket(1); + tcp = pkt->m_pkt_indication.l4.m_tcp; + r_seq=tcp->getSeqNumber (); + + for (i=2; i<s; i++) { + uint32_t seq; + uint32_t ack; + + pkt=obj->GetPacket(i); + tcp=pkt->m_pkt_indication.l4.m_tcp; + bool init=pkt->m_pkt_indication.m_desc.IsInitSide(); + seq=tcp->getSeqNumber (); + ack=tcp->getAckNumber (); + if (init) { + seq=seq-i_seq; + ack=ack-r_seq; + }else{ + seq=seq-r_seq; + ack=ack-i_seq; + } + printf(" %4d ",i); + if (!init) { + printf(" "); + } + printf(" %s seq: %4d ack : %4d \n",init?"I":"R",seq,ack); + } +} + + +int manipolate_capfile() { + time_init(); + CGlobalInfo::init_pools(1000); + + CCapFileFlowInfo flow_info; + flow_info.Create(); + + flow_info.load_cap_file("avl/delay_10_rtsp_0.pcap",0,0); + + change_pkt_len(&flow_info,4-1 ,6); + change_pkt_len(&flow_info,5-1 ,6); + change_pkt_len(&flow_info,6-1 ,6+2); + change_pkt_len(&flow_info,7-1 ,4); + change_pkt_len(&flow_info,8-1 ,6+2); + change_pkt_len(&flow_info,9-1 ,4); + change_pkt_len(&flow_info,10-1,6); + change_pkt_len(&flow_info,13-1,6); + change_pkt_len(&flow_info,16-1,6); + change_pkt_len(&flow_info,19-1,6); + + flow_info.save_to_erf("exp/c.pcap",1); + + return (1); +} + +int manipolate_capfile_sip() { + time_init(); + CGlobalInfo::init_pools(1000); + + CCapFileFlowInfo flow_info; + flow_info.Create(); + + flow_info.load_cap_file("avl/delay_10_sip_0.pcap",0,0); + + change_pkt_len(&flow_info,1-1 ,6+6); + change_pkt_len(&flow_info,2-1 ,6+6); + + flow_info.save_to_erf("exp/delay_10_sip_0_fixed.pcap",1); + + return (1); +} + +int manipolate_capfile_sip1() { + time_init(); + CGlobalInfo::init_pools(1000); + + CCapFileFlowInfo flow_info; + flow_info.Create(); + + flow_info.load_cap_file("avl/delay_sip_0.pcap",0,0); + flow_info.GetPacket(1); + + change_pkt_len(&flow_info,1-1 ,6+6+10); + + change_pkt_len(&flow_info,2-1 ,6+6+10); + + flow_info.save_to_erf("exp/delay_sip_0_fixed_1.pcap",1); + + return (1); +} + + +class CMergeCapFileRec { +public: + + CCapFileFlowInfo m_cap; + + int m_index; + int m_limit_number_of_packets; /* limit number of packets */ + bool m_stop; /* Do we have more packets */ + + double m_offset; /* offset should be positive */ + double m_start_time; + +public: + bool Create(std::string cap_file,double offset); + void Delete(); + void IncPacket(); + bool GetCurPacket(double & time); + CPacketIndication * GetUpdatedPacket(); + + void Dump(FILE *fd,int _id); +}; + + +void CMergeCapFileRec::Dump(FILE *fd,int _id){ + double time = 0.0; + bool stop=GetCurPacket(time); + fprintf (fd," id:%2d stop : %d index:%4d %3.4f \n",_id,stop?1:0,m_index,time); +} + + +CPacketIndication * CMergeCapFileRec::GetUpdatedPacket(){ + double t1; + assert(GetCurPacket(t1)==false); + CFlowPktInfo * pkt = m_cap.GetPacket(m_index); + pkt->m_pkt_indication.m_packet->set_new_time(t1); + return (&pkt->m_pkt_indication); +} + + +bool CMergeCapFileRec::GetCurPacket(double & time){ + if (m_stop) { + return(true); + } + CFlowPktInfo * pkt = m_cap.GetPacket(m_index); + time= (pkt->m_packet->get_time() -m_start_time + m_offset); + return (false); +} + +void CMergeCapFileRec::IncPacket(){ + m_index++; + if ( (m_limit_number_of_packets) && (m_index > m_limit_number_of_packets ) ) { + m_stop=true; + return; + } + + if ( m_index == (int)m_cap.Size() ) { + m_stop=true; + } +} + +void CMergeCapFileRec::Delete(){ + m_cap.Delete(); +} + +bool CMergeCapFileRec::Create(std::string cap_file, + double offset){ + m_cap.Create(); + m_cap.load_cap_file(cap_file,0,0); + CFlowPktInfo * pkt = m_cap.GetPacket(0); + + m_index=0; + m_stop=false; + m_limit_number_of_packets =0; + m_start_time = pkt->m_packet->get_time() ; + m_offset = offset; + + return (true); +} + + + +#define MERGE_CAP_FILES (2) + +class CMergeCapFile { +public: + bool Create(); + void Delete(); + bool run_merge(std::string to_cap_file); +private: + void append(int _cap_id); + +public: + CMergeCapFileRec m[MERGE_CAP_FILES]; + CCapFileFlowInfo m_results; +}; + +bool CMergeCapFile::Create(){ + m_results.Create(); + return(true); +} + +void CMergeCapFile::Delete(){ + m_results.Delete(); +} + +void CMergeCapFile::append(int _cap_id){ + CPacketIndication * lp=m[_cap_id].GetUpdatedPacket(); + lp->m_packet->Dump(stdout,0); + m_results.Append(lp); +} + + +bool CMergeCapFile::run_merge(std::string to_cap_file){ + + int i=0; + int cnt=0; + while ( true ) { + int min_index=0; + double min_time; + + fprintf(stdout," --------------\n"); + fprintf(stdout," pkt : %d \n",cnt); + for (i=0; i<MERGE_CAP_FILES; i++) { + m[i].Dump(stdout,i); + } + fprintf(stdout," --------------\n"); + + bool valid = false; + for (i=0; i<MERGE_CAP_FILES; i++) { + double t1; + if ( m[i].GetCurPacket(t1) == false ){ + /* not in stop */ + if (!valid) { + min_time = t1; + min_index = i; + valid=true; + }else{ + if (t1 < min_time) { + min_time=t1; + min_index = i; + } + } + + } + } + + /* nothing to do */ + if (valid==false) { + fprintf(stdout,"nothing to do \n"); + break; + } + + cnt++; + fprintf(stdout," choose id %d \n",min_index); + append(min_index); + m[min_index].IncPacket(); + }; + + m_results.save_to_erf(to_cap_file,1); + + return (true); +} + + + +int merge_3_cap_files() { + time_init(); + CGlobalInfo::init_pools(1000); + + CMergeCapFile merger; + merger.Create(); + merger.m[0].Create("exp/c.pcap",0.001); + merger.m[1].Create("avl/delay_10_rtp_160k_0.pcap",0.31); + merger.m[2].Create("avl/delay_10_rtp_160k_1.pcap",0.311); + + //merger.m[1].Create("avl/delay_10_rtp_250k_0_0.pcap",0.31); + //merger.m[1].m_limit_number_of_packets =6; + //merger.m[2].Create("avl/delay_10_rtp_250k_1_0.pcap",0.311); + //merger.m[2].m_limit_number_of_packets =6; + + merger.run_merge("exp/delay_10_rtp_160k_full.pcap"); + + return (0); +} + +int merge_2_cap_files_sip() { + time_init(); + CGlobalInfo::init_pools(1000); + + CMergeCapFile merger; + merger.Create(); + merger.m[0].Create("exp/delay_sip_0_fixed_1.pcap",0.001); + merger.m[1].Create("avl/delay_video_call_rtp_0.pcap",0.51); + //merger.m[1].m_limit_number_of_packets=7; + + //merger.m[1].Create("avl/delay_10_rtp_250k_0_0.pcap",0.31); + //merger.m[1].m_limit_number_of_packets =6; + //merger.m[2].Create("avl/delay_10_rtp_250k_1_0.pcap",0.311); + //merger.m[2].m_limit_number_of_packets =6; + + merger.run_merge("avl/delay_10_sip_video_call_full.pcap"); + + return (0); +} + +int +SimStateful::run() { + assert( CMsgIns::Ins()->Create(4) ); + return load_list_of_cap_files(&CGlobalInfo::m_options); +} diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp new file mode 100644 index 00000000..2b73f686 --- /dev/null +++ b/src/sim/trex_sim_stateless.cpp @@ -0,0 +1,346 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2015 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include "trex_sim.h" +#include <trex_stateless.h> +#include <trex_stateless_messaging.h> +#include <trex_rpc_cmd_api.h> +#include <json/json.h> +#include <stdexcept> +#include <sstream> + +using namespace std; + +TrexStateless * get_stateless_obj() { + return SimStateless::get_instance().get_stateless_obj(); +} + + +class SimRunException : public std::runtime_error +{ +public: + SimRunException() : std::runtime_error("") { + + } + SimRunException(const std::string &what) : std::runtime_error(what) { + } +}; + +/*************** hook for platform API **************/ +class SimPlatformApi : public TrexPlatformApi { +public: + SimPlatformApi(int dp_core_count) { + m_dp_core_count = dp_core_count; + } + + virtual uint8_t get_dp_core_count() const { + return m_dp_core_count; + } + + virtual void get_global_stats(TrexPlatformGlobalStats &stats) const { + } + virtual void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const { + } + virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const { + } + + virtual void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const { + for (int i = 0; i < m_dp_core_count; i++) { + cores_id_list.push_back(std::make_pair(i, 0)); + } + } + +private: + int m_dp_core_count; +}; + +/** + * handler for DP to CP messages + * + * @author imarom (19-Nov-15) + */ +class DpToCpHandler { +public: + virtual void handle(TrexStatelessDpToCpMsgBase *msg) = 0; +}; + +/************************ + * + * stateless sim object + * +************************/ +class SimPublisher : public TrexPublisher { +public: + + /* override create */ + bool Create(uint16_t port, bool disable) { + return true; + } + + void Delete() { + + } + + void publish_json(const std::string &s) { + } + + virtual ~SimPublisher() { + } +}; + +/************************ + * + * stateless sim object + * +************************/ + +SimStateless::SimStateless() { + m_trex_stateless = NULL; + m_publisher = NULL; + m_dp_to_cp_handler = NULL; + m_verbose = false; + m_dp_core_count = -1; + m_dp_core_index = -1; + m_port_count = -1; + m_limit = 0; + + /* override ownership checks */ + TrexRpcCommand::test_set_override_ownership(true); +} + + +int +SimStateless::run(const string &json_filename, + const string &out_filename, + int port_count, + int dp_core_count, + int dp_core_index, + int limit) { + + assert(dp_core_count > 0); + + /* -1 means its not set or positive value between 0 and the dp core count - 1*/ + assert( (dp_core_index == -1) || ( (dp_core_index >=0 ) && (dp_core_index < dp_core_count) ) ); + + m_dp_core_count = dp_core_count; + m_dp_core_index = dp_core_index; + m_port_count = port_count; + m_limit = limit; + + prepare_dataplane(); + prepare_control_plane(); + + try { + execute_json(json_filename); + } catch (const SimRunException &e) { + std::cout << "*** test failed ***\n\n" << e.what() << "\n"; + return (-1); + } + + run_dp(out_filename); + + return 0; +} + + +SimStateless::~SimStateless() { + if (m_trex_stateless) { + delete m_trex_stateless; + m_trex_stateless = NULL; + } + + if (m_publisher) { + delete m_publisher; + m_publisher = NULL; + } + + m_fl.Delete(); +} + +/** + * prepare control plane for test + * + * @author imarom (28-Dec-15) + */ +void +SimStateless::prepare_control_plane() { + TrexStatelessCfg cfg; + + m_publisher = new SimPublisher(); + + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_MOCK, 0); + + cfg.m_port_count = m_port_count; + cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; + cfg.m_rpc_async_cfg = NULL; + cfg.m_rpc_server_verbose = false; + cfg.m_platform_api = new SimPlatformApi(m_dp_core_count); + cfg.m_publisher = m_publisher; + + m_trex_stateless = new TrexStateless(cfg); + + m_trex_stateless->launch_control_plane(); + + for (auto &port : m_trex_stateless->get_port_list()) { + port->acquire("test", 0, true); + } + +} + + +/** + * prepare the data plane for test + * + */ +void +SimStateless::prepare_dataplane() { + + CGlobalInfo::m_options.m_expected_portd = m_port_count; + + assert(CMsgIns::Ins()->Create(m_dp_core_count)); + m_fl.Create(); + m_fl.generate_p_thread_info(m_dp_core_count); + + for (int i = 0; i < m_dp_core_count; i++) { + m_fl.m_threads_info[i]->set_vif(&m_erf_vif); + } +} + + + +void +SimStateless::execute_json(const std::string &json_filename) { + + std::ifstream test(json_filename); + std::stringstream buffer; + buffer << test.rdbuf(); + + std::string rep = m_trex_stateless->get_rpc_server()->test_inject_request(buffer.str()); + + Json::Value root; + Json::Reader reader; + reader.parse(rep, root, false); + + if (is_verbose()) { + std::cout << "server response: \n\n" << root << "\n\n"; + } + + validate_response(root); + +} + +void +SimStateless::validate_response(const Json::Value &resp) { + std::stringstream ss; + + if (resp.isArray()) { + for (const auto &single : resp) { + if (single["error"] != Json::nullValue) { + ss << "failed with:\n\n" << single["error"] << "\n\n"; + throw SimRunException(ss.str()); + } + } + } else { + if (resp["error"] != Json::nullValue) { + ss << "failed with:\n\n" << resp["error"] << "\n\n"; + throw SimRunException(ss.str()); + } + } + +} + + +void +SimStateless::run_dp(const std::string &out_filename) { + uint64_t pkt_cnt = 0; + + if (m_dp_core_count == 1) { + pkt_cnt = run_dp_core(0, out_filename); + } else { + + /* do we have a specific core index to capture ? */ + if (m_dp_core_index != -1) { + for (int i = 0; i < m_dp_core_count; i++) { + if (i == m_dp_core_index) { + pkt_cnt += run_dp_core(i, out_filename); + } else { + run_dp_core(i, "/dev/null"); + } + } + } else { + for (int i = 0; i < m_dp_core_count; i++) { + std::stringstream ss; + ss << out_filename << "-" << i; + pkt_cnt += run_dp_core(i, ss.str()); + } + } + + } + + + std::cout << "\n"; + std::cout << "ports: " << m_port_count << "\n"; + std::cout << "cores: " << m_dp_core_count << "\n"; + + if (m_dp_core_index != -1) { + std::cout << "core index: " << m_dp_core_index << "\n"; + } else { + std::cout << "core index: merge all\n"; + } + + std::cout << "pkt limit: " << m_limit << "\n"; + std::cout << "\nwritten " << pkt_cnt << " packets " << "to '" << out_filename << "'\n\n"; +} + +uint64_t +SimStateless::run_dp_core(int core_index, const std::string &out_filename) { + + CFlowGenListPerThread *lpt = m_fl.m_threads_info[core_index]; + + lpt->start_stateless_simulation_file((std::string)out_filename, CGlobalInfo::m_options.preview, m_limit / m_dp_core_count); + lpt->start_stateless_daemon_simulation(); + + flush_dp_to_cp_messages_core(core_index); + + return lpt->m_node_gen.m_cnt; +} + + +void +SimStateless::flush_dp_to_cp_messages_core(int core_index) { + + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(core_index); + + while ( true ) { + CGenNode * node = NULL; + if (ring->Dequeue(node) != 0) { + break; + } + assert(node); + + TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node; + if (m_dp_to_cp_handler) { + m_dp_to_cp_handler->handle(msg); + } + + delete msg; + } +} diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 5c11be1e..59be9241 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -161,6 +161,10 @@ public: return m_ports; } + TrexRpcServer * get_rpc_server() { + return m_rpc_server; + } + protected: /* no copy or assignment */ @@ -168,7 +172,7 @@ protected: void operator=(TrexStateless const&) = delete; /* RPC server array */ - TrexRpcServer *m_rpc_server; + TrexRpcServer *m_rpc_server; /* ports */ std::vector <TrexStatelessPort *> m_ports; diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 4e814d4c..a80efc08 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -377,8 +377,11 @@ void TrexStatelessDpCore::idle_state_loop() { while (m_state == STATE_IDLE) { - periodic_check_for_cp_messages(); - delay(200); + bool had_msg = periodic_check_for_cp_messages(); + /* if no message - backoff for some time */ + if (!had_msg) { + delay(200); + } } } diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index 7dc4a2b2..efdb364c 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -185,12 +185,12 @@ public: * * @author imarom (27-Oct-15) */ - void periodic_check_for_cp_messages() { + bool periodic_check_for_cp_messages() { // doing this inline for performance reasons /* fast path */ if ( likely ( m_ring_from_cp->isEmpty() ) ) { - return; + return false; } while ( true ) { @@ -204,6 +204,8 @@ public: handle_cp_msg(msg); } + return true; + } /* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */ |