summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rwxr-xr-xsrc/bp_sim.cpp36
-rwxr-xr-xsrc/bp_sim.h11
-rw-r--r--src/internal_api/trex_platform_api.h2
-rwxr-xr-xsrc/main.cpp739
-rwxr-xr-xsrc/main_dpdk.cpp146
-rw-r--r--src/mock/trex_platform_api_mock.cpp4
-rw-r--r--src/mock/trex_rpc_server_mock.cpp4
-rw-r--r--src/publisher/trex_publisher.h10
-rw-r--r--src/rpc-server/trex_rpc_async_server.cpp4
-rw-r--r--src/rpc-server/trex_rpc_async_server.h1
-rw-r--r--src/rpc-server/trex_rpc_cmd.cpp6
-rw-r--r--src/rpc-server/trex_rpc_cmd_api.h12
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.cpp56
-rw-r--r--src/rpc-server/trex_rpc_req_resp_server.h26
-rw-r--r--src/rpc-server/trex_rpc_server.cpp23
-rw-r--r--src/rpc-server/trex_rpc_server_api.h25
-rw-r--r--src/sim/trex_sim.h148
-rw-r--r--src/sim/trex_sim_stateful.cpp600
-rw-r--r--src/sim/trex_sim_stateless.cpp346
-rw-r--r--src/stateless/cp/trex_stateless.h6
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp7
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.h6
22 files changed, 1501 insertions, 717 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 63a3c144..fcef049c 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -3164,7 +3164,8 @@ int CNodeGenerator::open_file(std::string file_name,
/* ser preview mode */
m_v_if->set_review_mode(preview_mode);
m_v_if->open_file(file_name);
- m_cnt = 1;
+ m_cnt = 0;
+ m_limit = 0;
return (0);
}
@@ -3177,11 +3178,13 @@ int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
}
int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){
+ m_cnt++;
+
if ( m_preview_mode.getVMode() >2 ){
fprintf(stdout," %4lu ,", (ulong)m_cnt);
node_sl->Dump(stdout);
- m_cnt++;
}
+
return (0);
}
@@ -3195,6 +3198,10 @@ int CNodeGenerator::update_stats(CGenNode * node){
return (0);
}
+bool CNodeGenerator::has_limit_reached() {
+ /* do we have a limit and has it passed ? */
+ return ( (m_limit > 0) && (m_cnt >= m_limit) );
+}
bool CFlowGenListPerThread::Create(uint32_t thread_id,
uint32_t core_id,
@@ -3217,7 +3224,9 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
char name[100];
sprintf(name,"nodes-%d",m_core_id);
- printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id);
+
+ //printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id);
+
m_node_pool = utl_rte_mempool_create_non_pkt(name,
CGlobalInfo::m_memory_cfg.get_each_core_dp_flows(),
sizeof(CGenNode),
@@ -3225,7 +3234,8 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
0 ,
socket_id);
- printf(" pool %p \n",m_node_pool);
+ //printf(" pool %p \n",m_node_pool);
+
m_node_gen.Create(this);
m_flow_id_to_node_lookup.Create();
@@ -3537,16 +3547,21 @@ int CNodeGenerator::flush_file(dsec_t max_time,
m_p_queue.pop();
CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
- #ifdef _DEBUG
- update_stl_stats(node_sl);
- #endif
-
/* if the stream has been deactivated - end */
if ( unlikely( node_sl->is_mask_for_free() ) ) {
thread->free_node(node);
} else {
node_sl->handle(thread);
+
+ #ifdef _DEBUG
+ update_stl_stats(node_sl);
+ if (has_limit_reached()) {
+ thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
+ }
+ #endif
+
}
+
}else{
if ( likely( type == CGenNode::FLOW_PKT ) ) {
@@ -3987,9 +4002,11 @@ void CFlowGenListPerThread::check_msgs(void) {
void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name,
- CPreviewMode &preview){
+ CPreviewMode &preview,
+ uint64_t limit){
m_preview_mode = preview;
m_node_gen.open_file(erf_file_name,&m_preview_mode);
+ m_node_gen.set_packet_limit(limit);
}
void CFlowGenListPerThread::stop_stateless_simulation_file(){
@@ -4000,7 +4017,6 @@ void CFlowGenListPerThread::start_stateless_daemon_simulation(){
m_cur_time_sec = 0;
m_stateless_dp_info.run_once();
-
}
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 471d7639..4b906912 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -1932,6 +1932,12 @@ public:
add_node(node);
}
+ /**
+ * set packet limit for the generator
+ */
+ void set_packet_limit(uint64_t limit) {
+ m_limit = limit;
+ }
void DumpHist(FILE *fd){
fprintf(fd,"\n");
@@ -1950,7 +1956,7 @@ private:
}
int update_stats(CGenNode * node);
int update_stl_stats(CGenNodeStateless *node_sl);
-
+ bool has_limit_reached();
FORCE_NO_INLINE bool handle_slow_messages(uint8_t type,
CGenNode * node,
@@ -1966,6 +1972,7 @@ public:
CFlowGenListPerThread * m_parent;
CPreviewMode m_preview_mode;
uint64_t m_cnt;
+ uint64_t m_limit;
CTimeHistogram m_realtime_his;
};
@@ -3484,7 +3491,7 @@ public:
void start_stateless_daemon_simulation();
/* open a file for simulation */
- void start_stateless_simulation_file(std::string erf_file_name,CPreviewMode &preview);
+ void start_stateless_simulation_file(std::string erf_file_name,CPreviewMode &preview, uint64_t limit = 0);
/* close a file for simulation */
void stop_stateless_simulation_file();
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index 343b8004..3ae49da8 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -138,7 +138,7 @@ public:
*/
class TrexMockPlatformApi : public TrexPlatformApi {
public:
- void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {}
+ void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const;
void get_global_stats(TrexPlatformGlobalStats &stats) const;
void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const;
void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const {
diff --git a/src/main.cpp b/src/main.cpp
index 1b219a8c..ea8e1e44 100755
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -23,16 +23,35 @@ limitations under the License.
#include "bp_sim.h"
#include "os_time.h"
+#include <unordered_map>
+#include <string>
#include <common/arg/SimpleGlob.h>
#include <common/arg/SimpleOpt.h>
#include <stateless/cp/trex_stateless.h>
+#include <sim/trex_sim.h>
+using namespace std;
// An enum for all the option types
enum { OPT_HELP, OPT_CFG, OPT_NODE_DUMP, OP_STATS,
- OPT_FILE_OUT, OPT_UT, OPT_PCAP, OPT_IPV6, OPT_MAC_FILE};
-
+ OPT_FILE_OUT, OPT_UT, OPT_PCAP, OPT_IPV6, OPT_MAC_FILE,
+ OPT_SL, OPT_DP_CORE_COUNT, OPT_DP_CORE_INDEX, OPT_LIMIT};
+
+
+
+/**
+ * type of run
+ * GTEST
+ * Stateful
+ * Stateless
+ */
+typedef enum {
+ OPT_TYPE_GTEST = 7,
+ OPT_TYPE_SF,
+ OPT_TYPE_SL
+} opt_type_e;
+
/* these are the argument types:
SO_NONE -- no argument needed
@@ -41,24 +60,30 @@ enum { OPT_HELP, OPT_CFG, OPT_NODE_DUMP, OP_STATS,
*/
static CSimpleOpt::SOption parser_options[] =
{
- { OPT_HELP, "-?", SO_NONE },
- { OPT_HELP, "-h", SO_NONE },
- { OPT_HELP, "--help", SO_NONE },
- { OPT_UT, "--ut", SO_NONE },
- { OP_STATS, "-s", SO_NONE },
- { OPT_CFG, "-f", SO_REQ_SEP},
- { OPT_MAC_FILE, "--mac", SO_REQ_SEP},
- { OPT_FILE_OUT , "-o", SO_REQ_SEP },
- { OPT_NODE_DUMP , "-v", SO_REQ_SEP },
- { OPT_PCAP, "--pcap", SO_NONE },
- { OPT_IPV6, "--ipv6", SO_NONE },
+ { OPT_HELP, "-?", SO_NONE },
+ { OPT_HELP, "-h", SO_NONE },
+ { OPT_HELP, "--help", SO_NONE },
+ { OPT_UT, "--ut", SO_NONE },
+ { OP_STATS, "-s", SO_NONE },
+ { OPT_CFG, "-f", SO_REQ_SEP },
+ { OPT_MAC_FILE, "--mac", SO_REQ_SEP },
+ { OPT_FILE_OUT , "-o", SO_REQ_SEP },
+ { OPT_NODE_DUMP , "-v", SO_REQ_SEP },
+ { OPT_PCAP, "--pcap", SO_NONE },
+ { OPT_IPV6, "--ipv6", SO_NONE },
+ { OPT_SL, "--sl", SO_NONE },
+ { OPT_DP_CORE_COUNT, "--cores", SO_REQ_SEP },
+ { OPT_DP_CORE_INDEX, "--core_index", SO_REQ_SEP },
+ { OPT_LIMIT, "--limit", SO_REQ_SEP },
SO_END_OF_OPTIONS
};
-
+static bool in_range(int x, int low, int high) {
+ return ( (x >= low) && (x <= high) );
+}
static int usage(){
@@ -94,9 +119,12 @@ static int usage(){
return (0);
}
-int gtest_main(int argc, char **argv) ;
-static int parse_options(int argc, char *argv[], CParserOption* po, bool & is_gtest ) {
+static int parse_options(int argc,
+ char *argv[],
+ CParserOption* po,
+ std::unordered_map<std::string, int> &params) {
+
CSimpleOpt args(argc, argv, parser_options);
int a=0;
@@ -104,36 +132,63 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool & is_gt
po->preview.clean();
po->preview.setFileWrite(true);
+ /* by default - type is stateful */
+ params["type"] = OPT_TYPE_SF;
+
while ( args.Next() ){
if (args.LastError() == SO_SUCCESS) {
switch (args.OptionId()) {
case OPT_UT :
- is_gtest=true;
+ params["type"] = OPT_TYPE_GTEST;
return (0);
break;
+
case OPT_HELP:
usage();
return -1;
+
+ case OPT_SL:
+ params["type"] = OPT_TYPE_SL;
+ break;
+
case OPT_CFG:
po->cfg_file = args.OptionArg();
break;
+
case OPT_MAC_FILE:
po->mac_file = args.OptionArg();
break;
+
case OPT_FILE_OUT:
po->out_file = args.OptionArg();
break;
+
case OPT_IPV6:
po->preview.set_ipv6_mode_enable(true);
break;
+
case OPT_NODE_DUMP:
a=atoi(args.OptionArg());
node_dump=1;
po->preview.setFileWrite(false);
break;
+
case OPT_PCAP:
po->preview.set_pcap_mode_enable(true);
break;
+
+ case OPT_DP_CORE_COUNT:
+ params["dp_core_count"] = atoi(args.OptionArg());
+ break;
+
+ case OPT_DP_CORE_INDEX:
+ params["dp_core_index"] = atoi(args.OptionArg());
+ break;
+
+ case OPT_LIMIT:
+ params["limit"] = atoi(args.OptionArg());
+ break;
+
default:
usage();
return -1;
@@ -162,640 +217,74 @@ static int parse_options(int argc, char *argv[], CParserOption* po, bool & is_gt
return -1;
}
}
- return 0;
-}
-
-int cores=1;
-
-/*
-
-int curent_time(){
-
- time_init();
-
- int i;
- for (i=0; i<100000000; i++){
- now=now_sec();
- }
- return (0);
-}*/
-
-#ifdef LINUX
-
-
-
-#include <pthread.h>
-
-struct per_thread_t {
- pthread_t tid;
-};
-
-#define MAX_THREADS 200
-static per_thread_t tr_info[MAX_THREADS];
-
-
-//////////////
-
-struct test_t_info1 {
- CPreviewMode * preview_info;
- CFlowGenListPerThread * thread_info;
- uint32_t thread_id;
-};
-
-void * thread_task(void *info){
-
- test_t_info1 * obj =(test_t_info1 *)info;
-
- CFlowGenListPerThread * lpt=obj->thread_info;
-
- printf("start thread %d \n",obj->thread_id);
- //delay(obj->thread_id *3000);
- printf("-->start thread %d \n",obj->thread_id);
- if (1/*obj->thread_id ==3*/) {
-
- char buf[100];
- sprintf(buf,"my%d.erf",obj->thread_id);
- lpt->start_generate_stateful(buf,*obj->preview_info);
- lpt->m_node_gen.DumpHist(stdout);
- printf("end thread %d \n",obj->thread_id);
- }
-
- return (NULL);
-}
-
-
-void test_load_list_of_cap_files_linux(CParserOption * op){
-
- CFlowGenList fl;
- //CNullIF erf_vif;
- //CErfIF erf_vif;
-
- fl.Create();
-
- fl.load_from_yaml(op->cfg_file,cores);
- fl.DumpPktSize();
-
-
- fl.generate_p_thread_info(cores);
- CFlowGenListPerThread * lpt;
-
- /* set the ERF file */
- //fl.set_vif_all(&erf_vif);
-
- int i;
- for (i=0; i<cores; i++) {
- lpt=fl.m_threads_info[i];
- test_t_info1 * obj = new test_t_info1();
- obj->preview_info =&op->preview;
- obj->thread_info = fl.m_threads_info[i];
- obj->thread_id = i;
- CNullIF * erf_vif = new CNullIF();
- //CErfIF * erf_vif = new CErfIF();
-
- lpt->set_vif(erf_vif);
-
- assert(pthread_create( &tr_info[i].tid, NULL, thread_task, obj)==0);
- }
- for (i=0; i<cores; i++) {
- /* wait for all of them to stop */
- assert(pthread_join((pthread_t)tr_info[i].tid,NULL )==0);
- }
+ /* did the user configure dp core count or dp core index ? */
- printf("compare files \n");
- for (i=1; i<cores; i++) {
-
- CErfCmp cmp;
- char buf[100];
- sprintf(buf,"my%d.erf",i);
- char buf1[100];
- sprintf(buf1,"my%d.erf",0);
- if ( cmp.compare(std::string(buf),std::string(buf1)) != true ) {
- printf(" ERROR cap file is not ex !! \n");
- assert(0);
+ if (params.count("dp_core_count") > 0) {
+ if (!in_range(params["dp_core_count"], 1, 8)) {
+ printf("dp core count must be a value between 1 and 8\n");
+ return (-1);
}
- printf(" thread %d is ok \n",i);
}
- fl.Delete();
-}
-
-
-#endif
-
-/*************************************************************/
-void test_load_list_of_cap_files(CParserOption * op){
-
- CFlowGenList fl;
- CNullIF erf_vif;
-
- fl.Create();
-
- #define NUM 1
-
- fl.load_from_yaml(op->cfg_file,NUM);
- fl.DumpPktSize();
-
-
- fl.generate_p_thread_info(NUM);
- CFlowGenListPerThread * lpt;
-
- /* set the ERF file */
- //fl.set_vif_all(&erf_vif);
-
- int i;
- for (i=0; i<NUM; i++) {
- lpt=fl.m_threads_info[i];
- char buf[100];
- sprintf(buf,"my%d.erf",i);
- lpt->start_generate_stateful(buf,op->preview);
- lpt->m_node_gen.DumpHist(stdout);
- }
- //sprintf(buf,"my%d.erf",7);
- //lpt=fl.m_threads_info[7];
-
- //fl.Dump(stdout);
- fl.Delete();
-}
-
-int load_list_of_cap_files(CParserOption * op){
- CFlowGenList fl;
- fl.Create();
- fl.load_from_yaml(op->cfg_file,1);
- if ( op->preview.getVMode() >0 ) {
- fl.DumpCsv(stdout);
- }
- uint32_t start= os_get_time_msec();
-
- CErfIF erf_vif;
- //CNullIF erf_vif;
-
- fl.generate_p_thread_info(1);
- CFlowGenListPerThread * lpt;
- lpt=fl.m_threads_info[0];
- lpt->set_vif(&erf_vif);
-
- if ( (op->preview.getVMode() >1) || op->preview.getFileWrite() ) {
- lpt->start_generate_stateful(op->out_file,op->preview);
+ if (params.count("dp_core_index") > 0) {
+ if (!in_range(params["dp_core_index"], 0, params["dp_core_count"] - 1)) {
+ printf("dp core index must be a value between 0 and cores - 1\n");
+ return (-1);
+ }
}
- lpt->m_node_gen.DumpHist(stdout);
-
- uint32_t stop= os_get_time_msec();
- printf(" d time = %ul %ul \n",stop-start,os_get_time_freq());
- fl.Delete();
- return (0);
-}
-
-
-int test_dns(){
-
- time_init();
- CGlobalInfo::init_pools(1000);
-
- CParserOption po ;
-
- //po.cfg_file = "cap2/dns.yaml";
- //po.cfg_file = "cap2/sfr3.yaml";
- po.cfg_file = "cap2/sfr.yaml";
-
- po.preview.setVMode(0);
- po.preview.setFileWrite(true);
- #ifdef LINUX
- test_load_list_of_cap_files_linux(&po);
- #else
- test_load_list_of_cap_files(&po);
- #endif
- return (0);
+ return 0;
}
-void test_pkt_mbuf(void);
-
-void test_compare_files(void);
-#if 0
-static int b=0;
-static int c=0;
-static int d=0;
-
-int test_instructions(){
- int i;
- for (i=0; i<100000;i++) {
- b+=b+1;
- c+=+b+c+1;
- d+=+(b*2+1);
- }
- return (b+c+d);
-}
-
-#include <valgrind/callgrind.h>
-#endif
+int main(int argc , char * argv[]){
+ std::unordered_map<std::string, int> params;
-void update_tcp_seq_num(CCapFileFlowInfo * obj,
- int pkt_id,
- int size_change){
- CFlowPktInfo * pkt=obj->GetPacket(pkt_id);
- if ( pkt->m_pkt_indication.m_desc.IsUdp() ){
- /* nothing to do */
- return;
+ if ( parse_options(argc, argv, &CGlobalInfo::m_options , params) != 0) {
+ exit(-1);
}
- bool o_init=pkt->m_pkt_indication.m_desc.IsInitSide();
- TCPHeader * tcp ;
- int s= (int)obj->Size();
- int i;
-
- for (i=pkt_id+1; i<s; i++) {
+ opt_type_e type = (opt_type_e) params["type"];
- pkt=obj->GetPacket(i);
- tcp=pkt->m_pkt_indication.l4.m_tcp;
- bool init=pkt->m_pkt_indication.m_desc.IsInitSide();
- if (init == o_init) {
- /* same dir update the seq number */
- tcp->setSeqNumber (tcp->getSeqNumber ()+size_change);
-
- }else{
- /* update the ack number */
- tcp->setAckNumber (tcp->getAckNumber ()+size_change);
+ switch (type) {
+ case OPT_TYPE_GTEST:
+ {
+ SimGtest test;
+ return test.run(argc, argv);
}
- }
-}
-
-
-
-void change_pkt_len(CCapFileFlowInfo * obj,int pkt_id, int size ){
- CFlowPktInfo * pkt=obj->GetPacket(pkt_id);
-
- /* enlarge the packet size by 9 */
-
- char * p=pkt->m_packet->append(size);
- /* set it to 0xaa*/
- memmove(p+size-4,p-4,4); /* CRCbytes */
- memset(p-4,0x0a,size);
-
- /* refresh the pointers */
- pkt->m_pkt_indication.RefreshPointers();
-
- IPHeader * ipv4 = pkt->m_pkt_indication.l3.m_ipv4;
- ipv4->updateTotalLength (ipv4->getTotalLength()+size );
-
- /* update seq numbers if needed */
- update_tcp_seq_num(obj,pkt_id,size);
-}
-void dump_tcp_seq_num_(CCapFileFlowInfo * obj){
- int s= (int)obj->Size();
- int i;
- uint32_t i_seq;
- uint32_t r_seq;
-
- CFlowPktInfo * pkt=obj->GetPacket(0);
- TCPHeader * tcp = pkt->m_pkt_indication.l4.m_tcp;
- i_seq=tcp->getSeqNumber ();
-
- pkt=obj->GetPacket(1);
- tcp = pkt->m_pkt_indication.l4.m_tcp;
- r_seq=tcp->getSeqNumber ();
-
- for (i=2; i<s; i++) {
- uint32_t seq;
- uint32_t ack;
-
- pkt=obj->GetPacket(i);
- tcp=pkt->m_pkt_indication.l4.m_tcp;
- bool init=pkt->m_pkt_indication.m_desc.IsInitSide();
- seq=tcp->getSeqNumber ();
- ack=tcp->getAckNumber ();
- if (init) {
- seq=seq-i_seq;
- ack=ack-r_seq;
- }else{
- seq=seq-r_seq;
- ack=ack-i_seq;
+ case OPT_TYPE_SF:
+ {
+ SimStateful sf;
+ return sf.run();
}
- printf(" %4d ",i);
- if (!init) {
- printf(" ");
- }
- printf(" %s seq: %4d ack : %4d \n",init?"I":"R",seq,ack);
- }
-}
-
-
-int manipolate_capfile() {
- time_init();
- CGlobalInfo::init_pools(1000);
-
- CCapFileFlowInfo flow_info;
- flow_info.Create();
-
- flow_info.load_cap_file("avl/delay_10_rtsp_0.pcap",0,0);
-
- change_pkt_len(&flow_info,4-1 ,6);
- change_pkt_len(&flow_info,5-1 ,6);
- change_pkt_len(&flow_info,6-1 ,6+2);
- change_pkt_len(&flow_info,7-1 ,4);
- change_pkt_len(&flow_info,8-1 ,6+2);
- change_pkt_len(&flow_info,9-1 ,4);
- change_pkt_len(&flow_info,10-1,6);
- change_pkt_len(&flow_info,13-1,6);
- change_pkt_len(&flow_info,16-1,6);
- change_pkt_len(&flow_info,19-1,6);
-
- flow_info.save_to_erf("exp/c.pcap",1);
-
- return (1);
-}
-
-int manipolate_capfile_sip() {
- time_init();
- CGlobalInfo::init_pools(1000);
-
- CCapFileFlowInfo flow_info;
- flow_info.Create();
-
- flow_info.load_cap_file("avl/delay_10_sip_0.pcap",0,0);
-
- change_pkt_len(&flow_info,1-1 ,6+6);
- change_pkt_len(&flow_info,2-1 ,6+6);
-
- flow_info.save_to_erf("exp/delay_10_sip_0_fixed.pcap",1);
-
- return (1);
-}
-
-int manipolate_capfile_sip1() {
- time_init();
- CGlobalInfo::init_pools(1000);
-
- CCapFileFlowInfo flow_info;
- flow_info.Create();
-
- flow_info.load_cap_file("avl/delay_sip_0.pcap",0,0);
- flow_info.GetPacket(1);
-
- change_pkt_len(&flow_info,1-1 ,6+6+10);
-
- change_pkt_len(&flow_info,2-1 ,6+6+10);
-
- flow_info.save_to_erf("exp/delay_sip_0_fixed_1.pcap",1);
-
- return (1);
-}
-
-
-class CMergeCapFileRec {
-public:
-
- CCapFileFlowInfo m_cap;
-
- int m_index;
- int m_limit_number_of_packets; /* limit number of packets */
- bool m_stop; /* Do we have more packets */
-
- double m_offset; /* offset should be positive */
- double m_start_time;
-
-public:
- bool Create(std::string cap_file,double offset);
- void Delete();
- void IncPacket();
- bool GetCurPacket(double & time);
- CPacketIndication * GetUpdatedPacket();
-
- void Dump(FILE *fd,int _id);
-};
-
-
-void CMergeCapFileRec::Dump(FILE *fd,int _id){
- double time = 0.0;
- bool stop=GetCurPacket(time);
- fprintf (fd," id:%2d stop : %d index:%4d %3.4f \n",_id,stop?1:0,m_index,time);
-}
-
-
-CPacketIndication * CMergeCapFileRec::GetUpdatedPacket(){
- double t1;
- assert(GetCurPacket(t1)==false);
- CFlowPktInfo * pkt = m_cap.GetPacket(m_index);
- pkt->m_pkt_indication.m_packet->set_new_time(t1);
- return (&pkt->m_pkt_indication);
-}
-
-
-bool CMergeCapFileRec::GetCurPacket(double & time){
- if (m_stop) {
- return(true);
- }
- CFlowPktInfo * pkt = m_cap.GetPacket(m_index);
- time= (pkt->m_packet->get_time() -m_start_time + m_offset);
- return (false);
-}
-
-void CMergeCapFileRec::IncPacket(){
- m_index++;
- if ( (m_limit_number_of_packets) && (m_index > m_limit_number_of_packets ) ) {
- m_stop=true;
- return;
- }
-
- if ( m_index == (int)m_cap.Size() ) {
- m_stop=true;
- }
-}
-
-void CMergeCapFileRec::Delete(){
- m_cap.Delete();
-}
-
-bool CMergeCapFileRec::Create(std::string cap_file,
- double offset){
- m_cap.Create();
- m_cap.load_cap_file(cap_file,0,0);
- CFlowPktInfo * pkt = m_cap.GetPacket(0);
-
- m_index=0;
- m_stop=false;
- m_limit_number_of_packets =0;
- m_start_time = pkt->m_packet->get_time() ;
- m_offset = offset;
-
- return (true);
-}
-
-
-#define MERGE_CAP_FILES (2)
+ case OPT_TYPE_SL:
+ {
+ SimStateless &st = SimStateless::get_instance();
-class CMergeCapFile {
-public:
- bool Create();
- void Delete();
- bool run_merge(std::string to_cap_file);
-private:
- void append(int _cap_id);
-
-public:
- CMergeCapFileRec m[MERGE_CAP_FILES];
- CCapFileFlowInfo m_results;
-};
-
-bool CMergeCapFile::Create(){
- m_results.Create();
- return(true);
-}
-
-void CMergeCapFile::Delete(){
- m_results.Delete();
-}
-
-void CMergeCapFile::append(int _cap_id){
- CPacketIndication * lp=m[_cap_id].GetUpdatedPacket();
- lp->m_packet->Dump(stdout,0);
- m_results.Append(lp);
-}
-
-
-bool CMergeCapFile::run_merge(std::string to_cap_file){
-
- int i=0;
- int cnt=0;
- while ( true ) {
- int min_index=0;
- double min_time;
+ if (params.count("dp_core_count") == 0) {
+ params["dp_core_count"] = 1;
+ }
- fprintf(stdout," --------------\n");
- fprintf(stdout," pkt : %d \n",cnt);
- for (i=0; i<MERGE_CAP_FILES; i++) {
- m[i].Dump(stdout,i);
- }
- fprintf(stdout," --------------\n");
-
- bool valid = false;
- for (i=0; i<MERGE_CAP_FILES; i++) {
- double t1;
- if ( m[i].GetCurPacket(t1) == false ){
- /* not in stop */
- if (!valid) {
- min_time = t1;
- min_index = i;
- valid=true;
- }else{
- if (t1 < min_time) {
- min_time=t1;
- min_index = i;
- }
- }
+ if (params.count("dp_core_index") == 0) {
+ params["dp_core_index"] = -1;
+ }
+ if (params.count("limit") == 0) {
+ params["limit"] = 5000;
}
- }
- /* nothing to do */
- if (valid==false) {
- fprintf(stdout,"nothing to do \n");
- break;
+ return st.run(CGlobalInfo::m_options.cfg_file,
+ CGlobalInfo::m_options.out_file,
+ 2,
+ params["dp_core_count"],
+ params["dp_core_index"],
+ params["limit"]);
}
-
- cnt++;
- fprintf(stdout," choose id %d \n",min_index);
- append(min_index);
- m[min_index].IncPacket();
- };
-
- m_results.save_to_erf(to_cap_file,1);
-
- return (true);
-}
-
-
-
-int merge_3_cap_files() {
- time_init();
- CGlobalInfo::init_pools(1000);
-
- CMergeCapFile merger;
- merger.Create();
- merger.m[0].Create("exp/c.pcap",0.001);
- merger.m[1].Create("avl/delay_10_rtp_160k_0.pcap",0.31);
- merger.m[2].Create("avl/delay_10_rtp_160k_1.pcap",0.311);
-
- //merger.m[1].Create("avl/delay_10_rtp_250k_0_0.pcap",0.31);
- //merger.m[1].m_limit_number_of_packets =6;
- //merger.m[2].Create("avl/delay_10_rtp_250k_1_0.pcap",0.311);
- //merger.m[2].m_limit_number_of_packets =6;
-
- merger.run_merge("exp/delay_10_rtp_160k_full.pcap");
-
- return (0);
-}
-
-int merge_2_cap_files_sip() {
- time_init();
- CGlobalInfo::init_pools(1000);
-
- CMergeCapFile merger;
- merger.Create();
- merger.m[0].Create("exp/delay_sip_0_fixed_1.pcap",0.001);
- merger.m[1].Create("avl/delay_video_call_rtp_0.pcap",0.51);
- //merger.m[1].m_limit_number_of_packets=7;
-
- //merger.m[1].Create("avl/delay_10_rtp_250k_0_0.pcap",0.31);
- //merger.m[1].m_limit_number_of_packets =6;
- //merger.m[2].Create("avl/delay_10_rtp_250k_1_0.pcap",0.311);
- //merger.m[2].m_limit_number_of_packets =6;
-
- merger.run_merge("avl/delay_10_sip_video_call_full.pcap");
-
- return (0);
-}
-
-static TrexStateless *g_trex_stateless;
-
-
-TrexStateless * get_stateless_obj() {
- return g_trex_stateless;
-}
-
-extern "C" const char * get_build_date(void){
- return (__DATE__);
-}
-
-extern "C" const char * get_build_time(void){
- return (__TIME__ );
-}
-
-
-
-
-int main(int argc , char * argv[]){
-
- int res=0;
- time_init();
- CGlobalInfo::m_socket.Create(0);
- CGlobalInfo::init_pools(1000);
- assert( CMsgIns::Ins()->Create(4) );
-
-
- bool is_gtest=false;
-
- if ( parse_options(argc, argv, &CGlobalInfo::m_options , is_gtest) != 0){
- exit(-1);
}
-
- if ( is_gtest ) {
- res = gtest_main(argc, argv);
- }else{
- res = load_list_of_cap_files(&CGlobalInfo::m_options);
- }
-
- CMsgIns::Ins()->Free();
- CGlobalInfo::free_pools();
- CGlobalInfo::m_socket.Delete();
-
-
- return (res);
-
}
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 16e36a61..3a31945f 100755
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -140,7 +140,7 @@ public:
return(false);
}
- virtual int configure_drop_queue(CPhyEthIF * _if)=0;
+ virtual int configure_drop_queue(CPhyEthIF * _if);
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats)=0;
virtual void clear_extended_stats(CPhyEthIF * _if)=0;
virtual int wait_for_stable_link()=0;
@@ -174,14 +174,13 @@ public:
return (true);
}
+ virtual int configure_drop_queue(CPhyEthIF * _if);
virtual int configure_rx_filter_rules(CPhyEthIF * _if);
virtual bool is_hardware_support_drop_queue(){
return(true);
}
- virtual int configure_drop_queue(CPhyEthIF * _if);
-
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
@@ -265,8 +264,6 @@ public:
virtual bool is_hardware_support_drop_queue(){
return(true);
}
- virtual int configure_drop_queue(CPhyEthIF * _if);
-
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
virtual int wait_for_stable_link();
@@ -299,10 +296,6 @@ public:
virtual bool is_hardware_support_drop_queue(){
return(true);
}
- virtual int configure_drop_queue(CPhyEthIF * _if);
-
-
-
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
virtual int wait_for_stable_link();
@@ -4549,6 +4542,53 @@ int update_global_info_from_platform_file(){
return (0);
}
+extern "C" int eal_cpu_detected(unsigned lcore_id);
+// return mask representing available cores
+int core_mask_calc() {
+ uint32_t mask = 0;
+ int lcore_id;
+
+ for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
+ if (eal_cpu_detected(lcore_id)) {
+ mask |= (1 << lcore_id);
+ }
+ }
+
+ return mask;
+}
+
+// Return number of set bits in i
+uint32_t num_set_bits(uint32_t i)
+{
+ i = i - ((i >> 1) & 0x55555555);
+ i = (i & 0x33333333) + ((i >> 2) & 0x33333333);
+ return (((i + (i >> 4)) & 0x0F0F0F0F) * 0x01010101) >> 24;
+}
+
+// sanity check if the cores we want to use really exist
+int core_mask_sanity(uint32_t wanted_core_mask) {
+ uint32_t calc_core_mask = core_mask_calc();
+ uint32_t wanted_core_num, calc_core_num;
+
+ wanted_core_num = num_set_bits(wanted_core_mask);
+ calc_core_num = num_set_bits(calc_core_mask);
+
+ if (wanted_core_num > calc_core_num) {
+ printf("Error: You have %d threads available, but you asked for %d threads.\n", calc_core_num, wanted_core_num);
+ printf(" Calculation is: -c <num>(%d) * dual ports (%d) + 1 master thread %s"
+ , CGlobalInfo::m_options.preview.getCores(), CGlobalInfo::m_options.get_expected_dual_ports()
+ , get_is_latency_thread_enable() ? "+1 latency thread (because of -l flag)\n" : "\n");
+ printf(" Maybe try smaller -c <num>.\n");
+ return -1;
+ }
+
+ if (wanted_core_mask != (wanted_core_mask & calc_core_mask)) {
+ printf ("Serious error: Something is wrong with the hardware. Wanted core mask is %x. Existing core mask is %x\n", wanted_core_mask, calc_core_mask);
+ return -1;
+ }
+
+ return 0;
+}
int update_dpdk_args(void){
@@ -4567,8 +4607,10 @@ int update_dpdk_args(void){
lpsock->dump(stdout);
}
-
sprintf(global_cores_str,"0x%llx",(unsigned long long)lpsock->get_cores_mask());
+ if (core_mask_sanity(strtol(global_cores_str, NULL, 16)) < 0) {
+ return -1;
+ }
/* set the DPDK options */
global_dpdk_args_num =7;
@@ -4678,7 +4720,10 @@ int main_test(int argc , char * argv[]){
CGlobalInfo::m_memory_cfg.Dump(stdout);
}
- update_dpdk_args();
+
+ if (update_dpdk_args() < 0) {
+ return -1;
+ }
CParserOption * po=&CGlobalInfo::m_options;
@@ -4786,6 +4831,11 @@ int main_test(int argc , char * argv[]){
//////////////////////////////////////////////////////////////////////////////////////////////
// driver section
//////////////////////////////////////////////////////////////////////////////////////////////
+int CTRexExtendedDriverBase::configure_drop_queue(CPhyEthIF * _if) {
+ uint8_t port_id=_if->get_rte_port_id();
+ return (rte_eth_dev_rx_queue_stop(port_id, 0));
+}
+
void wait_x_sec(int sec) {
int i;
printf(" wait %d sec ", sec);
@@ -4809,26 +4859,6 @@ int CTRexExtendedDriverBase1G::wait_for_stable_link(){
return(0);
}
-int CTRexExtendedDriverBase1G::configure_drop_queue(CPhyEthIF * _if){
- uint8_t protocol;
- if (CGlobalInfo::m_options.m_l_pkt_mode == 0) {
- protocol = IPPROTO_SCTP;
- } else {
- protocol = IPPROTO_ICMP;
- }
-
- _if->pci_reg_write( E1000_RXDCTL(0) , 0);
-
- /* enable filter to pass packet to rx queue 1 */
- _if->pci_reg_write( E1000_IMIR(0), 0x00020000);
- _if->pci_reg_write( E1000_IMIREXT(0), 0x00081000);
- _if->pci_reg_write( E1000_TTQF(0), protocol
- | 0x00008100 /* enable */
- | 0xE0010000 /* RX queue is 1 */
- );
- return (0);
-}
-
void CTRexExtendedDriverBase1G::update_configuration(port_cfg_t * cfg){
cfg->m_tx_conf.tx_thresh.pthresh = TX_PTHRESH_1G;
@@ -4840,10 +4870,32 @@ void CTRexExtendedDriverBase1G::update_global_config_fdir(port_cfg_t * cfg){
// Configuration is done in configure_rx_filter_rules by writing to registers
}
+// e1000 driver does not support the generic stop queue API, so we need to implement ourselves
+int CTRexExtendedDriverBase1G::configure_drop_queue(CPhyEthIF * _if) {
+ // Drop packets coming to RX queue 0
+ _if->pci_reg_write( E1000_RXDCTL(0) , 0);
+ return 0;
+}
+
int CTRexExtendedDriverBase1G::configure_rx_filter_rules(CPhyEthIF * _if){
uint16_t hops = get_rx_check_hops();
uint16_t v4_hops = (hops << 8)&0xff00;
+ uint8_t protocol;
+
+ if (CGlobalInfo::m_options.m_l_pkt_mode == 0) {
+ protocol = IPPROTO_SCTP;
+ } else {
+ protocol = IPPROTO_ICMP;
+ }
+ /* enable filter to pass packet to rx queue 1 */
+ _if->pci_reg_write( E1000_IMIR(0), 0x00020000);
+ _if->pci_reg_write( E1000_IMIREXT(0), 0x00081000);
+ _if->pci_reg_write( E1000_TTQF(0), protocol
+ | 0x00008100 /* enable */
+ | 0xE0010000 /* RX queue is 1 */
+ );
+
/* 16 : 12 MAC , (2)0x0800,2 | DW0 , DW1
6 bytes , TTL , PROTO | DW2=0 , DW3=0x0000FF06
@@ -4981,6 +5033,14 @@ int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){
uint16_t hops = get_rx_check_hops();
uint16_t v4_hops = (hops << 8)&0xff00;
+ /* enable rule 0 SCTP -> queue 1 for latency */
+ /* 1<<21 means that queue 1 is for SCTP */
+ _if->pci_reg_write(IXGBE_L34T_IMIR(0),(1<<21));
+ _if->pci_reg_write(IXGBE_FTQF(0),
+ IXGBE_FTQF_PROTOCOL_SCTP|
+ (IXGBE_FTQF_PRIORITY_MASK<<IXGBE_FTQF_PRIORITY_SHIFT)|
+ ((0x0f)<<IXGBE_FTQF_5TUPLE_MASK_SHIFT)|IXGBE_FTQF_QUEUE_ENABLE);
+
// IPv4: bytes being compared are {TTL, Protocol}
uint16_t ff_rules_v4[6]={
(uint16_t)(0xFF11 - v4_hops),
@@ -5036,21 +5096,6 @@ int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){
return (0);
}
-int CTRexExtendedDriverBase10G::configure_drop_queue(CPhyEthIF * _if){
- /* enable rule 0 SCTP -> queue 1 for latency */
- /* 1<<21 means that queue 1 is for SCTP */
- _if->pci_reg_write(IXGBE_L34T_IMIR(0),(1<<21));
-
- _if->pci_reg_write(IXGBE_FTQF(0),
- IXGBE_FTQF_PROTOCOL_SCTP|
- (IXGBE_FTQF_PRIORITY_MASK<<IXGBE_FTQF_PRIORITY_SHIFT)|
- ((0x0f)<<IXGBE_FTQF_5TUPLE_MASK_SHIFT)|IXGBE_FTQF_QUEUE_ENABLE);
-
- /* disable queue zero - default all traffic will go to here and will be dropped */
- _if->pci_reg_write( IXGBE_RXDCTL(0) , 0);
- return (0);
-}
-
void CTRexExtendedDriverBase10G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats){
int i;
@@ -5173,15 +5218,10 @@ int CTRexExtendedDriverBase40G::configure_rx_filter_rules(CPhyEthIF * _if){
add_rules(_if,RTE_ETH_FLOW_NONFRAG_IPV6_TCP, ttl);
}
- return (0);
-}
-
-
-int CTRexExtendedDriverBase40G::configure_drop_queue(CPhyEthIF * _if){
-
/* Configure queue for latency packets */
add_rules(_if,RTE_ETH_FLOW_NONFRAG_IPV4_OTHER, 255);
add_rules(_if,RTE_ETH_FLOW_NONFRAG_IPV4_SCTP, 255);
+
return (0);
}
@@ -5250,8 +5290,6 @@ void CTRexExtendedDriverBase1GVm::clear_extended_stats(CPhyEthIF * _if){
}
int CTRexExtendedDriverBase1GVm::configure_drop_queue(CPhyEthIF * _if){
-
-
return (0);
}
diff --git a/src/mock/trex_platform_api_mock.cpp b/src/mock/trex_platform_api_mock.cpp
index 54f71e10..416c4b69 100644
--- a/src/mock/trex_platform_api_mock.cpp
+++ b/src/mock/trex_platform_api_mock.cpp
@@ -47,3 +47,7 @@ TrexMockPlatformApi::get_dp_core_count() const {
return (1);
}
+void
+TrexMockPlatformApi::port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {
+ cores_id_list.push_back(std::make_pair(0, 0));
+}
diff --git a/src/mock/trex_rpc_server_mock.cpp b/src/mock/trex_rpc_server_mock.cpp
index 0bdf6cf1..ecfa308d 100644
--- a/src/mock/trex_rpc_server_mock.cpp
+++ b/src/mock/trex_rpc_server_mock.cpp
@@ -78,10 +78,6 @@ find_free_tcp_port(uint16_t start_port = 5050) {
return port;
}
-TrexStateless * get_stateless_obj() {
- return g_trex_stateless;
-}
-
uint16_t gtest_get_mock_server_port() {
return g_rpc_port;
}
diff --git a/src/publisher/trex_publisher.h b/src/publisher/trex_publisher.h
index bd4392f7..52978476 100644
--- a/src/publisher/trex_publisher.h
+++ b/src/publisher/trex_publisher.h
@@ -34,9 +34,11 @@ public:
m_publisher = NULL;
}
- bool Create(uint16_t port, bool disable);
- void Delete();
- void publish_json(const std::string &s);
+ virtual ~TrexPublisher() {}
+
+ virtual bool Create(uint16_t port, bool disable);
+ virtual void Delete();
+ virtual void publish_json(const std::string &s);
enum event_type_e {
EVENT_PORT_STARTED = 0,
@@ -51,7 +53,7 @@ public:
};
- void publish_event(event_type_e type, const Json::Value &data = Json::nullValue);
+ virtual void publish_event(event_type_e type, const Json::Value &data = Json::nullValue);
private:
void show_zmq_last_error(const std::string &err);
diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp
index 46fe499b..6e5fbfc6 100644
--- a/src/rpc-server/trex_rpc_async_server.cpp
+++ b/src/rpc-server/trex_rpc_async_server.cpp
@@ -41,6 +41,10 @@ TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mute
m_context = zmq_ctx_new();
}
+void
+TrexRpcServerAsync::_prepare() {
+}
+
/**
* publisher thread
*
diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h
index 02d1490e..80d92c2f 100644
--- a/src/rpc-server/trex_rpc_async_server.h
+++ b/src/rpc-server/trex_rpc_async_server.h
@@ -36,6 +36,7 @@ public:
TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL);
protected:
+ void _prepare();
void _rpc_thread_cb();
void _stop_rpc_thread();
diff --git a/src/rpc-server/trex_rpc_cmd.cpp b/src/rpc-server/trex_rpc_cmd.cpp
index b5dd121c..aea7980b 100644
--- a/src/rpc-server/trex_rpc_cmd.cpp
+++ b/src/rpc-server/trex_rpc_cmd.cpp
@@ -32,7 +32,7 @@ TrexRpcCommand::run(const Json::Value &params, Json::Value &result) {
check_param_count(params, m_param_count, result);
- if (m_needs_ownership) {
+ if (m_needs_ownership && !g_test_override_ownership) {
verify_ownership(params, result);
}
@@ -372,3 +372,7 @@ TrexRpcCommand::generate_execute_err(Json::Value &result, const std::string &msg
throw (TrexRpcCommandException(TREX_RPC_CMD_EXECUTE_ERR));
}
+/**
+ * by default this is off
+ */
+bool TrexRpcCommand::g_test_override_ownership = false;
diff --git a/src/rpc-server/trex_rpc_cmd_api.h b/src/rpc-server/trex_rpc_cmd_api.h
index 7cbdf4ff..675d2900 100644
--- a/src/rpc-server/trex_rpc_cmd_api.h
+++ b/src/rpc-server/trex_rpc_cmd_api.h
@@ -89,6 +89,16 @@ public:
return m_name;
}
+ /**
+ * on test we enable this override
+ *
+ *
+ * @param enable
+ */
+ static void test_set_override_ownership(bool enable) {
+ g_test_override_ownership = enable;
+ }
+
virtual ~TrexRpcCommand() {}
protected:
@@ -241,6 +251,8 @@ protected:
std::string m_name;
int m_param_count;
bool m_needs_ownership;
+
+ static bool g_test_override_ownership;
};
#endif /* __TREX_RPC_CMD_API_H__ */
diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp
index eb7825ac..1e8e177d 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.cpp
+++ b/src/rpc-server/trex_rpc_req_resp_server.cpp
@@ -36,7 +36,10 @@ limitations under the License.
*
*/
TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) {
- /* ZMQ is not thread safe - this should be outside */
+
+}
+
+void TrexRpcServerReqRes::_prepare() {
m_context = zmq_ctx_new();
}
@@ -123,15 +126,28 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) {
*/
void TrexRpcServerReqRes::_stop_rpc_thread() {
/* by calling zmq_term we signal the blocked thread to exit */
- zmq_term(m_context);
+ if (m_context) {
+ zmq_term(m_context);
+ }
}
+
/**
* handles a request given to the server
* respondes to the request
*/
void TrexRpcServerReqRes::handle_request(const std::string &request) {
+ std::string response_str = process_request(request);
+ zmq_send(m_socket, response_str.c_str(), response_str.size(), 0);
+}
+
+/**
+ * main processing of the request
+ *
+ */
+std::string TrexRpcServerReqRes::process_request(const std::string &request) {
+
std::vector<TrexJsonRpcV2ParsedObject *> commands;
Json::FastWriter writer;
@@ -175,8 +191,7 @@ void TrexRpcServerReqRes::handle_request(const std::string &request) {
verbose_json("Server Replied: ", response_str);
- zmq_send(m_socket, response_str.c_str(), response_str.size(), 0);
-
+ return response_str;
}
/**
@@ -198,3 +213,36 @@ TrexRpcServerReqRes::handle_server_error(const std::string &specific_err) {
zmq_send(m_socket, response_str.c_str(), response_str.size(), 0);
}
+
+
+
+std::string
+TrexRpcServerReqRes::test_inject_request(const std::string &req) {
+ return process_request(req);
+}
+
+
+/**
+ * MOCK req resp server
+ */
+TrexRpcServerReqResMock::TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg) : TrexRpcServerReqRes(cfg) {
+}
+
+/**
+ * override start
+ *
+ */
+void
+TrexRpcServerReqResMock::start() {
+
+}
+
+
+/**
+ * override stop
+ */
+void
+TrexRpcServerReqResMock::stop() {
+
+}
+
diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h
index 2876206c..97efbe08 100644
--- a/src/rpc-server/trex_rpc_req_resp_server.h
+++ b/src/rpc-server/trex_rpc_req_resp_server.h
@@ -34,13 +34,19 @@ public:
TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL);
+ /* for test purposes - bypass the ZMQ and inject a message */
+ std::string test_inject_request(const std::string &req);
+
protected:
+
+ void _prepare();
void _rpc_thread_cb();
void _stop_rpc_thread();
-private:
bool fetch_one_request(std::string &msg);
void handle_request(const std::string &request);
+ std::string process_request(const std::string &request);
+
void handle_server_error(const std::string &specific_err);
void *m_context;
@@ -48,4 +54,22 @@ private:
};
+/**
+ * a mock req resp server (for tests)
+ *
+ * @author imarom (03-Jan-16)
+ */
+class TrexRpcServerReqResMock : public TrexRpcServerReqRes {
+
+public:
+ TrexRpcServerReqResMock(const TrexRpcServerConfig &cfg);
+
+ /* override the interface functions */
+ virtual void start();
+ virtual void stop();
+
+
+};
+
#endif /* __TREX_RPC_REQ_RESP_API_H__ */
+
diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp
index a14e6f97..1dfc4494 100644
--- a/src/rpc-server/trex_rpc_server.cpp
+++ b/src/rpc-server/trex_rpc_server.cpp
@@ -63,6 +63,9 @@ void TrexRpcServerInterface::start() {
verbose_msg("Starting RPC Server");
+ /* prepare for run */
+ _prepare();
+
m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this);
if (!m_thread) {
throw TrexRpcException("unable to create RPC thread");
@@ -117,9 +120,18 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg,
const TrexRpcServerConfig *async_cfg,
std::mutex *lock) {
+ m_req_resp = NULL;
+
/* add the request response server */
if (req_resp_cfg) {
- m_servers.push_back(new TrexRpcServerReqRes(*req_resp_cfg, lock));
+
+ if (req_resp_cfg->get_protocol() == TrexRpcServerConfig::RPC_PROT_MOCK) {
+ m_req_resp = new TrexRpcServerReqResMock(*req_resp_cfg);
+ } else {
+ m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg, lock);
+ }
+
+ m_servers.push_back(m_req_resp);
}
/* add async publisher */
@@ -166,3 +178,12 @@ void TrexRpcServer::set_verbose(bool verbose) {
}
}
+
+std::string
+TrexRpcServer::test_inject_request(const std::string &req_str) {
+ if (m_req_resp) {
+ return m_req_resp->test_inject_request(req_str);
+ } else {
+ return "";
+ }
+}
diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h
index ff876ac4..1ab5dce9 100644
--- a/src/rpc-server/trex_rpc_server_api.h
+++ b/src/rpc-server/trex_rpc_server_api.h
@@ -29,8 +29,10 @@ limitations under the License.
#include <string>
#include <stdexcept>
#include <trex_rpc_exception_api.h>
+#include <json/json.h>
class TrexRpcServerInterface;
+class TrexRpcServerReqRes;
/**
* defines a configuration of generic RPC server
@@ -41,18 +43,19 @@ class TrexRpcServerConfig {
public:
enum rpc_prot_e {
- RPC_PROT_TCP
+ RPC_PROT_TCP,
+ RPC_PROT_MOCK
};
TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) {
}
- uint16_t get_port() {
+ uint16_t get_port() const {
return m_port;
}
- rpc_prot_e get_protocol() {
+ rpc_prot_e get_protocol() const {
return m_protocol;
}
@@ -76,13 +79,13 @@ public:
* starts the server
*
*/
- void start();
+ virtual void start();
/**
* stops the server
*
*/
- void stop();
+ virtual void stop();
/**
* set verbose on or off
@@ -107,6 +110,7 @@ protected:
* instances implement this
*
*/
+ virtual void _prepare() = 0;
virtual void _rpc_thread_cb() = 0;
virtual void _stop_rpc_thread() = 0;
@@ -169,12 +173,23 @@ public:
}
+ /**
+ * allow injecting of a JSON and get a response
+ *
+ * @author imarom (27-Dec-15)
+ *
+ * @return std::string
+ */
+ std::string test_inject_request(const std::string &request_str);
private:
static std::string generate_handler();
std::vector<TrexRpcServerInterface *> m_servers;
+ // an alias to the req resp server
+ TrexRpcServerReqRes *m_req_resp;
+
bool m_verbose;
static const std::string s_server_uptime;
diff --git a/src/sim/trex_sim.h b/src/sim/trex_sim.h
new file mode 100644
index 00000000..a541ce01
--- /dev/null
+++ b/src/sim/trex_sim.h
@@ -0,0 +1,148 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+#ifndef __TREX_SIM_H__
+#define __TREX_SIM_H__
+
+#include <string>
+#include <os_time.h>
+#include <bp_sim.h>
+#include <json/json.h>
+
+int gtest_main(int argc, char **argv);
+
+class TrexStateless;
+class TrexPublisher;
+class DpToCpHandler;
+
+/**
+ * interface for a sim target
+ *
+ */
+class SimInterface {
+public:
+
+ SimInterface() {
+
+ time_init();
+ CGlobalInfo::m_socket.Create(0);
+ CGlobalInfo::init_pools(1000);
+ }
+
+ virtual ~SimInterface() {
+
+ CMsgIns::Ins()->Free();
+ CGlobalInfo::free_pools();
+ CGlobalInfo::m_socket.Delete();
+ }
+
+
+};
+
+/**
+ * gtest target
+ *
+ * @author imarom (28-Dec-15)
+ */
+class SimGtest : public SimInterface {
+public:
+
+ int run(int argc, char **argv) {
+ assert( CMsgIns::Ins()->Create(4) );
+ return gtest_main(argc, argv);
+ }
+};
+
+
+
+/**
+ * stateful target
+ *
+ */
+class SimStateful : public SimInterface {
+
+public:
+ int run();
+};
+
+
+
+/**
+ * target for sim stateless
+ *
+ * @author imarom (28-Dec-15)
+ */
+class SimStateless : public SimInterface {
+
+public:
+ static SimStateless& get_instance() {
+ static SimStateless instance;
+ return instance;
+ }
+
+
+ int run(const std::string &json_filename,
+ const std::string &out_filename,
+ int port_count,
+ int dp_core_count,
+ int dp_core_index,
+ int limit);
+
+ TrexStateless * get_stateless_obj() {
+ return m_trex_stateless;
+ }
+
+ void set_verbose(bool enable) {
+ m_verbose = enable;
+ }
+
+private:
+ SimStateless();
+ ~SimStateless();
+
+ void prepare_control_plane();
+ void prepare_dataplane();
+ void execute_json(const std::string &json_filename);
+
+ void run_dp(const std::string &out_filename);
+ uint64_t run_dp_core(int core_index, const std::string &out_filename);
+
+ void flush_dp_to_cp_messages_core(int core_index);
+
+ void validate_response(const Json::Value &resp);
+
+ bool is_verbose() {
+ return m_verbose;
+ }
+
+ TrexStateless *m_trex_stateless;
+ DpToCpHandler *m_dp_to_cp_handler;
+ TrexPublisher *m_publisher;
+ CFlowGenList m_fl;
+ CErfIFStl m_erf_vif;
+ bool m_verbose;
+
+ int m_port_count;
+ int m_dp_core_count;
+ int m_dp_core_index;
+ uint64_t m_limit;
+};
+
+#endif /* __TREX_SIM_H__ */
diff --git a/src/sim/trex_sim_stateful.cpp b/src/sim/trex_sim_stateful.cpp
new file mode 100644
index 00000000..88698cd1
--- /dev/null
+++ b/src/sim/trex_sim_stateful.cpp
@@ -0,0 +1,600 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "trex_sim.h"
+
+static int cores = 1;
+
+#ifdef LINUX
+
+
+
+#include <pthread.h>
+
+struct per_thread_t {
+ pthread_t tid;
+};
+
+#define MAX_THREADS 200
+static per_thread_t tr_info[MAX_THREADS];
+
+
+//////////////
+
+struct test_t_info1 {
+ CPreviewMode * preview_info;
+ CFlowGenListPerThread * thread_info;
+ uint32_t thread_id;
+};
+
+void * thread_task(void *info){
+
+ test_t_info1 * obj =(test_t_info1 *)info;
+
+ CFlowGenListPerThread * lpt=obj->thread_info;
+
+ printf("start thread %d \n",obj->thread_id);
+ //delay(obj->thread_id *3000);
+ printf("-->start thread %d \n",obj->thread_id);
+ if (1/*obj->thread_id ==3*/) {
+
+ char buf[100];
+ sprintf(buf,"my%d.erf",obj->thread_id);
+ lpt->start_generate_stateful(buf,*obj->preview_info);
+ lpt->m_node_gen.DumpHist(stdout);
+ printf("end thread %d \n",obj->thread_id);
+ }
+
+ return (NULL);
+}
+
+
+void test_load_list_of_cap_files_linux(CParserOption * op){
+
+ CFlowGenList fl;
+ //CNullIF erf_vif;
+ //CErfIF erf_vif;
+
+ fl.Create();
+
+ fl.load_from_yaml(op->cfg_file,cores);
+ fl.DumpPktSize();
+
+
+ fl.generate_p_thread_info(cores);
+ CFlowGenListPerThread * lpt;
+
+ /* set the ERF file */
+ //fl.set_vif_all(&erf_vif);
+
+ int i;
+ for (i=0; i<cores; i++) {
+ lpt=fl.m_threads_info[i];
+ test_t_info1 * obj = new test_t_info1();
+ obj->preview_info =&op->preview;
+ obj->thread_info = fl.m_threads_info[i];
+ obj->thread_id = i;
+ CNullIF * erf_vif = new CNullIF();
+ //CErfIF * erf_vif = new CErfIF();
+
+ lpt->set_vif(erf_vif);
+
+ assert(pthread_create( &tr_info[i].tid, NULL, thread_task, obj)==0);
+ }
+
+ for (i=0; i<cores; i++) {
+ /* wait for all of them to stop */
+ assert(pthread_join((pthread_t)tr_info[i].tid,NULL )==0);
+ }
+
+ printf("compare files \n");
+ for (i=1; i<cores; i++) {
+
+ CErfCmp cmp;
+ char buf[100];
+ sprintf(buf,"my%d.erf",i);
+ char buf1[100];
+ sprintf(buf1,"my%d.erf",0);
+ if ( cmp.compare(std::string(buf),std::string(buf1)) != true ) {
+ printf(" ERROR cap file is not ex !! \n");
+ assert(0);
+ }
+ printf(" thread %d is ok \n",i);
+ }
+
+ fl.Delete();
+}
+
+
+#endif
+
+/*************************************************************/
+void test_load_list_of_cap_files(CParserOption * op){
+
+ CFlowGenList fl;
+ CNullIF erf_vif;
+
+ fl.Create();
+
+ #define NUM 1
+
+ fl.load_from_yaml(op->cfg_file,NUM);
+ fl.DumpPktSize();
+
+
+ fl.generate_p_thread_info(NUM);
+ CFlowGenListPerThread * lpt;
+
+ /* set the ERF file */
+ //fl.set_vif_all(&erf_vif);
+
+ int i;
+ for (i=0; i<NUM; i++) {
+ lpt=fl.m_threads_info[i];
+ char buf[100];
+ sprintf(buf,"my%d.erf",i);
+ lpt->start_generate_stateful(buf,op->preview);
+ lpt->m_node_gen.DumpHist(stdout);
+ }
+ //sprintf(buf,"my%d.erf",7);
+ //lpt=fl.m_threads_info[7];
+
+ //fl.Dump(stdout);
+ fl.Delete();
+}
+
+int load_list_of_cap_files(CParserOption * op){
+ CFlowGenList fl;
+ fl.Create();
+ fl.load_from_yaml(op->cfg_file,1);
+ if ( op->preview.getVMode() >0 ) {
+ fl.DumpCsv(stdout);
+ }
+ uint32_t start= os_get_time_msec();
+
+ CErfIF erf_vif;
+ //CNullIF erf_vif;
+
+ fl.generate_p_thread_info(1);
+ CFlowGenListPerThread * lpt;
+ lpt=fl.m_threads_info[0];
+ lpt->set_vif(&erf_vif);
+
+ if ( (op->preview.getVMode() >1) || op->preview.getFileWrite() ) {
+ lpt->start_generate_stateful(op->out_file,op->preview);
+ }
+
+ lpt->m_node_gen.DumpHist(stdout);
+
+ uint32_t stop= os_get_time_msec();
+ printf(" d time = %ul %ul \n",stop-start,os_get_time_freq());
+ fl.Delete();
+ return (0);
+}
+
+
+int test_dns(){
+
+ time_init();
+ CGlobalInfo::init_pools(1000);
+
+ CParserOption po ;
+
+ //po.cfg_file = "cap2/dns.yaml";
+ //po.cfg_file = "cap2/sfr3.yaml";
+ po.cfg_file = "cap2/sfr.yaml";
+
+ po.preview.setVMode(0);
+ po.preview.setFileWrite(true);
+ #ifdef LINUX
+ test_load_list_of_cap_files_linux(&po);
+ #else
+ test_load_list_of_cap_files(&po);
+ #endif
+ return (0);
+}
+
+void test_pkt_mbuf(void);
+
+void test_compare_files(void);
+
+#if 0
+static int b=0;
+static int c=0;
+static int d=0;
+
+int test_instructions(){
+ int i;
+ for (i=0; i<100000;i++) {
+ b+=b+1;
+ c+=+b+c+1;
+ d+=+(b*2+1);
+ }
+ return (b+c+d);
+}
+
+#include <valgrind/callgrind.h>
+#endif
+
+
+void update_tcp_seq_num(CCapFileFlowInfo * obj,
+ int pkt_id,
+ int size_change){
+ CFlowPktInfo * pkt=obj->GetPacket(pkt_id);
+ if ( pkt->m_pkt_indication.m_desc.IsUdp() ){
+ /* nothing to do */
+ return;
+ }
+
+ bool o_init=pkt->m_pkt_indication.m_desc.IsInitSide();
+ TCPHeader * tcp ;
+ int s= (int)obj->Size();
+ int i;
+
+ for (i=pkt_id+1; i<s; i++) {
+
+ pkt=obj->GetPacket(i);
+ tcp=pkt->m_pkt_indication.l4.m_tcp;
+ bool init=pkt->m_pkt_indication.m_desc.IsInitSide();
+ if (init == o_init) {
+ /* same dir update the seq number */
+ tcp->setSeqNumber (tcp->getSeqNumber ()+size_change);
+
+ }else{
+ /* update the ack number */
+ tcp->setAckNumber (tcp->getAckNumber ()+size_change);
+ }
+ }
+}
+
+
+
+void change_pkt_len(CCapFileFlowInfo * obj,int pkt_id, int size ){
+ CFlowPktInfo * pkt=obj->GetPacket(pkt_id);
+
+ /* enlarge the packet size by 9 */
+
+ char * p=pkt->m_packet->append(size);
+ /* set it to 0xaa*/
+ memmove(p+size-4,p-4,4); /* CRCbytes */
+ memset(p-4,0x0a,size);
+
+ /* refresh the pointers */
+ pkt->m_pkt_indication.RefreshPointers();
+
+ IPHeader * ipv4 = pkt->m_pkt_indication.l3.m_ipv4;
+ ipv4->updateTotalLength (ipv4->getTotalLength()+size );
+
+ /* update seq numbers if needed */
+ update_tcp_seq_num(obj,pkt_id,size);
+}
+
+void dump_tcp_seq_num_(CCapFileFlowInfo * obj){
+ int s= (int)obj->Size();
+ int i;
+ uint32_t i_seq;
+ uint32_t r_seq;
+
+ CFlowPktInfo * pkt=obj->GetPacket(0);
+ TCPHeader * tcp = pkt->m_pkt_indication.l4.m_tcp;
+ i_seq=tcp->getSeqNumber ();
+
+ pkt=obj->GetPacket(1);
+ tcp = pkt->m_pkt_indication.l4.m_tcp;
+ r_seq=tcp->getSeqNumber ();
+
+ for (i=2; i<s; i++) {
+ uint32_t seq;
+ uint32_t ack;
+
+ pkt=obj->GetPacket(i);
+ tcp=pkt->m_pkt_indication.l4.m_tcp;
+ bool init=pkt->m_pkt_indication.m_desc.IsInitSide();
+ seq=tcp->getSeqNumber ();
+ ack=tcp->getAckNumber ();
+ if (init) {
+ seq=seq-i_seq;
+ ack=ack-r_seq;
+ }else{
+ seq=seq-r_seq;
+ ack=ack-i_seq;
+ }
+ printf(" %4d ",i);
+ if (!init) {
+ printf(" ");
+ }
+ printf(" %s seq: %4d ack : %4d \n",init?"I":"R",seq,ack);
+ }
+}
+
+
+int manipolate_capfile() {
+ time_init();
+ CGlobalInfo::init_pools(1000);
+
+ CCapFileFlowInfo flow_info;
+ flow_info.Create();
+
+ flow_info.load_cap_file("avl/delay_10_rtsp_0.pcap",0,0);
+
+ change_pkt_len(&flow_info,4-1 ,6);
+ change_pkt_len(&flow_info,5-1 ,6);
+ change_pkt_len(&flow_info,6-1 ,6+2);
+ change_pkt_len(&flow_info,7-1 ,4);
+ change_pkt_len(&flow_info,8-1 ,6+2);
+ change_pkt_len(&flow_info,9-1 ,4);
+ change_pkt_len(&flow_info,10-1,6);
+ change_pkt_len(&flow_info,13-1,6);
+ change_pkt_len(&flow_info,16-1,6);
+ change_pkt_len(&flow_info,19-1,6);
+
+ flow_info.save_to_erf("exp/c.pcap",1);
+
+ return (1);
+}
+
+int manipolate_capfile_sip() {
+ time_init();
+ CGlobalInfo::init_pools(1000);
+
+ CCapFileFlowInfo flow_info;
+ flow_info.Create();
+
+ flow_info.load_cap_file("avl/delay_10_sip_0.pcap",0,0);
+
+ change_pkt_len(&flow_info,1-1 ,6+6);
+ change_pkt_len(&flow_info,2-1 ,6+6);
+
+ flow_info.save_to_erf("exp/delay_10_sip_0_fixed.pcap",1);
+
+ return (1);
+}
+
+int manipolate_capfile_sip1() {
+ time_init();
+ CGlobalInfo::init_pools(1000);
+
+ CCapFileFlowInfo flow_info;
+ flow_info.Create();
+
+ flow_info.load_cap_file("avl/delay_sip_0.pcap",0,0);
+ flow_info.GetPacket(1);
+
+ change_pkt_len(&flow_info,1-1 ,6+6+10);
+
+ change_pkt_len(&flow_info,2-1 ,6+6+10);
+
+ flow_info.save_to_erf("exp/delay_sip_0_fixed_1.pcap",1);
+
+ return (1);
+}
+
+
+class CMergeCapFileRec {
+public:
+
+ CCapFileFlowInfo m_cap;
+
+ int m_index;
+ int m_limit_number_of_packets; /* limit number of packets */
+ bool m_stop; /* Do we have more packets */
+
+ double m_offset; /* offset should be positive */
+ double m_start_time;
+
+public:
+ bool Create(std::string cap_file,double offset);
+ void Delete();
+ void IncPacket();
+ bool GetCurPacket(double & time);
+ CPacketIndication * GetUpdatedPacket();
+
+ void Dump(FILE *fd,int _id);
+};
+
+
+void CMergeCapFileRec::Dump(FILE *fd,int _id){
+ double time = 0.0;
+ bool stop=GetCurPacket(time);
+ fprintf (fd," id:%2d stop : %d index:%4d %3.4f \n",_id,stop?1:0,m_index,time);
+}
+
+
+CPacketIndication * CMergeCapFileRec::GetUpdatedPacket(){
+ double t1;
+ assert(GetCurPacket(t1)==false);
+ CFlowPktInfo * pkt = m_cap.GetPacket(m_index);
+ pkt->m_pkt_indication.m_packet->set_new_time(t1);
+ return (&pkt->m_pkt_indication);
+}
+
+
+bool CMergeCapFileRec::GetCurPacket(double & time){
+ if (m_stop) {
+ return(true);
+ }
+ CFlowPktInfo * pkt = m_cap.GetPacket(m_index);
+ time= (pkt->m_packet->get_time() -m_start_time + m_offset);
+ return (false);
+}
+
+void CMergeCapFileRec::IncPacket(){
+ m_index++;
+ if ( (m_limit_number_of_packets) && (m_index > m_limit_number_of_packets ) ) {
+ m_stop=true;
+ return;
+ }
+
+ if ( m_index == (int)m_cap.Size() ) {
+ m_stop=true;
+ }
+}
+
+void CMergeCapFileRec::Delete(){
+ m_cap.Delete();
+}
+
+bool CMergeCapFileRec::Create(std::string cap_file,
+ double offset){
+ m_cap.Create();
+ m_cap.load_cap_file(cap_file,0,0);
+ CFlowPktInfo * pkt = m_cap.GetPacket(0);
+
+ m_index=0;
+ m_stop=false;
+ m_limit_number_of_packets =0;
+ m_start_time = pkt->m_packet->get_time() ;
+ m_offset = offset;
+
+ return (true);
+}
+
+
+
+#define MERGE_CAP_FILES (2)
+
+class CMergeCapFile {
+public:
+ bool Create();
+ void Delete();
+ bool run_merge(std::string to_cap_file);
+private:
+ void append(int _cap_id);
+
+public:
+ CMergeCapFileRec m[MERGE_CAP_FILES];
+ CCapFileFlowInfo m_results;
+};
+
+bool CMergeCapFile::Create(){
+ m_results.Create();
+ return(true);
+}
+
+void CMergeCapFile::Delete(){
+ m_results.Delete();
+}
+
+void CMergeCapFile::append(int _cap_id){
+ CPacketIndication * lp=m[_cap_id].GetUpdatedPacket();
+ lp->m_packet->Dump(stdout,0);
+ m_results.Append(lp);
+}
+
+
+bool CMergeCapFile::run_merge(std::string to_cap_file){
+
+ int i=0;
+ int cnt=0;
+ while ( true ) {
+ int min_index=0;
+ double min_time;
+
+ fprintf(stdout," --------------\n");
+ fprintf(stdout," pkt : %d \n",cnt);
+ for (i=0; i<MERGE_CAP_FILES; i++) {
+ m[i].Dump(stdout,i);
+ }
+ fprintf(stdout," --------------\n");
+
+ bool valid = false;
+ for (i=0; i<MERGE_CAP_FILES; i++) {
+ double t1;
+ if ( m[i].GetCurPacket(t1) == false ){
+ /* not in stop */
+ if (!valid) {
+ min_time = t1;
+ min_index = i;
+ valid=true;
+ }else{
+ if (t1 < min_time) {
+ min_time=t1;
+ min_index = i;
+ }
+ }
+
+ }
+ }
+
+ /* nothing to do */
+ if (valid==false) {
+ fprintf(stdout,"nothing to do \n");
+ break;
+ }
+
+ cnt++;
+ fprintf(stdout," choose id %d \n",min_index);
+ append(min_index);
+ m[min_index].IncPacket();
+ };
+
+ m_results.save_to_erf(to_cap_file,1);
+
+ return (true);
+}
+
+
+
+int merge_3_cap_files() {
+ time_init();
+ CGlobalInfo::init_pools(1000);
+
+ CMergeCapFile merger;
+ merger.Create();
+ merger.m[0].Create("exp/c.pcap",0.001);
+ merger.m[1].Create("avl/delay_10_rtp_160k_0.pcap",0.31);
+ merger.m[2].Create("avl/delay_10_rtp_160k_1.pcap",0.311);
+
+ //merger.m[1].Create("avl/delay_10_rtp_250k_0_0.pcap",0.31);
+ //merger.m[1].m_limit_number_of_packets =6;
+ //merger.m[2].Create("avl/delay_10_rtp_250k_1_0.pcap",0.311);
+ //merger.m[2].m_limit_number_of_packets =6;
+
+ merger.run_merge("exp/delay_10_rtp_160k_full.pcap");
+
+ return (0);
+}
+
+int merge_2_cap_files_sip() {
+ time_init();
+ CGlobalInfo::init_pools(1000);
+
+ CMergeCapFile merger;
+ merger.Create();
+ merger.m[0].Create("exp/delay_sip_0_fixed_1.pcap",0.001);
+ merger.m[1].Create("avl/delay_video_call_rtp_0.pcap",0.51);
+ //merger.m[1].m_limit_number_of_packets=7;
+
+ //merger.m[1].Create("avl/delay_10_rtp_250k_0_0.pcap",0.31);
+ //merger.m[1].m_limit_number_of_packets =6;
+ //merger.m[2].Create("avl/delay_10_rtp_250k_1_0.pcap",0.311);
+ //merger.m[2].m_limit_number_of_packets =6;
+
+ merger.run_merge("avl/delay_10_sip_video_call_full.pcap");
+
+ return (0);
+}
+
+int
+SimStateful::run() {
+ assert( CMsgIns::Ins()->Create(4) );
+ return load_list_of_cap_files(&CGlobalInfo::m_options);
+}
diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp
new file mode 100644
index 00000000..2b73f686
--- /dev/null
+++ b/src/sim/trex_sim_stateless.cpp
@@ -0,0 +1,346 @@
+/*
+ Itay Marom
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "trex_sim.h"
+#include <trex_stateless.h>
+#include <trex_stateless_messaging.h>
+#include <trex_rpc_cmd_api.h>
+#include <json/json.h>
+#include <stdexcept>
+#include <sstream>
+
+using namespace std;
+
+TrexStateless * get_stateless_obj() {
+ return SimStateless::get_instance().get_stateless_obj();
+}
+
+
+class SimRunException : public std::runtime_error
+{
+public:
+ SimRunException() : std::runtime_error("") {
+
+ }
+ SimRunException(const std::string &what) : std::runtime_error(what) {
+ }
+};
+
+/*************** hook for platform API **************/
+class SimPlatformApi : public TrexPlatformApi {
+public:
+ SimPlatformApi(int dp_core_count) {
+ m_dp_core_count = dp_core_count;
+ }
+
+ virtual uint8_t get_dp_core_count() const {
+ return m_dp_core_count;
+ }
+
+ virtual void get_global_stats(TrexPlatformGlobalStats &stats) const {
+ }
+ virtual void get_interface_info(uint8_t interface_id, std::string &driver_name, driver_speed_e &speed) const {
+ }
+ virtual void get_interface_stats(uint8_t interface_id, TrexPlatformInterfaceStats &stats) const {
+ }
+
+ virtual void port_id_to_cores(uint8_t port_id, std::vector<std::pair<uint8_t, uint8_t>> &cores_id_list) const {
+ for (int i = 0; i < m_dp_core_count; i++) {
+ cores_id_list.push_back(std::make_pair(i, 0));
+ }
+ }
+
+private:
+ int m_dp_core_count;
+};
+
+/**
+ * handler for DP to CP messages
+ *
+ * @author imarom (19-Nov-15)
+ */
+class DpToCpHandler {
+public:
+ virtual void handle(TrexStatelessDpToCpMsgBase *msg) = 0;
+};
+
+/************************
+ *
+ * stateless sim object
+ *
+************************/
+class SimPublisher : public TrexPublisher {
+public:
+
+ /* override create */
+ bool Create(uint16_t port, bool disable) {
+ return true;
+ }
+
+ void Delete() {
+
+ }
+
+ void publish_json(const std::string &s) {
+ }
+
+ virtual ~SimPublisher() {
+ }
+};
+
+/************************
+ *
+ * stateless sim object
+ *
+************************/
+
+SimStateless::SimStateless() {
+ m_trex_stateless = NULL;
+ m_publisher = NULL;
+ m_dp_to_cp_handler = NULL;
+ m_verbose = false;
+ m_dp_core_count = -1;
+ m_dp_core_index = -1;
+ m_port_count = -1;
+ m_limit = 0;
+
+ /* override ownership checks */
+ TrexRpcCommand::test_set_override_ownership(true);
+}
+
+
+int
+SimStateless::run(const string &json_filename,
+ const string &out_filename,
+ int port_count,
+ int dp_core_count,
+ int dp_core_index,
+ int limit) {
+
+ assert(dp_core_count > 0);
+
+ /* -1 means its not set or positive value between 0 and the dp core count - 1*/
+ assert( (dp_core_index == -1) || ( (dp_core_index >=0 ) && (dp_core_index < dp_core_count) ) );
+
+ m_dp_core_count = dp_core_count;
+ m_dp_core_index = dp_core_index;
+ m_port_count = port_count;
+ m_limit = limit;
+
+ prepare_dataplane();
+ prepare_control_plane();
+
+ try {
+ execute_json(json_filename);
+ } catch (const SimRunException &e) {
+ std::cout << "*** test failed ***\n\n" << e.what() << "\n";
+ return (-1);
+ }
+
+ run_dp(out_filename);
+
+ return 0;
+}
+
+
+SimStateless::~SimStateless() {
+ if (m_trex_stateless) {
+ delete m_trex_stateless;
+ m_trex_stateless = NULL;
+ }
+
+ if (m_publisher) {
+ delete m_publisher;
+ m_publisher = NULL;
+ }
+
+ m_fl.Delete();
+}
+
+/**
+ * prepare control plane for test
+ *
+ * @author imarom (28-Dec-15)
+ */
+void
+SimStateless::prepare_control_plane() {
+ TrexStatelessCfg cfg;
+
+ m_publisher = new SimPublisher();
+
+ TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_MOCK, 0);
+
+ cfg.m_port_count = m_port_count;
+ cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg;
+ cfg.m_rpc_async_cfg = NULL;
+ cfg.m_rpc_server_verbose = false;
+ cfg.m_platform_api = new SimPlatformApi(m_dp_core_count);
+ cfg.m_publisher = m_publisher;
+
+ m_trex_stateless = new TrexStateless(cfg);
+
+ m_trex_stateless->launch_control_plane();
+
+ for (auto &port : m_trex_stateless->get_port_list()) {
+ port->acquire("test", 0, true);
+ }
+
+}
+
+
+/**
+ * prepare the data plane for test
+ *
+ */
+void
+SimStateless::prepare_dataplane() {
+
+ CGlobalInfo::m_options.m_expected_portd = m_port_count;
+
+ assert(CMsgIns::Ins()->Create(m_dp_core_count));
+ m_fl.Create();
+ m_fl.generate_p_thread_info(m_dp_core_count);
+
+ for (int i = 0; i < m_dp_core_count; i++) {
+ m_fl.m_threads_info[i]->set_vif(&m_erf_vif);
+ }
+}
+
+
+
+void
+SimStateless::execute_json(const std::string &json_filename) {
+
+ std::ifstream test(json_filename);
+ std::stringstream buffer;
+ buffer << test.rdbuf();
+
+ std::string rep = m_trex_stateless->get_rpc_server()->test_inject_request(buffer.str());
+
+ Json::Value root;
+ Json::Reader reader;
+ reader.parse(rep, root, false);
+
+ if (is_verbose()) {
+ std::cout << "server response: \n\n" << root << "\n\n";
+ }
+
+ validate_response(root);
+
+}
+
+void
+SimStateless::validate_response(const Json::Value &resp) {
+ std::stringstream ss;
+
+ if (resp.isArray()) {
+ for (const auto &single : resp) {
+ if (single["error"] != Json::nullValue) {
+ ss << "failed with:\n\n" << single["error"] << "\n\n";
+ throw SimRunException(ss.str());
+ }
+ }
+ } else {
+ if (resp["error"] != Json::nullValue) {
+ ss << "failed with:\n\n" << resp["error"] << "\n\n";
+ throw SimRunException(ss.str());
+ }
+ }
+
+}
+
+
+void
+SimStateless::run_dp(const std::string &out_filename) {
+ uint64_t pkt_cnt = 0;
+
+ if (m_dp_core_count == 1) {
+ pkt_cnt = run_dp_core(0, out_filename);
+ } else {
+
+ /* do we have a specific core index to capture ? */
+ if (m_dp_core_index != -1) {
+ for (int i = 0; i < m_dp_core_count; i++) {
+ if (i == m_dp_core_index) {
+ pkt_cnt += run_dp_core(i, out_filename);
+ } else {
+ run_dp_core(i, "/dev/null");
+ }
+ }
+ } else {
+ for (int i = 0; i < m_dp_core_count; i++) {
+ std::stringstream ss;
+ ss << out_filename << "-" << i;
+ pkt_cnt += run_dp_core(i, ss.str());
+ }
+ }
+
+ }
+
+
+ std::cout << "\n";
+ std::cout << "ports: " << m_port_count << "\n";
+ std::cout << "cores: " << m_dp_core_count << "\n";
+
+ if (m_dp_core_index != -1) {
+ std::cout << "core index: " << m_dp_core_index << "\n";
+ } else {
+ std::cout << "core index: merge all\n";
+ }
+
+ std::cout << "pkt limit: " << m_limit << "\n";
+ std::cout << "\nwritten " << pkt_cnt << " packets " << "to '" << out_filename << "'\n\n";
+}
+
+uint64_t
+SimStateless::run_dp_core(int core_index, const std::string &out_filename) {
+
+ CFlowGenListPerThread *lpt = m_fl.m_threads_info[core_index];
+
+ lpt->start_stateless_simulation_file((std::string)out_filename, CGlobalInfo::m_options.preview, m_limit / m_dp_core_count);
+ lpt->start_stateless_daemon_simulation();
+
+ flush_dp_to_cp_messages_core(core_index);
+
+ return lpt->m_node_gen.m_cnt;
+}
+
+
+void
+SimStateless::flush_dp_to_cp_messages_core(int core_index) {
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingDpToCp(core_index);
+
+ while ( true ) {
+ CGenNode * node = NULL;
+ if (ring->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+
+ TrexStatelessDpToCpMsgBase * msg = (TrexStatelessDpToCpMsgBase *)node;
+ if (m_dp_to_cp_handler) {
+ m_dp_to_cp_handler->handle(msg);
+ }
+
+ delete msg;
+ }
+}
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 5c11be1e..59be9241 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -161,6 +161,10 @@ public:
return m_ports;
}
+ TrexRpcServer * get_rpc_server() {
+ return m_rpc_server;
+ }
+
protected:
/* no copy or assignment */
@@ -168,7 +172,7 @@ protected:
void operator=(TrexStateless const&) = delete;
/* RPC server array */
- TrexRpcServer *m_rpc_server;
+ TrexRpcServer *m_rpc_server;
/* ports */
std::vector <TrexStatelessPort *> m_ports;
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index 4e814d4c..a80efc08 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -377,8 +377,11 @@ void
TrexStatelessDpCore::idle_state_loop() {
while (m_state == STATE_IDLE) {
- periodic_check_for_cp_messages();
- delay(200);
+ bool had_msg = periodic_check_for_cp_messages();
+ /* if no message - backoff for some time */
+ if (!had_msg) {
+ delay(200);
+ }
}
}
diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h
index 7dc4a2b2..efdb364c 100644
--- a/src/stateless/dp/trex_stateless_dp_core.h
+++ b/src/stateless/dp/trex_stateless_dp_core.h
@@ -185,12 +185,12 @@ public:
*
* @author imarom (27-Oct-15)
*/
- void periodic_check_for_cp_messages() {
+ bool periodic_check_for_cp_messages() {
// doing this inline for performance reasons
/* fast path */
if ( likely ( m_ring_from_cp->isEmpty() ) ) {
- return;
+ return false;
}
while ( true ) {
@@ -204,6 +204,8 @@ public:
handle_cp_msg(msg);
}
+ return true;
+
}
/* quit the main loop, work in both stateless in stateful, don't free memory trigger from master */