summaryrefslogtreecommitdiffstats
path: root/src/bp_sim.cpp
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2015-06-24 14:03:29 +0300
committerHanoh Haim <hhaim@cisco.com>2015-06-24 14:03:29 +0300
commit8b52a31ed2c299b759f330c4f976b9c70f5765f4 (patch)
tree9d6da5438b5b56b1d2d57e6c13494b4e65d000e7 /src/bp_sim.cpp
first version
Diffstat (limited to 'src/bp_sim.cpp')
-rwxr-xr-xsrc/bp_sim.cpp6622
1 files changed, 6622 insertions, 0 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
new file mode 100755
index 00000000..f81ef446
--- /dev/null
+++ b/src/bp_sim.cpp
@@ -0,0 +1,6622 @@
+/*
+ Hanoh Haim
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "bp_sim.h"
+#include "utl_json.h"
+#include "utl_yaml.h"
+#include "msg_manager.h"
+#include <common/basic_utils.h>
+
+
+#undef VALG
+
+#ifdef VALG
+#include <valgrind/callgrind.h>
+#endif
+
+
+CPluginCallback * CPluginCallback::callback;
+
+
+uint32_t getDualPortId(uint32_t thread_id){
+ return ( thread_id % (CGlobalInfo::m_options.get_expected_dual_ports()) );
+}
+
+
+
+CRteMemPool CGlobalInfo::m_mem_pool[MAX_SOCKETS_SUPPORTED];
+
+uint32_t CGlobalInfo::m_nodes_pool_size = 10*1024;
+CParserOption CGlobalInfo::m_options;
+CGlobalMemory CGlobalInfo::m_memory_cfg;
+CPlatformSocketInfo CGlobalInfo::m_socket;
+
+
+
+
+
+void CGlobalMemory::Dump(FILE *fd){
+ fprintf(fd," Total Memory : \n");
+
+ const std::string * names =get_mbuf_names();
+
+ uint32_t c_size=64;
+ uint32_t c_total=0;
+
+ int i=0;
+ for (i=0; i<MBUF_SIZE; i++) {
+ if ( (i>MBUF_2048) && (i<MBUF_DP_FLOWS)){
+ continue;
+ }
+ if ( i<TRAFFIC_MBUF_64 ){
+ c_total= m_mbuf[i] *c_size;
+ c_size=c_size*2;
+ }
+
+ fprintf(fd," %-40s : %lu \n",names[i].c_str(),m_mbuf[i]);
+ }
+ c_total += (m_mbuf[MBUF_DP_FLOWS] * sizeof(CGenNode));
+
+ fprintf(fd," %-40s : %lu \n","get_each_core_dp_flows",get_each_core_dp_flows());
+ fprintf(fd," %-40s : %s \n","Total memory",double_to_human_str(c_total,"bytes",KBYE_1024).c_str() );
+}
+
+
+void CGlobalMemory::set(const CPlatformMemoryYamlInfo &info,float mul){
+ int i;
+ for (i=0; i<MBUF_SIZE; i++) {
+ m_mbuf[i]=(uint32_t)((float)info.m_mbuf[i]*mul);
+ }
+ /* no need to multiply */
+ m_mbuf[MBUF_64] += info.m_mbuf[TRAFFIC_MBUF_64];
+ m_mbuf[MBUF_128] += info.m_mbuf[TRAFFIC_MBUF_128];
+ m_mbuf[MBUF_256] += info.m_mbuf[TRAFFIC_MBUF_256];
+ m_mbuf[MBUF_512] += info.m_mbuf[TRAFFIC_MBUF_512];
+ m_mbuf[MBUF_1024] += info.m_mbuf[TRAFFIC_MBUF_1024];
+ m_mbuf[MBUF_2048] += info.m_mbuf[TRAFFIC_MBUF_2048];
+}
+
+
+////////////////////////////////////////
+
+
+bool CPlatformSocketInfoNoConfig::is_sockets_enable(socket_id_t socket){
+ if ( socket==0 ) {
+ return(true);
+ }
+ return (false);
+}
+
+socket_id_t CPlatformSocketInfoNoConfig::max_num_active_sockets(){
+ return (1);
+}
+
+
+socket_id_t CPlatformSocketInfoNoConfig::port_to_socket(port_id_t port){
+ return (0);
+}
+
+
+void CPlatformSocketInfoNoConfig::set_latency_thread_is_enabled(bool enable){
+ m_latency_is_enabled = enable;
+}
+
+void CPlatformSocketInfoNoConfig::set_number_of_dual_ports(uint8_t num_dual_ports){
+ m_dual_if = num_dual_ports;
+}
+
+
+void CPlatformSocketInfoNoConfig::set_number_of_threads_per_ports(uint8_t num_threads){
+ m_threads_per_dual_if = num_threads;
+}
+
+bool CPlatformSocketInfoNoConfig::sanity_check(){
+ return (true);
+}
+
+/* return the core mask */
+uint64_t CPlatformSocketInfoNoConfig::get_cores_mask(){
+
+ uint32_t cores_number = m_threads_per_dual_if*m_dual_if;
+ if ( m_latency_is_enabled ) {
+ cores_number += 2;
+ }else{
+ cores_number += 1; /* only MASTER*/
+ }
+ int i;
+ int offset=0;
+ /* master */
+ uint32_t res=1;
+ uint32_t mask=(1<<(offset+1));
+ for (i=0; i<(cores_number-1); i++) {
+ res |= mask ;
+ mask = mask <<1;
+ }
+ return (res);
+}
+
+virtual_thread_id_t CPlatformSocketInfoNoConfig::thread_phy_to_virt(physical_thread_id_t phy_id){
+ return (phy_id);
+}
+
+physical_thread_id_t CPlatformSocketInfoNoConfig::thread_virt_to_phy(virtual_thread_id_t virt_id){
+ return (virt_id);
+}
+
+bool CPlatformSocketInfoNoConfig::thread_phy_is_master(physical_thread_id_t phy_id){
+ return (phy_id==0);
+}
+
+bool CPlatformSocketInfoNoConfig::thread_phy_is_latency(physical_thread_id_t phy_id){
+ return (phy_id==(m_threads_per_dual_if*m_dual_if+1));
+}
+
+
+void CPlatformSocketInfoNoConfig::dump(FILE *fd){
+ fprintf(fd," there is no configuration file given \n");
+}
+
+////////////////////////////////////////
+
+bool CPlatformSocketInfoConfig::Create(CPlatformCoresYamlInfo * platform){
+ m_platform=platform;
+ assert(m_platform);
+ assert(m_platform->m_is_exists);
+ reset();
+ return (true);
+}
+
+bool CPlatformSocketInfoConfig::init(){
+
+ /* iterate the sockets */
+ uint32_t num_threads=0;
+ uint32_t num_dual_if = m_platform->m_dual_if.size();
+
+ if ( m_num_dual_if > num_dual_if ){
+ printf("ERROR number of dual if %d is higher than defined in configuration file %d\n",
+ (int)m_num_dual_if,
+ (int)num_dual_if);
+ }
+
+ int i;
+ for (i=0; i<m_num_dual_if; i++) {
+ CPlatformDualIfYamlInfo * lp=&m_platform->m_dual_if[i];
+ if ( lp->m_socket>=MAX_SOCKETS_SUPPORTED ){
+ printf("ERROR socket %d is bigger than max %d \n",lp->m_socket,MAX_SOCKETS_SUPPORTED);
+ exit(1);
+ }
+
+ if (!m_sockets_enable[lp->m_socket] ) {
+ m_sockets_enable[lp->m_socket]=true;
+ m_sockets_enabled++;
+ }
+
+ m_socket_per_dual_if[i]=lp->m_socket;
+
+ /* learn how many threads per dual-if */
+ if (i==0) {
+ num_threads = lp->m_threads.size();
+ m_max_threads_per_dual_if = num_threads;
+ }else{
+ if (lp->m_threads.size() != num_threads) {
+ printf("ERROR number of threads per dual ports should be the same for all dual ports\n");
+ exit(1);
+ }
+ }
+
+ int j;
+
+ for (j=0; j<m_threads_per_dual_if; j++) {
+ uint8_t virt_thread = 1+ i + j*m_num_dual_if; /* virtual thread */
+ uint8_t phy_thread = lp->m_threads[j];
+
+ if (phy_thread>MAX_THREADS_SUPPORTED) {
+ printf("ERROR physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED);
+ exit(1);
+ }
+
+ if (virt_thread>MAX_THREADS_SUPPORTED) {
+ printf("ERROR virtual thread id is %d higher than max %d \n",virt_thread,MAX_THREADS_SUPPORTED);
+ exit(1);
+ }
+
+ if ( m_thread_phy_to_virtual[phy_thread] ){
+ printf("ERROR physical thread %d defined twice %d \n",phy_thread);
+ exit(1);
+ }
+ m_thread_phy_to_virtual[phy_thread]=virt_thread;
+ m_thread_virt_to_phy[virt_thread] =phy_thread;
+ }
+ }
+
+ if ( m_thread_phy_to_virtual[m_platform->m_master_thread] ){
+ printf("ERROR physical master thread %d already defined \n",m_platform->m_master_thread);
+ exit(1);
+ }
+
+ if ( m_thread_phy_to_virtual[m_platform->m_latency_thread] ){
+ printf("ERROR physical latency thread %d already defined \n",m_platform->m_latency_thread);
+ exit(1);
+ }
+
+ if (m_max_threads_per_dual_if < m_threads_per_dual_if ) {
+ printf("ERROR number of threads asked per dual if is %d lower than max %d \n",
+ (int)m_threads_per_dual_if,
+ (int)m_max_threads_per_dual_if);
+ exit(1);
+ }
+ return (true);
+}
+
+
+void CPlatformSocketInfoConfig::dump(FILE *fd){
+ fprintf(fd," core_mask %x \n",get_cores_mask());
+ fprintf(fd," sockets :");
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ if ( is_sockets_enable(i) ){
+ fprintf(fd," %d ",i);
+ }
+ }
+ fprintf(fd," \n");
+ fprintf(fd," active sockets : %d \n",max_num_active_sockets());
+
+ fprintf(fd," ports_sockets : \n",max_num_active_sockets());
+
+ for (i=0; i<(MAX_LATENCY_PORTS); i++) {
+ fprintf(fd,"%d,",port_to_socket(i));
+ }
+ fprintf(fd,"\n");
+
+ fprintf(fd," phy | virt \n");
+ for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
+ virtual_thread_id_t virt=thread_phy_to_virt(i);
+ if ( virt ){
+ fprintf(fd," %d %d \n",i,virt);
+ }
+ }
+}
+
+
+void CPlatformSocketInfoConfig::reset(){
+ m_sockets_enabled=0;
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ m_sockets_enable[i]=false;
+ }
+
+ for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
+ m_thread_virt_to_phy[i]=0;
+ }
+ for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
+ m_thread_phy_to_virtual[i]=0;
+ }
+ for (i=0; i<(MAX_LATENCY_PORTS>>1); i++) {
+ m_socket_per_dual_if[i]=0;
+ }
+
+ m_num_dual_if=0;
+
+ m_threads_per_dual_if=0;
+ m_latency_is_enabled=false;
+ m_max_threads_per_dual_if=0;
+}
+
+
+void CPlatformSocketInfoConfig::Delete(){
+
+}
+
+bool CPlatformSocketInfoConfig::is_sockets_enable(socket_id_t socket){
+ assert(socket<MAX_SOCKETS_SUPPORTED);
+ return ( m_sockets_enable[socket] );
+}
+
+socket_id_t CPlatformSocketInfoConfig::max_num_active_sockets(){
+ return ((socket_id_t)m_sockets_enabled);
+}
+
+socket_id_t CPlatformSocketInfoConfig::port_to_socket(port_id_t port){
+ return ( m_socket_per_dual_if[(port>>1)]);
+}
+
+void CPlatformSocketInfoConfig::set_latency_thread_is_enabled(bool enable){
+ m_latency_is_enabled =enable;
+}
+
+void CPlatformSocketInfoConfig::set_number_of_dual_ports(uint8_t num_dual_ports){
+ m_num_dual_if = num_dual_ports;
+}
+
+void CPlatformSocketInfoConfig::set_number_of_threads_per_ports(uint8_t num_threads){
+ m_threads_per_dual_if =num_threads;
+}
+
+bool CPlatformSocketInfoConfig::sanity_check(){
+ return (init());
+}
+
+/* return the core mask */
+uint64_t CPlatformSocketInfoConfig::get_cores_mask(){
+ int i;
+ uint64_t mask=0;
+ for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
+ if ( m_thread_phy_to_virtual[i] ) {
+
+ if (i>=64) {
+ printf(" ERROR phy threads can't be higher than 64 \n");
+ exit(1);
+ }
+ mask |=(1<<i);
+ }
+ }
+
+ mask |=(1<<m_platform->m_master_thread);
+ assert(m_platform->m_master_thread<64);
+ if (m_latency_is_enabled) {
+ mask |=(1<<m_platform->m_latency_thread);
+ assert(m_platform->m_latency_thread<64);
+ }
+ return (mask);
+}
+
+virtual_thread_id_t CPlatformSocketInfoConfig::thread_phy_to_virt(physical_thread_id_t phy_id){
+ return (m_thread_phy_to_virtual[phy_id]);
+}
+
+physical_thread_id_t CPlatformSocketInfoConfig::thread_virt_to_phy(virtual_thread_id_t virt_id){
+ return ( m_thread_virt_to_phy[virt_id]);
+}
+
+bool CPlatformSocketInfoConfig::thread_phy_is_master(physical_thread_id_t phy_id){
+ return (m_platform->m_master_thread==phy_id?true:false);
+}
+
+bool CPlatformSocketInfoConfig::thread_phy_is_latency(physical_thread_id_t phy_id){
+ return (m_platform->m_latency_thread == phy_id?true:false);
+}
+
+
+
+////////////////////////////////////////
+
+
+bool CPlatformSocketInfo::Create(CPlatformCoresYamlInfo * platform){
+ if ( (platform) && (platform->m_is_exists) ) {
+ CPlatformSocketInfoConfig * lp=new CPlatformSocketInfoConfig();
+ assert(lp);
+ lp->Create(platform);
+ m_obj= lp;
+ }else{
+ m_obj= new CPlatformSocketInfoNoConfig();
+ }
+ return(true);
+}
+
+void CPlatformSocketInfo::Delete(){
+ if ( m_obj ){
+ delete m_obj;
+ m_obj=NULL;
+ }
+}
+
+bool CPlatformSocketInfo::is_sockets_enable(socket_id_t socket){
+ return ( m_obj->is_sockets_enable(socket) );
+}
+
+socket_id_t CPlatformSocketInfo::max_num_active_sockets(){
+ return ( m_obj->max_num_active_sockets() );
+}
+
+
+socket_id_t CPlatformSocketInfo::port_to_socket(port_id_t port){
+ return ( m_obj->port_to_socket(port) );
+}
+
+
+void CPlatformSocketInfo::set_latency_thread_is_enabled(bool enable){
+ m_obj->set_latency_thread_is_enabled(enable);
+}
+
+void CPlatformSocketInfo::set_number_of_dual_ports(uint8_t num_dual_ports){
+ m_obj->set_number_of_dual_ports(num_dual_ports);
+}
+
+void CPlatformSocketInfo::set_number_of_threads_per_ports(uint8_t num_threads){
+ m_obj->set_number_of_threads_per_ports(num_threads);
+}
+
+bool CPlatformSocketInfo::sanity_check(){
+ return ( m_obj->sanity_check());
+}
+
+/* return the core mask */
+uint64_t CPlatformSocketInfo::get_cores_mask(){
+ return ( m_obj->get_cores_mask());
+}
+
+virtual_thread_id_t CPlatformSocketInfo::thread_phy_to_virt(physical_thread_id_t phy_id){
+ return ( m_obj->thread_phy_to_virt(phy_id));
+}
+
+physical_thread_id_t CPlatformSocketInfo::thread_virt_to_phy(virtual_thread_id_t virt_id){
+ return ( m_obj->thread_virt_to_phy(virt_id));
+}
+
+bool CPlatformSocketInfo::thread_phy_is_master(physical_thread_id_t phy_id){
+ return ( m_obj->thread_phy_is_master(phy_id));
+}
+
+bool CPlatformSocketInfo::thread_phy_is_latency(physical_thread_id_t phy_id){
+ return ( m_obj->thread_phy_is_latency(phy_id));
+}
+
+void CPlatformSocketInfo::dump(FILE *fd){
+ m_obj->dump(fd);
+}
+
+////////////////////////////////////////
+
+
+void CRteMemPool::dump_in_case_of_error(FILE *fd){
+ fprintf(fd," ERROR ERROR there is no enough memory in socket %d \n",m_pool_id);
+ fprintf(fd," Try to enlarge the memory values in the configuration file /etc/trex_cfg.yaml \n");
+ dump(fd);
+}
+
+
+void CRteMemPool::dump(FILE *fd){
+ #define DUMP_MBUF(a,b) { float p=(100.0*(float)rte_mempool_count(b)/(float)b->size); fprintf(fd," %-30s : %.2f %% %s \n",a,p,(p<5.0?"<-":"OK") ); }
+
+ DUMP_MBUF("mbuf_64",m_small_mbuf_pool);
+ DUMP_MBUF("mbuf_128",m_mbuf_pool_128);
+ DUMP_MBUF("mbuf_256",m_mbuf_pool_256);
+ DUMP_MBUF("mbuf_512",m_mbuf_pool_512);
+ DUMP_MBUF("mbuf_1024",m_mbuf_pool_1024);
+ DUMP_MBUF("mbuf_2048",m_big_mbuf_pool);
+}
+////////////////////////////////////////
+
+void CGlobalInfo::init_pools(uint32_t rx_buffers){
+ /* this include the pkt from 64- */
+ CGlobalMemory * lp=&CGlobalInfo::m_memory_cfg;
+ CPlatformSocketInfo * lpSocket =&m_socket;
+
+ CRteMemPool * lpmem;
+
+ int i;
+ for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
+ if (lpSocket->is_sockets_enable((socket_id_t)i)) {
+ lpmem= &m_mem_pool[i];
+ lpmem->m_pool_id=i;
+
+ lpmem->m_big_mbuf_pool = utl_rte_mempool_create("big-pkt-const",
+ (lp->get_2k_num_blocks()+rx_buffers),
+ CONST_MBUF_SIZE,
+ 32,
+ (i<<5)+ 1,i);
+ assert(lpmem->m_big_mbuf_pool);
+
+ /* this include the packet from 0-64 this is for small packets */
+ lpmem->m_small_mbuf_pool =utl_rte_mempool_create("small-pkt-const",
+ lp->m_mbuf[MBUF_64],
+ CONST_SMALL_MBUF_SIZE,
+ 32,(i<<5)+ 2,i);
+ assert(lpmem->m_small_mbuf_pool);
+
+
+
+
+ lpmem->m_mbuf_pool_128=utl_rte_mempool_create("_128-pkt-const",
+ lp->m_mbuf[MBUF_128],
+ CONST_128_MBUF_SIZE,
+ 32,(i<<5)+ 6,i);
+
+
+ assert(lpmem->m_mbuf_pool_128);
+
+
+ lpmem->m_mbuf_pool_256=utl_rte_mempool_create("_256-pkt-const",
+ lp->m_mbuf[MBUF_256],
+ CONST_256_MBUF_SIZE,
+ 32,(i<<5)+ 3,i);
+
+ assert(lpmem->m_mbuf_pool_256);
+
+ lpmem->m_mbuf_pool_512=utl_rte_mempool_create("_512_-pkt-const",
+ lp->m_mbuf[MBUF_512],
+ CONST_512_MBUF_SIZE,
+ 32,(i<<5)+ 4,i);
+ assert(lpmem->m_mbuf_pool_512);
+
+ lpmem->m_mbuf_pool_1024=utl_rte_mempool_create("_1024-pkt-const",
+ lp->m_mbuf[MBUF_1024],
+ CONST_1024_MBUF_SIZE,
+ 32,(i<<5)+ 5,i);
+
+ assert(lpmem->m_mbuf_pool_1024);
+
+
+ }
+ }
+
+ /* global always from socket 0 */
+ m_mem_pool[0].m_mbuf_global_nodes = utl_rte_mempool_create_non_pkt("global-nodes",
+ lp->m_mbuf[MBUF_GLOBAL_FLOWS],
+ sizeof(CGenNode),
+ 128,
+ 0 ,
+ SOCKET_ID_ANY);
+
+ assert(m_mem_pool[0].m_mbuf_global_nodes);
+
+
+}
+
+
+
+void CFlowYamlInfo::Dump(FILE *fd){
+ fprintf(fd,"name : %s \n",m_name.c_str());
+ fprintf(fd,"cps : %f \n",m_k_cps);
+ fprintf(fd,"ipg : %f \n",m_ipg_sec);
+ fprintf(fd,"rtt : %f \n",m_rtt_sec);
+ fprintf(fd,"w : %d \n",m_w);
+ fprintf(fd,"wlength : %d \n",m_wlength);
+ fprintf(fd,"limit : %d \n",m_limit);
+ fprintf(fd,"limit_was_set : %d \n",m_limit_was_set?1:0);
+ fprintf(fd,"cap_mode : %d \n",m_cap_mode?1:0);
+ fprintf(fd,"plugin_id : %d \n",m_plugin_id);
+ fprintf(fd,"one_server : %d \n",m_one_app_server?1:0);
+ fprintf(fd,"one_server_was_set : %d \n",m_one_app_server_was_set?1:0);
+ if (m_dpPkt) {
+ m_dpPkt->Dump(fd);
+ }
+}
+
+
+
+
+void dump_mac_addr(FILE* fd,uint8_t *p){
+ int i;
+ for (i=0; i<6; i++) {
+ uint8_t a=p[i];
+ if (i==5) {
+ fprintf(fd,"%02x",a);
+ }else{
+ fprintf(fd,"%02x:",a);
+ }
+ }
+
+}
+
+
+
+static uint8_t human_tbl[]={
+ ' ',
+ 'K',
+ 'M',
+ 'G',
+ 'T'
+};
+
+std::string double_to_human_str(double num,
+ std::string units,
+ human_kbyte_t etype){
+ double abs_num=num;
+ if (num<0.0) {
+ abs_num=-num;
+ }
+ int i=0;
+ int max_cnt=sizeof(human_tbl)/sizeof(human_tbl[0]);
+ double div =1.0;
+ double f=1000.0;
+ if (etype ==KBYE_1024){
+ f=1024.0;
+ }
+ while ((abs_num > f ) && (i< max_cnt)){
+ abs_num/=f;
+ div*=f;
+ i++;
+ }
+
+ char buf [100];
+ sprintf(buf,"%10.2f %c%s",num/div,human_tbl[i],units.c_str());
+ std::string res(buf);
+ return (res);
+}
+
+
+void CPreviewMode::Dump(FILE *fd){
+ fprintf(fd," flags : %x\n", m_flags);
+ fprintf(fd," write_file : %d\n", getFileWrite()?1:0);
+ fprintf(fd," verbose : %d\n", (int)getVMode() );
+ fprintf(fd," realtime : %d\n", (int)getRealTime() );
+ fprintf(fd," flip : %d\n", (int)getClientServerFlip() );
+ fprintf(fd," cores : %d\n", (int)getCores() );
+ fprintf(fd," single core : %d\n", (int)getSingleCore() );
+ fprintf(fd," flow-flip : %d\n", (int)getClientServerFlowFlip() );
+ fprintf(fd," no clean close : %d\n", (int)getNoCleanFlowClose() );
+ fprintf(fd," 1g mode : %d\n", (int)get_1g_mode() );
+ fprintf(fd," zmq_publish : %d\n", (int)get_zmq_publish_enable() );
+ fprintf(fd," vlan_enable : %d\n", (int)get_vlan_mode_enable() );
+ fprintf(fd," mbuf_cache_disable : %d\n", (int)isMbufCacheDisabled() );
+ fprintf(fd," mac_ip_features : %d\n", (int)get_mac_ip_features_enable()?1:0 );
+ fprintf(fd," mac_ip_map : %d\n", (int)get_mac_ip_mapping_enable()?1:0 );
+ fprintf(fd," vm mode : %d\n", (int)get_vm_one_queue_enable()?1:0 );
+}
+
+void CFlowGenStats::clear(){
+ m_nat_lookup_no_flow_id=0;
+ m_total_bytes=0;
+ m_total_pkt=0;
+ m_total_open_flows =0;
+ m_total_close_flows =0;
+ m_nat_lookup_no_flow_id=0;
+ m_nat_lookup_remove_flow_id=0;
+ m_nat_lookup_add_flow_id=0;
+ m_nat_flow_timeout=0;
+ m_nat_flow_learn_error=0;
+}
+
+void CFlowGenStats::dump(FILE *fd){
+ std::string s_bytes=double_to_human_str((double )(m_total_bytes),
+ "bytes",
+ KBYE_1024);
+
+ std::string s_pkt=double_to_human_str((double )(m_total_pkt),
+ "pkt",
+ KBYE_1000);
+
+ std::string s_flows=double_to_human_str((double )(m_total_open_flows),
+ "flows",
+ KBYE_1000);
+
+ DP_S(m_total_bytes,s_bytes);
+ DP_S(m_total_pkt,s_pkt);
+ DP_S(m_total_open_flows,s_flows);
+ DP(m_total_pkt);
+ DP(m_total_open_flows);
+ DP(m_total_close_flows);
+ DP_name("active",(m_total_open_flows-m_total_close_flows));
+ DP(m_total_bytes);
+ DP(m_nat_lookup_no_flow_id);
+
+ DP(m_nat_lookup_no_flow_id);
+ DP(m_nat_lookup_remove_flow_id);
+ DP(m_nat_lookup_add_flow_id);
+ DP(m_nat_flow_timeout);
+ DP_name("active_nat",(m_nat_lookup_add_flow_id-m_nat_lookup_remove_flow_id));
+ DP(m_nat_flow_learn_error);
+}
+
+
+
+int CErfIF::open_file(std::string file_name){
+ BP_ASSERT(m_writer==0);
+
+ if ( m_preview_mode->getFileWrite() ){
+ capture_type_e file_type=ERF;
+ if ( m_preview_mode->get_pcap_mode_enable() ){
+ file_type=LIBPCAP;
+ }
+ m_writer = CCapWriterFactory::CreateWriter(file_type,(char *)file_name.c_str());
+ if (m_writer == NULL) {
+ fprintf(stderr,"ERROR can't create cap file %s ",(char *)file_name.c_str());
+ return (-1);
+ }
+ }
+ m_raw = new CCapPktRaw();
+ return (0);
+}
+
+
+int CErfIF::write_pkt(CCapPktRaw *pkt_raw){
+
+ BP_ASSERT(m_writer);
+
+ if ( m_preview_mode->getFileWrite() ){
+ BP_ASSERT(m_writer);
+ bool res=m_writer->write_packet(pkt_raw);
+ if (res != true) {
+ fprintf(stderr,"ERROR can't write to cap file");
+ return (-1);
+ }
+ }
+ return (0);
+}
+
+
+int CErfIF::close_file(void){
+
+ BP_ASSERT(m_raw);
+ m_raw->raw=0;
+ delete m_raw;
+
+ if ( m_preview_mode->getFileWrite() ){
+ BP_ASSERT(m_writer);
+ delete m_writer;
+ m_writer=0;
+ }
+ return (0);
+}
+
+
+
+void CFlowKey::Clean(){
+ m_ipaddr1=0;
+ m_ipaddr2=0;
+ m_port1=0;
+ m_port2=0;
+ m_ip_proto=0;
+ m_l2_proto=0;
+ m_vrfid=0;
+}
+
+void CFlowKey::Dump(FILE *fd){
+ fprintf(fd," %x:%x:%x:%x:%x:%x:%x\n",m_ipaddr1,m_ipaddr2,m_port1,m_port2,m_ip_proto,m_l2_proto,m_vrfid);
+}
+
+
+
+void CPacketDescriptor::Dump(FILE *fd){
+ fprintf(fd," IsSwapTuple : %d \n",IsSwapTuple()?1:0);
+ fprintf(fd," IsSInitDir : %d \n",IsInitSide()?1:0);
+ fprintf(fd," Isvalid : %d ",IsValidPkt()?1:0);
+ fprintf(fd," IsRtt : %d ",IsRtt()?1:0);
+ fprintf(fd," IsLearn : %d ",IsLearn()?1:0);
+
+ if (IsTcp() ) {
+ fprintf(fd," TCP ");
+ }else{
+ fprintf(fd," UDP ");
+ }
+ fprintf(fd," IsLast Pkt : %d ", IsLastPkt() ?1:0);
+ fprintf(fd," id : %d \n",getId() );
+
+ fprintf(fd," flow_ID : %d , max_pkts : %u, max_aging: %d sec , pkt_id : %u, init: %d ( dir:%d dir_max :%d ) bid:%d \n",getFlowId(),
+ GetMaxPktsPerFlow(),
+ GetMaxFlowTimeout() ,
+ getFlowPktNum(),
+ IsInitSide(),
+ GetDirInfo()->GetPktNum(),
+ GetDirInfo()->GetMaxPkts(),
+ IsBiDirectionalFlow()?1:0
+
+ );
+ fprintf(fd,"\n");
+}
+
+
+void CPacketIndication::UpdateOffsets(){
+ m_ether_offset = getEtherOffset();
+ m_ip_offset = getIpOffset();
+ m_udp_tcp_offset = getTcpOffset();
+ m_payload_offset = getPayloadOffset();
+}
+
+void CPacketIndication::UpdatePacketPadding(){
+ m_packet_padding = m_packet->getTotalLen() - (l3.m_ipv4->getTotalLength()+ getIpOffset());
+}
+
+
+void CPacketIndication::RefreshPointers(){
+
+ char *pobase=getBasePtr();
+ CPacketIndication * obj=this;
+
+ m_ether = (EthernetHeader *) (pobase + m_ether_offset);
+ l3.m_ipv4 = (IPHeader *) (pobase + m_ip_offset);
+ l4.m_tcp= (TCPHeader *)(pobase + m_udp_tcp_offset);
+ if ( m_payload_offset ){
+ m_payload =(uint8_t *)(pobase + m_payload_offset);
+ }else{
+ m_payload =(uint8_t *)(0);
+ }
+}
+
+// copy ref assume pkt point to a fresh
+void CPacketIndication::Clone(CPacketIndication * obj,CCapPktRaw * pkt){
+ Clean();
+ m_cap_ipg = obj->m_cap_ipg;
+ m_packet = pkt;
+ char *pobase=getBasePtr();
+ m_flow = obj->m_flow;
+
+ m_ether = (EthernetHeader *) (pobase + obj->getEtherOffset());
+ l3.m_ipv4 = (IPHeader *) (pobase + obj->getIpOffset());
+ m_is_ipv6 = obj->m_is_ipv6;
+ l4.m_tcp= (TCPHeader *)(pobase + obj->getTcpOffset());
+ if ( obj->getPayloadOffset() ){
+ m_payload =(uint8_t *)(pobase + obj->getPayloadOffset());
+ }else{
+ m_payload =(uint8_t *)(0);
+ }
+ m_payload_len = obj->m_payload_len;
+ m_flow_key = obj->m_flow_key;
+ m_desc = obj->m_desc;
+
+ m_packet_padding = obj->m_packet_padding;
+ /* copy offsets*/
+ m_ether_offset = obj->m_ether_offset;
+ m_ip_offset = obj->m_ip_offset;
+ m_udp_tcp_offset = obj->m_udp_tcp_offset;;
+ m_payload_offset = obj->m_payload_offset;
+}
+
+
+
+void CPacketIndication::Dump(FILE *fd,int verbose){
+ fprintf(fd," ipg : %f \n",m_cap_ipg);
+ fprintf(fd," key \n");
+ fprintf(fd," ------\n");
+ m_flow_key.Dump(fd);
+
+ fprintf(fd," L2 info \n");
+ fprintf(fd," ------\n");
+ m_packet->Dump(fd,verbose);
+
+ fprintf(fd," Descriptor \n");
+ fprintf(fd," ------\n");
+ m_desc.Dump(fd);
+
+ if ( m_desc.IsValidPkt() ) {
+ fprintf(fd," ipv4 \n");
+ l3.m_ipv4->dump(fd);
+ if ( m_desc.IsUdp() ) {
+ l4.m_udp->dump(fd);
+ }else{
+ l4.m_tcp->dump(fd);
+ }
+ fprintf(fd," payload len : %d \n",m_payload_len);
+ }else{
+ fprintf(fd," not valid packet \n");
+ }
+}
+
+void CPacketIndication::Clean(){
+ m_desc.Clear();
+ m_ether=0;
+ l3.m_ipv4=0;
+ l4.m_tcp=0;
+ m_payload=0;
+ m_payload_len=0;
+}
+
+
+
+uint64_t CCPacketParserCounters::getTotalErrors(){
+ uint64_t res=
+ m_non_ip+
+ m_arp+
+ m_mpls+
+ m_non_valid_ipv4_ver+
+ m_ip_checksum_error+
+ m_ip_length_error+
+ m_ip_not_first_fragment_error+
+ m_ip_ttl_is_zero_error+
+ m_ip_multicast_error+
+
+ m_non_tcp_udp_ah+
+ m_non_tcp_udp_esp+
+ m_non_tcp_udp_icmp+
+ m_non_tcp_udp_gre+
+ m_non_tcp_udp_ip+
+ m_tcp_udp_pkt_length_error;
+ return (res);
+}
+
+void CCPacketParserCounters::Clear(){
+ m_pkt=0;
+ m_non_ip=0;
+ m_vlan=0;
+ m_arp=0;
+ m_mpls=0;
+
+ m_non_valid_ipv4_ver=0;
+ m_ip_checksum_error=0;
+ m_ip_length_error=0;
+ m_ip_not_first_fragment_error=0;
+ m_ip_ttl_is_zero_error=0;
+ m_ip_multicast_error=0;
+ m_ip_header_options=0;
+
+ m_non_tcp_udp=0;
+ m_non_tcp_udp_ah=0;
+ m_non_tcp_udp_esp=0;
+ m_non_tcp_udp_icmp=0;
+ m_non_tcp_udp_gre=0;
+ m_non_tcp_udp_ip=0;
+ m_tcp_header_options=0;
+ m_tcp_udp_pkt_length_error=0;
+ m_tcp=0;
+ m_udp=0;
+ m_valid_udp_tcp=0;
+}
+
+
+void CCPacketParserCounters::Dump(FILE *fd){
+
+ DP (m_pkt);
+ DP (m_non_ip);
+ DP (m_vlan);
+ DP (m_arp);
+ DP (m_mpls);
+
+ DP (m_non_valid_ipv4_ver);
+ DP (m_ip_checksum_error);
+ DP (m_ip_length_error);
+ DP (m_ip_not_first_fragment_error);
+ DP (m_ip_ttl_is_zero_error);
+ DP (m_ip_multicast_error);
+ DP (m_ip_header_options);
+
+ DP (m_non_tcp_udp);
+ DP (m_non_tcp_udp_ah);
+ DP (m_non_tcp_udp_esp);
+ DP (m_non_tcp_udp_icmp);
+ DP (m_non_tcp_udp_gre);
+ DP (m_non_tcp_udp_ip);
+ DP (m_tcp_header_options);
+ DP (m_tcp_udp_pkt_length_error);
+ DP (m_tcp);
+ DP (m_udp);
+ DP (m_valid_udp_tcp);
+}
+
+
+bool CPacketParser::Create(){
+ m_counter.Clear();
+ return (true);
+}
+
+void CPacketParser::Delete(){
+}
+
+
+bool CPacketParser::ProcessPacket(CPacketIndication * pkt_indication,
+ CCapPktRaw * raw_packet){
+ BP_ASSERT(pkt_indication);
+ pkt_indication->ProcessPacket(this,raw_packet);
+ if (pkt_indication->m_desc.IsValidPkt()) {
+ return (true);
+ }
+ return (false);
+}
+
+void CPacketParser::Dump(FILE *fd){
+ fprintf(fd," parser statistic \n");
+ fprintf(fd," ===================== \n");
+ m_counter.Dump(fd);
+}
+
+
+void CPacketIndication::SetKey(void){
+ uint32_t ip_src, ip_dst;
+
+ m_desc.SetIsValidPkt(true);
+ if (is_ipv6()){
+ uint16_t ipv6_src[8];
+ uint16_t ipv6_dst[8];
+
+ l3.m_ipv6->getSourceIpv6(&ipv6_src[0]);
+ l3.m_ipv6->getDestIpv6(&ipv6_dst[0]);
+ ip_src=(ipv6_src[6] << 16) | ipv6_src[7];
+ ip_dst=(ipv6_dst[6] << 16) | ipv6_dst[7];
+ m_flow_key.m_ip_proto = l3.m_ipv6->getNextHdr();
+ }else{
+ ip_src=l3.m_ipv4->getSourceIp();
+ ip_dst=l3.m_ipv4->getDestIp();
+ m_flow_key.m_ip_proto = l3.m_ipv4->getProtocol();
+ }
+
+ /* UDP/TCP has same place */
+ uint16_t src_port = l4.m_udp->getSourcePort();
+ uint16_t dst_port = l4.m_udp->getDestPort();
+ if (ip_src > ip_dst ) {
+ m_flow_key.m_ipaddr1 =ip_dst;
+ m_flow_key.m_ipaddr2 =ip_src;
+ m_flow_key.m_port1 = dst_port;
+ m_flow_key.m_port2 = src_port;
+ }else{
+ m_desc.SetSwapTuple(true);
+ m_flow_key.m_ipaddr1 = ip_src;
+ m_flow_key.m_ipaddr2 = ip_dst;
+ m_flow_key.m_port1 = src_port;
+ m_flow_key.m_port2 = dst_port;
+ }
+ m_flow_key.m_l2_proto = 0;
+ m_flow_key.m_vrfid = 0;
+}
+
+uint8_t CPacketIndication::ProcessIpPacketProtocol(CCPacketParserCounters *m_cnt,
+ uint8_t protocol, int *offset){
+
+ char * packetBase = m_packet->raw;
+ TCPHeader * tcp=0;
+ UDPHeader * udp=0;
+ uint16_t tcp_header_len=0;
+
+ switch (protocol) {
+ case IPHeader::Protocol::TCP :
+ m_desc.SetIsTcp(true);
+ tcp =(TCPHeader *)(packetBase +*offset);
+ l4.m_tcp = tcp;
+
+ tcp_header_len = tcp->getHeaderLength();
+ if ( tcp_header_len > (5*4) ){
+ // we have ip option
+ m_cnt->m_tcp_header_options++;
+ }
+ *offset += tcp_header_len;
+ m_cnt->m_tcp++;
+ break;
+ case IPHeader::Protocol::UDP :
+ m_desc.SetIsUdp(true);
+ udp =(UDPHeader *)(packetBase +*offset);
+ l4.m_udp = udp;
+ *offset += 8;
+ m_cnt->m_udp++;
+ break;
+ case IPHeader::Protocol::AH:
+ m_cnt->m_non_tcp_udp_ah++;
+ return (1);
+ break;
+ case IPHeader::Protocol::ESP:
+ m_cnt->m_non_tcp_udp_esp++;
+ return (1);
+ break;
+ case IPHeader::Protocol::ICMP:
+ case IPHeader::Protocol::IPV6_ICMP:
+ m_cnt->m_non_tcp_udp_icmp++;
+ return (1);
+ break;
+ case IPHeader::Protocol::GRE:
+ m_cnt->m_non_tcp_udp_gre++;
+ return (1);
+ break;
+ case IPHeader::Protocol::IP:
+ m_cnt->m_non_ip++;
+ return (1);
+ break;
+
+ default:
+ m_cnt->m_non_tcp_udp++;
+ return (1);
+ break;
+ }
+
+ /* out of packet */
+ if ( *offset > m_packet->getTotalLen() ) {
+ m_cnt->m_tcp_udp_pkt_length_error++;
+ return (1);
+ }
+ return (0);
+}
+
+
+void CPacketIndication::ProcessIpPacket(CPacketParser *parser,
+ int offset){
+
+ char * packetBase;
+ CCPacketParserCounters * m_cnt=&parser->m_counter;
+ packetBase = m_packet->raw;
+ uint8_t protocol;
+ BP_ASSERT(l3.m_ipv4);
+
+ parser->m_counter.m_pkt++;
+
+ if ( l3.m_ipv4->getVersion() == 4 ){
+ m_cnt->m_ipv4++;
+ }else{
+ m_cnt->m_non_valid_ipv4_ver++;
+ return;
+ }
+ // check the IP Length
+ if( (uint32_t)(l3.m_ipv4->getTotalLength()+offset) > (uint32_t)( m_packet->getTotalLen()) ){
+ m_cnt->m_ip_length_error++;
+ return;
+ }
+
+ uint16_t ip_offset=offset;
+ uint16_t ip_header_length = l3.m_ipv4->getHeaderLength();
+
+ if ( ip_header_length >(5*4) ){
+ m_cnt->m_ip_header_options++;
+ }
+
+ if ( (uint32_t)(ip_header_length + offset) > (uint32_t)m_packet->getTotalLen() ) {
+ m_cnt->m_ip_length_error++;
+ return;
+ }
+ offset += ip_header_length;
+
+ if (!l3.m_ipv4->isChecksumOK() ){
+ m_cnt->m_ip_checksum_error++;
+ }
+ if( l3.m_ipv4->isMulticast() ){
+ m_cnt->m_ip_multicast_error++;
+ return;
+ }
+
+ if( l3.m_ipv4->getTimeToLive() ==0 ){
+ m_cnt->m_ip_ttl_is_zero_error++;
+ return;
+ }
+
+ if( l3.m_ipv4->isNotFirstFragment() ) {
+ m_cnt->m_ip_not_first_fragment_error++;
+ return;
+ }
+
+ protocol = l3.m_ipv4->getProtocol();
+ if (ProcessIpPacketProtocol(m_cnt,protocol,&offset) != 0) {
+ return;
+ };
+
+ uint16_t payload_offset_from_ip = offset-ip_offset;
+ if ( payload_offset_from_ip > l3.m_ipv4->getTotalLength() ) {
+ m_cnt->m_tcp_udp_pkt_length_error++;
+ return;
+ }
+
+ // Set packet length and include padding if needed
+ m_packet->pkt_len = l3.m_ipv4->getTotalLength() + getIpOffset();
+ if (m_packet->pkt_len < 60) { m_packet->pkt_len = 60; }
+
+ m_cnt->m_valid_udp_tcp++;
+ m_payload_len = l3.m_ipv4->getTotalLength() - (payload_offset_from_ip);
+ m_payload = (uint8_t *)(packetBase +offset);
+ UpdatePacketPadding();
+ SetKey();
+}
+
+
+
+void CPacketIndication::ProcessIpv6Packet(CPacketParser *parser,
+ int offset){
+
+ char * packetBase = m_packet->raw;
+ CCPacketParserCounters * m_cnt=&parser->m_counter;
+ uint16_t src_ipv6[6];
+ uint16_t dst_ipv6[6];
+ uint16_t idx;
+ uint8_t protocol;
+ BP_ASSERT(l3.m_ipv6);
+
+ parser->m_counter.m_pkt++;
+
+ if ( l3.m_ipv6->getVersion() == 6 ){
+ m_cnt->m_ipv6++;
+ }else{
+ m_cnt->m_non_valid_ipv6_ver++;
+ return;
+ }
+
+ // Check length
+ if ((uint32_t)(l3.m_ipv6->getPayloadLen()+offset+l3.m_ipv6->getHeaderLength()) >
+ (uint32_t)( m_packet->getTotalLen()) ){
+ m_cnt->m_ipv6_length_error++;
+ return;
+ }
+
+ for (idx=0; idx<6; idx++){
+ src_ipv6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
+ dst_ipv6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ l3.m_ipv6->updateMSBIpv6Src(&src_ipv6[0]);
+ l3.m_ipv6->updateMSBIpv6Dst(&dst_ipv6[0]);
+
+ offset += l3.m_ipv6->getHeaderLength();
+ protocol = l3.m_ipv6->getNextHdr();
+ if (ProcessIpPacketProtocol(m_cnt,protocol,&offset) != 0) {
+ return;
+ };
+
+ // Set packet length and include padding if needed
+ uint16_t real_pkt_size = l3.m_ipv6->getPayloadLen()+ getIpOffset() + l3.m_ipv6->getHeaderLength();
+ m_packet->pkt_len = real_pkt_size;
+ if (m_packet->pkt_len < 60) { m_packet->pkt_len = 60; }
+
+ m_cnt->m_valid_udp_tcp++;
+ m_payload_len = l3.m_ipv6->getPayloadLen();
+ m_payload = (uint8_t *)(packetBase +offset);
+
+ m_packet_padding = m_packet->getTotalLen() - real_pkt_size;
+ assert( m_packet->getTotalLen()>= real_pkt_size );
+ SetKey();
+}
+
+
+static uint8_t cbuff[MAX_PKT_SIZE];
+
+bool CPacketIndication::ConvertPacketToIpv6InPlace(CCapPktRaw * pkt,
+ int offset){
+
+ // Copy l2 data and set l2 type to ipv6
+ memcpy(cbuff, pkt->raw, offset);
+ *(uint16_t*)(cbuff+12) = PKT_HTONS(EthernetHeader::Protocol::IPv6);
+
+ // Create the ipv6 header
+ IPHeader *ipv4 = (IPHeader *) (pkt->raw+offset);
+ IPv6Header *ipv6 = (IPv6Header *) (cbuff+offset);
+ uint8_t ipv6_hdrlen = ipv6->getHeaderLength();
+ memset(ipv6,0,ipv6_hdrlen);
+ ipv6->setVersion(6);
+ if (ipv4->getTotalLength() < ipv4->getHeaderLength()) {
+ return(false);
+ }
+ // Calculate the payload length
+ uint16_t p_len = ipv4->getTotalLength() - ipv4->getHeaderLength();
+ ipv6->setPayloadLen(p_len);
+ uint8_t l4_proto = ipv4->getProtocol();
+ ipv6->setNextHdr(l4_proto);
+ ipv6->setHopLimit(64);
+
+ // Update the least signficant 32-bits of ipv6 address
+ // using the ipv4 address
+ ipv6->updateLSBIpv6Src(ipv4->getSourceIp());
+ ipv6->updateLSBIpv6Dst(ipv4->getDestIp());
+
+ // Copy rest of packet
+ uint16_t ipv4_offset = offset + ipv4->getHeaderLength();
+ uint16_t ipv6_offset = offset + ipv6_hdrlen;
+ memcpy(cbuff+ipv6_offset,pkt->raw+ipv4_offset,p_len);
+
+ ipv6_offset+=p_len;
+ memcpy(pkt->raw,cbuff,ipv6_offset);
+
+ // Set packet length
+ pkt->pkt_len = ipv6_offset;
+ m_is_ipv6 = true;
+
+ return (true);
+}
+
+
+void CPacketIndication::ProcessPacket(CPacketParser *parser,
+ CCapPktRaw * pkt){
+ _ProcessPacket(parser,pkt);
+ UpdateOffsets(); /* update fast offsets */
+}
+
+
+
+/* process packet */
+void CPacketIndication::_ProcessPacket(CPacketParser *parser,
+ CCapPktRaw * pkt){
+
+ BP_ASSERT(pkt);
+ m_packet =pkt;
+ Clean();
+ CCPacketParserCounters * m_cnt=&parser->m_counter;
+
+ int offset = 0;
+ char * packetBase;
+ packetBase = m_packet->raw;
+ BP_ASSERT(packetBase);
+ m_ether = (EthernetHeader *)packetBase;
+ m_is_ipv6 = false;
+
+ // IP
+ switch( m_ether->getNextProtocol() ) {
+ case EthernetHeader::Protocol::IP :
+ offset = 14;
+ l3.m_ipv4 =(IPHeader *)(packetBase+offset);
+ break;
+ case EthernetHeader::Protocol::IPv6 :
+ offset = 14;
+ l3.m_ipv6 =(IPv6Header *)(packetBase+offset);
+ m_is_ipv6 = true;
+ break;
+ case EthernetHeader::Protocol::VLAN :
+ m_cnt->m_vlan++;
+ switch ( m_ether->getVlanProtocol() ){
+ case EthernetHeader::Protocol::IP:
+ offset = 18;
+ l3.m_ipv4 =(IPHeader *)(packetBase+offset);
+ break;
+ case EthernetHeader::Protocol::IPv6 :
+ offset = 18;
+ l3.m_ipv6 =(IPv6Header *)(packetBase+offset);
+ m_is_ipv6 = true;
+ break;
+ case EthernetHeader::Protocol::MPLS_Multicast :
+ case EthernetHeader::Protocol::MPLS_Unicast :
+ m_cnt->m_mpls++;
+ return;
+
+ case EthernetHeader::Protocol::ARP :
+ m_cnt->m_arp++;
+ return;
+
+ default:
+ m_cnt->m_non_ip++;
+ return ; /* Non IP */
+ }
+ break;
+ case EthernetHeader::Protocol::ARP :
+ m_cnt->m_arp++;
+ return; /* Non IP */
+ break;
+
+ case EthernetHeader::Protocol::MPLS_Multicast :
+ case EthernetHeader::Protocol::MPLS_Unicast :
+ m_cnt->m_mpls++;
+ return; /* Non IP */
+ break;
+
+ default:
+ m_cnt->m_non_ip++;
+ return; /* Non IP */
+ }
+
+ if (is_ipv6() == false) {
+ if( (14+20) > (uint32_t)( m_packet->getTotalLen()) ){
+ m_cnt->m_ip_length_error++;
+ return;
+ }
+ }
+
+ // For now, we can not mix ipv4 and ipv4 packets
+ // so we require --ipv6 option be set for ipv6 packets
+ if ((m_is_ipv6) && (CGlobalInfo::is_ipv6_enable() == false)){
+ fprintf(stderr,"ERROR --ipv6 must be set to process ipv6 packets\n");
+ exit(-1);
+ }
+
+ // Convert to Ipv6 if requested and not already Ipv6
+ if ((CGlobalInfo::is_ipv6_enable()) && (is_ipv6() == false )) {
+ if (ConvertPacketToIpv6InPlace(pkt, offset) == false){
+ /* Move to next packet as this was erroneous */
+ printf(" unable to convert packet to IPv6, skipping...\n");
+ return;
+ }
+ }
+
+ if (is_ipv6()){
+ ProcessIpv6Packet(parser,offset);
+ }else{
+ ProcessIpPacket(parser,offset);
+ }
+}
+
+
+
+void CFlowTableStats::Clear(){
+ m_lookup=0;
+ m_found=0;
+ m_fif=0;
+ m_add=0;
+ m_remove=0;
+ m_fif_err=0;
+ m_active=0;
+}
+
+void CFlowTableStats::Dump(FILE *fd){
+ DP (m_lookup);
+ DP (m_found);
+ DP (m_fif);
+ DP (m_add);
+ DP (m_remove);
+ DP (m_fif_err);
+ DP (m_active);
+}
+
+
+void CFlow::Dump(FILE *fd){
+ fprintf(fd," fif is swap : %d \n",is_fif_swap);
+}
+
+
+void CFlowTableManagerBase::Dump(FILE *fd){
+ m_stats.Dump(fd);
+}
+
+CFlow * CFlowTableManagerBase::process(CFlowKey & key,bool &is_fif ){
+ m_stats.m_lookup++;
+ is_fif=false;
+ CFlow * lp=lookup(key);
+ if ( lp ) {
+ m_stats.m_found++;
+ return (lp);
+ }else{
+ m_stats.m_fif++;
+ m_stats.m_active++;
+ m_stats.m_add++;
+ is_fif=true;
+ lp= add(key );
+ if (lp) {
+ }else{
+ m_stats.m_fif_err++;
+ }
+ }
+ return (lp);
+}
+
+
+bool CFlowTableMap::Create(int max_size){
+ m_stats.Clear();
+ return (true);
+}
+
+void CFlowTableMap::Delete(){
+ remove_all();
+}
+
+void CFlowTableMap::remove(CFlowKey & key ){
+ CFlow *lp=lookup(key);
+ if ( lp ) {
+ delete lp;
+ m_stats.m_remove++;
+ m_stats.m_active--;
+ m_map.erase(key);
+ }else{
+ BP_ASSERT(0);
+ }
+}
+
+
+CFlow * CFlowTableMap::lookup(CFlowKey & key ){
+ flow_map_t::iterator iter;
+ iter = m_map.find(key);
+ if (iter != m_map.end() ) {
+ return ( (*iter).second );
+ }else{
+ return (( CFlow*)0);
+ }
+}
+
+CFlow * CFlowTableMap::add(CFlowKey & key ){
+ CFlow * flow = new CFlow();
+ m_map.insert(flow_map_t::value_type(key,flow));
+ return (flow);
+}
+
+void CFlowTableMap::remove_all(){
+ if ( m_map.empty() )
+ return;
+ flow_map_iter_t it;
+ for (it= m_map.begin(); it != m_map.end(); ++it) {
+ CFlow *lp = it->second;
+ delete lp;
+ }
+ m_map.clear();
+}
+
+uint64_t CFlowTableMap::count(){
+ return ( m_map.size());
+}
+
+
+/*
+ * This function will insert an IP option header containing metadata for the
+ * rx-check feature.
+ *
+ * An mbuf is created to hold the new option header plus the portion of the
+ * packet after the base IP header (includes any IP options header that might
+ * exist). This mbuf is then linked into the existing mbufs (becoming the
+ * second mbuf).
+ *
+ * Note that the rxcheck option header is inserted as the first option header,
+ * and any existing IP option headers are placed after it.
+ */
+void CFlowPktInfo::do_generate_new_mbuf_rxcheck(rte_mbuf_t * m,
+ CGenNode * node,
+ pkt_dir_t dir,
+ bool single_port){
+
+ /* retrieve size of rx-check header, must be multiple of 8 */
+ uint16_t opt_len = RX_CHECK_LEN;
+ uint16_t current_opt_len = 0;
+ assert( (opt_len % 8) == 0 );
+
+ /* determine starting move location */
+ char *mp1 = rte_pktmbuf_mtod(m, char*);
+ uint16_t mp1_offset = m_pkt_indication.getFastIpOffsetFast();
+ if (unlikely (m_pkt_indication.is_ipv6()) ) {
+ mp1_offset += IPv6Header::DefaultSize;
+ }else{
+ mp1_offset += IPHeader::DefaultSize;
+ }
+ char *move_from = mp1 + mp1_offset;
+
+ /* determine size of new mbuf required */
+ uint16_t move_len = m->data_len - mp1_offset;
+ uint16_t new_mbuf_size = move_len + opt_len;
+ uint16_t mp2_offset = opt_len;
+
+ /* obtain a new mbuf */
+ rte_mbuf_t * new_mbuf = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), new_mbuf_size);
+ assert(new_mbuf);
+ char * mp2 = rte_pktmbuf_append(new_mbuf, new_mbuf_size);
+ char * move_to = mp2 + mp2_offset;
+
+ /* move part of packet from first mbuf to new mbuf */
+ memmove(move_to, move_from, move_len);
+
+ /* trim first mbuf and set pointer to option header*/
+ CRx_check_header *rxhdr;
+ uint16_t buf_adjust = move_len;
+ rxhdr = (CRx_check_header *)mp2;
+ m->data_len -= buf_adjust;
+
+ /* insert rx-check data as an IPv4 option header or IPv6 extension header*/
+ CFlowPktInfo * lp=node->m_pkt_info;
+ CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
+
+ /* set option type and update ip header length */
+ IPHeader * ipv4=(IPHeader *)(mp1 + 14);
+ if (unlikely (m_pkt_indication.is_ipv6()) ) {
+ IPv6Header * ipv6=(IPv6Header *)(mp1 + 14);
+ uint8_t save_header= ipv6->getNextHdr();
+ ipv6->setNextHdr(RX_CHECK_V6_OPT_TYPE);
+ ipv6->setHopLimit(TTL_RESERVE_DUPLICATE);
+ ipv6->setPayloadLen( ipv6->getPayloadLen() +
+ sizeof(CRx_check_header));
+ rxhdr->m_option_type = save_header;
+ rxhdr->m_option_len = RX_CHECK_V6_OPT_LEN;
+ }else{
+ current_opt_len = ipv4->getHeaderLength();
+ ipv4->setHeaderLength(current_opt_len+opt_len);
+ ipv4->setTotalLength(ipv4->getTotalLength()+opt_len);
+ ipv4->setTimeToLive(TTL_RESERVE_DUPLICATE);
+ rxhdr->m_option_type = RX_CHECK_V4_OPT_TYPE;
+ rxhdr->m_option_len = RX_CHECK_V4_OPT_LEN;
+ }
+
+ /* fill in the rx-check metadata in the options header */
+ if ( CGlobalInfo::m_options.is_rxcheck_const_ts() ){
+ /* Runtime flag to use a constant value for the timestamp field. */
+ /* This is used by simulation to provide consistency across runs. */
+ rxhdr->m_time_stamp = 0xB3B2B1B0;
+ }else{
+ rxhdr->m_time_stamp = os_get_hr_tick_32();
+ }
+ rxhdr->m_magic = RX_CHECK_MAGIC;
+ rxhdr->m_flow_id = node->m_flow_id | ( ( (uint64_t)(desc->getFlowId() & 0xf))<<52 ) ; // include thread_id, node->flow_id, sub_flow in case of multi-flow template
+ rxhdr->m_flags = 0;
+ rxhdr->m_aging_sec = desc->GetMaxFlowTimeout();
+ rxhdr->m_template_id = (uint8_t)desc->getId();
+
+ /* add the flow packets goes to the same port */
+ if (single_port) {
+ rxhdr->m_pkt_id = desc->getFlowPktNum();
+ rxhdr->m_flow_size = desc->GetMaxPktsPerFlow();
+
+ }else{
+ rxhdr->m_pkt_id = desc->GetDirInfo()->GetPktNum();
+ rxhdr->m_flow_size = desc->GetDirInfo()->GetMaxPkts();
+ /* set dir */
+ rxhdr->set_dir(desc->IsInitSide()?1:0);
+ rxhdr->set_both_dir(desc->IsBiDirectionalFlow()?1:0);
+ }
+
+ /* update checksum for IPv4, split across 2 mbufs */
+ if (likely ( ! m_pkt_indication.is_ipv6()) ) {
+ ipv4->updateCheckSum2((uint8_t *)ipv4, current_opt_len, (uint8_t *)rxhdr, opt_len);
+ }
+
+ /* link new mbuf */
+ new_mbuf->next = m->next;
+ new_mbuf->nb_segs++;
+ m->next = new_mbuf;
+ m->nb_segs++;
+ m->pkt_len += opt_len;
+}
+
+
+char * CFlowPktInfo::push_ipv4_option_offline(uint8_t bytes){
+ /* must be align by 4*/
+ assert( (bytes % 4)== 0 );
+ assert(m_pkt_indication.is_ipv6()==false);
+ if ( m_pkt_indication.l3.m_ipv4->getHeaderLength()+bytes>60 ){
+ printf(" ERROR ipv4 options size is too big, should be able to add %d bytes for internal info \n",bytes);
+ return((char *)0);
+ }
+ /* now we can do that !*/
+
+ /* add more bytes to the packet */
+ m_packet->append(bytes);
+ uint8_t ip_offset_to_move= m_pkt_indication.getFastIpOffsetFast()+IPHeader::DefaultSize;
+ char *p=m_packet->raw+ip_offset_to_move;
+ uint16_t bytes_to_move= m_packet->pkt_len - ip_offset_to_move -bytes;
+
+ /* move the start of ipv4 options */
+ memmove(p+bytes ,p, bytes_to_move);
+
+ /* fix all other stuff */
+ if ( m_pkt_indication.m_udp_tcp_offset ){
+ m_pkt_indication.m_udp_tcp_offset+=bytes;
+ }
+ if ( m_pkt_indication.m_payload_offset ) {
+ m_pkt_indication.m_payload_offset+=bytes;
+ }
+
+ m_pkt_indication.RefreshPointers();
+ /* now pointer are updated we can manipulate ipv4 header */
+ IPHeader * ipv4=m_pkt_indication.l3.m_ipv4;
+
+ ipv4->setTotalLength(ipv4->getTotalLength()+bytes);
+ ipv4->setHeaderLength(ipv4->getHeaderLength()+(bytes));
+
+ m_pkt_indication.UpdatePacketPadding();
+
+ /* refresh the global mbuf */
+ free_const_mbuf();
+ alloc_const_mbuf();
+ return (p);
+}
+
+
+void CFlowPktInfo::mask_as_learn(){
+ char *p;
+ CNatOption *lpNat;
+ if ( m_pkt_indication.is_ipv6() ){
+ lpNat=(CNatOption *)push_ipv6_option_offline(CNatOption::noOPTION_LEN);
+ lpNat->set_init_ipv6_header();
+ lpNat->set_fid(0);
+ lpNat->set_thread_id(0);
+ }else{
+ lpNat=(CNatOption *)push_ipv4_option_offline(CNatOption::noOPTION_LEN);
+ lpNat->set_init_ipv4_header();
+ lpNat->set_fid(0);
+ lpNat->set_thread_id(0);
+ m_pkt_indication.l3.m_ipv4->updateCheckSum();
+ }
+ /* learn is true */
+ m_pkt_indication.m_desc.SetLearn(true);
+
+}
+
+
+char * CFlowPktInfo::push_ipv6_option_offline(uint8_t bytes){
+
+ /* must be align by 8*/
+ assert( (bytes % 8)== 0 );
+ assert(m_pkt_indication.is_ipv6()==true);
+
+ /* add more bytes to the packet */
+ m_packet->append(bytes);
+ uint8_t ip_offset_to_move= m_pkt_indication.getFastIpOffsetFast()+IPv6Header::DefaultSize;
+ char *p=m_packet->raw+ip_offset_to_move;
+ uint16_t bytes_to_move= m_packet->pkt_len - ip_offset_to_move -bytes;
+
+ /* move the start of ipv4 options */
+ memmove(p+bytes ,p, bytes_to_move);
+
+ /* fix all other stuff */
+ if ( m_pkt_indication.m_udp_tcp_offset ){
+ m_pkt_indication.m_udp_tcp_offset+=bytes;
+ }
+ if ( m_pkt_indication.m_payload_offset ) {
+ m_pkt_indication.m_payload_offset+=bytes;
+ }
+
+ m_pkt_indication.RefreshPointers();
+ /* now pointer are updated we can manipulate ipv6 header */
+ IPv6Header * ipv6=m_pkt_indication.l3.m_ipv6;
+
+ ipv6->setPayloadLen(ipv6->getPayloadLen()+bytes);
+ uint8_t save_header= ipv6->getNextHdr();
+ *p=save_header; /* copy next header */
+ ipv6->setNextHdr(CNatOption::noIPV6_OPTION);
+
+ m_pkt_indication.UpdatePacketPadding();
+
+ /* refresh the global mbuf */
+ free_const_mbuf();
+ alloc_const_mbuf();
+ return (p);
+}
+
+
+void CFlowPktInfo::alloc_const_mbuf(){
+
+ if ( m_packet->pkt_len > FIRST_PKT_SIZE ) {
+ /* pkt size in bigger than FIRST_PKT_SIZE let's create a offline buffer */
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ if ( CGlobalInfo::m_socket.is_sockets_enable(i) ){
+
+ rte_mbuf_t * m;
+ uint16_t pkt_s=(m_packet->pkt_len - FIRST_PKT_SIZE);
+
+ m = CGlobalInfo::pktmbuf_alloc(i,pkt_s);
+ BP_ASSERT(m);
+ char *p=rte_pktmbuf_append(m, pkt_s);
+ rte_memcpy(p,(m_packet->raw+FIRST_PKT_SIZE),pkt_s);
+
+ assert(m_big_mbuf[i]==NULL);
+ m_big_mbuf[i]=m;
+ }
+ }
+ }
+}
+
+void CFlowPktInfo::free_const_mbuf(){
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ rte_mbuf_t * m=m_big_mbuf[i];
+ if (m) {
+ rte_pktmbuf_free(m );
+ m_big_mbuf[i]=NULL;
+ }
+ }
+}
+
+
+bool CFlowPktInfo::Create(CPacketIndication * pkt_ind){
+ /* clone the packet*/
+ m_packet = new CCapPktRaw(pkt_ind->m_packet);
+ /* clone of the offsets */
+ m_pkt_indication.Clone(pkt_ind,m_packet);
+
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ m_big_mbuf[i] = NULL;
+ }
+ alloc_const_mbuf();
+ return (true);
+}
+
+void CFlowPktInfo::Delete(){
+ free_const_mbuf();
+ delete m_packet;
+}
+
+void CFlowPktInfo::Dump(FILE *fd){
+ m_pkt_indication.Dump(fd,0);
+}
+
+
+
+
+void CCapFileFlowInfo::save_to_erf(std::string cap_file_name,int pcap){
+ if (Size() ==0) {
+ fprintf(stderr,"ERROR no info for this flow ");
+ return ;
+ }
+ capture_type_e file_type=ERF;
+ if ( pcap ){
+ file_type=LIBPCAP;
+ }
+
+
+ CFileWriterBase * lpWriter=CCapWriterFactory::CreateWriter(file_type,(char *)cap_file_name.c_str());
+ if (lpWriter == NULL) {
+ fprintf(stderr,"ERROR can't create cap file %s ",(char *)cap_file_name.c_str());
+ return ;
+ }
+ int i;
+
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ bool res=lpWriter->write_packet(lp->m_packet);
+ BP_ASSERT(res);
+ }
+ delete lpWriter;
+}
+
+
+
+struct CTmpFlowPerDirInfo {
+ CTmpFlowPerDirInfo(){
+ m_pkt_id=0;
+ }
+
+ uint16_t m_pkt_id;
+};
+
+class CTmpFlowInfo {
+public:
+ CTmpFlowInfo(){
+ m_max_pkts=0;
+ m_max_aging_sec=0.0;
+ m_last_pkt=0.0;
+
+ }
+ ~CTmpFlowInfo(){
+ }
+public:
+ uint32_t m_max_pkts;
+ dsec_t m_max_aging_sec;
+ dsec_t m_last_pkt;
+
+ CTmpFlowPerDirInfo m_per_dir[CS_NUM];
+};
+
+typedef CTmpFlowInfo * flow_tmp_t;
+typedef std::map<uint16_t, flow_tmp_t> flow_tmp_map_t;
+typedef flow_tmp_map_t::iterator flow_tmp_map_iter_t;
+
+
+
+bool CCapFileFlowInfo::is_valid_template_load_time(std::string & err){
+ err="";
+ int i;
+ for (i=0; i<Size(); i++) {
+ CFlowPktInfo * lp= GetPacket((uint32_t)i);
+ CPacketIndication * lpd=&lp->m_pkt_indication;
+ if ( lpd->getEtherOffset() !=0 ){
+ err=" supported template Ether offset start is 0 \n";
+ return (false);
+ }
+ if ( lpd->getIpOffset() !=14 ){
+ err=" supported template ip offset is 14 \n";
+ return (false);
+ }
+ if ( lpd->is_ipv6() ){
+ if ( lpd->getTcpOffset() != (14+40) ){
+ err=" supported template tcp/udp offset is 54, no ipv6 option header is supported \n";
+ return (false);
+ }
+ }else{
+ if ( lpd->getTcpOffset() != (14+20) ){
+ err=" supported template tcp/udp offset is 34, no ipv4 option is allowed in this version \n";
+ return (false);
+ }
+ }
+ }
+
+ if ( CGlobalInfo::is_learn_mode() ) {
+ if ( GetPacket(0)->m_pkt_indication.m_desc.IsPluginEnable() ) {
+ err="plugins are not supported with --learn mode \n";
+ return(false);
+ }
+ }
+ return(true);
+}
+
+
+/**
+ * update global info
+ * 1. maximum aging
+ * 2. per sub-flow pkt_num/max-pkt per dir and per global
+ */
+void CCapFileFlowInfo::update_info(){
+ flow_tmp_map_iter_t iter;
+ flow_tmp_map_t ft;
+ CTmpFlowInfo * lpFlow;
+ int i;
+ dsec_t ctime=0.0;
+
+ // first iteration, lern all the info into a temp flow table
+ for (i=0; i<Size(); i++) {
+ CFlowPktInfo * lp= GetPacket((uint32_t)i);
+ // extract flow_id
+ CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
+ uint16_t flow_id = desc->getFlowId();
+ CPacketDescriptorPerDir * lpCurPacket = desc->GetDirInfo();
+ pkt_dir_t dir=desc->IsInitSide()?CLIENT_SIDE:SERVER_SIDE; // with respect to the first sub-flow in the template
+
+ //update lpFlow
+ iter = ft.find(flow_id);
+ if (iter != ft.end() ) {
+ lpFlow=(*iter).second;
+ }else{
+ lpFlow = new CTmpFlowInfo();
+ assert(lpFlow);
+ ft.insert(flow_tmp_map_t::value_type(flow_id,lpFlow));
+ //add it
+
+ }
+
+ // main info
+ lpCurPacket->SetPktNum(lpFlow->m_per_dir[dir].m_pkt_id);
+ lpFlow->m_max_pkts++;
+ lpFlow->m_per_dir[dir].m_pkt_id++;
+
+ dsec_t delta = ctime - lpFlow->m_last_pkt ;
+ lpFlow->m_last_pkt = ctime;
+ if (delta > lpFlow->m_max_aging_sec) {
+ lpFlow->m_max_aging_sec = delta;
+ }
+ // per direction info
+
+ if (i<Size()) {
+ ctime += lp->m_pkt_indication.m_cap_ipg;
+ }
+ }
+
+
+ for (i=0; i<Size(); i++) {
+ CFlowPktInfo * lp= GetPacket((uint32_t)i);
+
+ CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
+ uint16_t flow_id = desc->getFlowId();
+ CPacketDescriptorPerDir * lpCurPacket = desc->GetDirInfo();
+ pkt_dir_t dir=desc->IsInitSide()?CLIENT_SIDE:SERVER_SIDE; // with respect to the first sub-flow in the template
+
+ iter = ft.find(flow_id);
+ assert( iter != ft.end() );
+ lpFlow=(*iter).second;
+
+ if ( (lpFlow->m_per_dir[0].m_pkt_id >0) &&
+ (lpFlow->m_per_dir[1].m_pkt_id >0) ) {
+ /* we have both dir */
+ lp->m_pkt_indication.m_desc.SetBiPluginEnable(true);
+ }
+
+
+ lpCurPacket->SetMaxPkts(lpFlow->m_per_dir[dir].m_pkt_id);
+ lp->m_pkt_indication.m_desc.SetMaxPktsPerFlow(lpFlow->m_max_pkts);
+ lp->m_pkt_indication.m_desc.SetMaxFlowTimeout(lpFlow->m_max_aging_sec);
+ }
+
+
+ /* in case of learn mode , we need to mark the first packet */
+ if ( CGlobalInfo::is_learn_mode() ) {
+ CFlowPktInfo * lp= GetPacket(0);
+ assert(lp);
+ /* only for bi directionl traffic mask the learn flag , only for the first packet */
+ if ( lp->m_pkt_indication.m_desc.IsBiDirectionalFlow() ){
+ lp->mask_as_learn();
+ }
+ }
+
+ if ( ft.empty() )
+ return;
+
+ flow_tmp_map_iter_t it;
+ for (it= ft.begin(); it != ft.end(); ++it) {
+ CTmpFlowInfo *lp = it->second;
+ assert(lp);
+ delete lp;
+ }
+ ft.clear();
+}
+
+
+int CCapFileFlowInfo::load_cap_file(std::string cap_file,uint16_t _id,uint8_t plugin_id){
+ RemoveAll();
+
+ fprintf(stdout," -- loading cap file %s \n",cap_file.c_str());
+ CPacketParser parser;
+ CPacketIndication pkt_indication;
+ CCapReaderBase * lp=CCapReaderFactory::CreateReader((char *)cap_file.c_str(),0);
+
+ if (lp == 0) {
+ printf(" ERROR file %s does not exist or not supported \n",(char *)cap_file.c_str());
+ return (-1);
+ }
+ bool multi_flow_enable =( (plugin_id!=0)?true:false);
+
+
+ CFlowTableMap flow;
+
+ parser.Create();
+ flow.Create(0);
+ m_total_bytes=0;
+ m_total_flows=0;
+ m_total_errors=0;
+ CFlow * first_flow=0;
+ bool first_flow_fif_is_swap=false;
+
+ bool time_was_set=false;
+ double last_time=0.0;
+ CCapPktRaw raw_packet;
+ int cnt=0;
+ while ( true ) {
+ /* read packet */
+ if ( lp->ReadPacket(&raw_packet) ==false ){
+ break;
+ }
+ cnt++;
+
+ if ( !time_was_set ){
+ last_time=raw_packet.get_time();
+ time_was_set=true;
+ }else{
+ if (raw_packet.get_time()<last_time) {
+ printf(" ERROR not valid pcap file,timestamp is negative at packet %d \n",cnt);
+ exit(-1);
+ }
+ last_time=raw_packet.get_time();
+ }
+
+ if ( parser.ProcessPacket(&pkt_indication, &raw_packet) ){
+
+ if ( pkt_indication.m_desc.IsValidPkt() ) {
+ pkt_indication.m_desc.SetPluginEnable(multi_flow_enable);
+ pkt_indication.m_desc.SetPluginId(plugin_id);
+
+ pkt_indication.m_desc.SetId(_id);
+ bool is_fif;
+ CFlow * lpflow=flow.process(pkt_indication.m_flow_key,is_fif);
+ m_total_bytes += pkt_indication.m_packet->pkt_len;
+ pkt_indication.m_cap_ipg = raw_packet.get_time();
+
+ pkt_indication.m_flow =lpflow;
+ pkt_indication.m_desc.SetFlowPktNum(lpflow->pkt_id);
+ /* inc pkt_id inside the flow */
+ lpflow->pkt_id++;
+
+ /* check that we don't have reserve TTL for duplication */
+ uint8_t ttl = pkt_indication.getTTL();
+ if ( (ttl == TTL_RESERVE_DUPLICATE) ||
+ (ttl == (TTL_RESERVE_DUPLICATE-1)) ) {
+ pkt_indication.setTTL(TTL_RESERVE_DUPLICATE-4);
+ }
+
+ if (is_fif) {
+
+ lpflow->flow_id = m_total_flows;
+
+ pkt_indication.m_desc.SetFlowId(lpflow->flow_id);
+
+ if (m_total_flows == 0) {
+ /* first flow */
+ first_flow =lpflow;/* save it for single flow support , to signal error */
+ lpflow->is_fif_swap =pkt_indication.m_desc.IsSwapTuple();
+ first_flow_fif_is_swap = pkt_indication.m_desc.IsSwapTuple();
+ pkt_indication.m_desc.SetInitSide(true);
+ Append(&pkt_indication);
+ m_total_flows++;
+
+ }else{
+ if ( multi_flow_enable ){
+
+ lpflow->is_fif_swap = pkt_indication.m_desc.IsSwapTuple();
+ /* in respect to the first flow */
+
+ bool init_side_in_repect_to_first_flow =
+ ((first_flow_fif_is_swap?true:false) == lpflow->is_fif_swap)?true:false;
+
+ pkt_indication.m_desc.SetInitSide(init_side_in_repect_to_first_flow);
+ Append(&pkt_indication);
+ m_total_flows++;
+
+ }else{
+ printf(" more than one flow in this cap ignore it !! \n");
+ pkt_indication.m_flow_key.Dump(stdout);
+ m_total_errors++;
+ }
+ }
+
+
+ }else{ /* no FIF */
+
+ pkt_indication.m_desc.SetFlowId(lpflow->flow_id);
+
+ if ( multi_flow_enable ==false ){
+
+ if (lpflow == first_flow) {
+ // add to
+ bool init_side=
+ ((lpflow->is_fif_swap?true:false) == pkt_indication.m_desc.IsSwapTuple())?true:false;
+ pkt_indication.m_desc.SetInitSide( init_side );
+ Append(&pkt_indication);
+ }else{
+ //printf(" more than one flow in this cap ignot it !! \n");
+ m_total_errors++;
+ }
+ }else{
+ /* support multi-flow, */
+
+ /* work in respect to first flow */
+ bool init_side=
+ ((first_flow_fif_is_swap?true:false) == pkt_indication.m_desc.IsSwapTuple())?true:false;
+ pkt_indication.m_desc.SetInitSide( init_side );
+ Append(&pkt_indication);
+
+ }
+ }
+ }
+ }
+ }
+
+
+ /* set the last */
+ CFlowPktInfo * last_pkt =GetPacket((uint32_t)(Size()-1));
+ last_pkt->m_pkt_indication.m_desc.SetIsLastPkt(true);
+
+ int i;
+
+ for (i=1; i<Size(); i++) {
+ CFlowPktInfo * lp_prev= GetPacket((uint32_t)i-1);
+ CFlowPktInfo * lp= GetPacket((uint32_t)i);
+
+ lp_prev->m_pkt_indication.m_cap_ipg = lp->m_pkt_indication.m_cap_ipg-
+ lp_prev->m_pkt_indication.m_cap_ipg;
+
+
+
+ if ( lp->m_pkt_indication.m_desc.IsInitSide() !=
+ lp_prev->m_pkt_indication.m_desc.IsInitSide()) {
+ lp_prev->m_pkt_indication.m_desc.SetRtt(true);
+ }
+ }
+
+ GetPacket((uint32_t)Size()-1)->m_pkt_indication.m_cap_ipg=0.0;
+ m_total_errors += parser.m_counter.getTotalErrors();
+
+
+ /* dump the flow */
+ //Dump(stdout);
+
+ //flow.Dump(stdout);
+ flow.Delete();
+ //parser.Dump(stdout);
+ parser.Delete();
+ //fprintf(stdout," -- finish loading cap file \n");
+ //fprintf(stdout,"\n");
+ delete lp;
+ if ( m_total_errors > 0 ) {
+ parser.m_counter.Dump(stdout);
+ printf(" ERORR in one of the cap file, you should have one flow per cap file or valid plugin \n");
+ return(-1);
+ }
+ return (0);
+}
+
+void CCapFileFlowInfo::update_pcap_mode(){
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ lp->m_pkt_indication.m_desc.SetPcapTiming(true);
+ }
+}
+
+void CCapFileFlowInfo::get_total_memory(CCCapFileMemoryUsage & memory){
+ memory.clear();
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ if ( lp->m_packet->pkt_len > FIRST_PKT_SIZE ) {
+ memory.add_size(lp->m_packet->pkt_len - FIRST_PKT_SIZE);
+ }
+ }
+}
+
+
+double CCapFileFlowInfo::get_cap_file_length_sec(){
+ dsec_t sum=0.0;
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ sum+=lp->m_pkt_indication.m_cap_ipg;
+ }
+ return (sum);
+}
+
+
+void CCapFileFlowInfo::update_min_ipg(dsec_t min_ipg,
+ dsec_t override_ipg){
+
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ if ( lp->m_pkt_indication.m_cap_ipg < min_ipg ){
+ lp->m_pkt_indication.m_cap_ipg=override_ipg;
+ }
+ if ( lp->m_pkt_indication.m_cap_ipg < override_ipg ){
+ lp->m_pkt_indication.m_cap_ipg=override_ipg;
+ }
+ }
+}
+
+
+void CCapFileFlowInfo::Dump(FILE *fd){
+
+
+ int i;
+ //CCapPacket::DumpHeader(fd);
+ for (i=0; i<(int)Size(); i++) {
+ fprintf(fd,"pkt_id : %d \n",i+1);
+ fprintf(fd,"-----------\n");
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ lp->Dump(fd);
+ }
+}
+
+// add pkt indication
+void CCapFileFlowInfo::Append(CPacketIndication * pkt_indication){
+
+ CFlowPktInfo * lp;
+ lp = new CFlowPktInfo();
+ lp->Create( pkt_indication );
+ m_flow_pkts.push_back(lp);
+}
+
+
+
+void CCCapFileMemoryUsage::Add(const CCCapFileMemoryUsage & obj){
+ int i;
+ for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
+ m_buf[i] += obj.m_buf[i];
+ }
+ m_total_bytes +=obj.m_total_bytes;
+
+}
+
+
+void CCCapFileMemoryUsage::dump(FILE *fd){
+ fprintf(fd, " Memory usage \n");
+ int i;
+ int c_size=CCCapFileMemoryUsage::SIZE_MIN;
+ int c_total=0;
+
+ for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
+ fprintf(fd," size_%-7d : %lu \n",c_size,m_buf[i]);
+ c_total +=m_buf[i]*c_size;
+ c_size = c_size*2;
+ }
+ fprintf(fd," Total : %s %.0f%% util due to buckets \n",double_to_human_str(c_total,"bytes",KBYE_1024).c_str(),100.0*float(c_total)/float(m_total_bytes) );
+}
+
+
+bool CCapFileFlowInfo::Create(){
+ m_total_bytes=0;
+ m_total_errors = 0;
+ m_total_flows = 0;
+ return (true);
+}
+
+
+void CCapFileFlowInfo::dump_pkt_sizes(void){
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ flow_pkt_info_t lp=GetPacket((uint32_t)i);
+ CGenNode node;
+ node.m_dest_ip = 0x10000110;
+ node.m_src_ip = 0x20000110;
+ node.m_src_port = 12;
+ rte_mbuf_t * buf=lp->generate_new_mbuf(&node);
+ //rte_pktmbuf_dump(buf, buf->pkt_len);
+ rte_pktmbuf_free(buf);
+ }
+}
+
+void CCapFileFlowInfo::RemoveAll(){
+ int i;
+ m_total_bytes=0;
+ m_total_errors = 0;
+ m_total_flows = 0;
+ for (i=0; i<(int)Size(); i++) {
+ flow_pkt_info_t lp=GetPacket((uint32_t)i);
+ lp->Delete();
+ delete lp;
+ }
+ // free all the pointers
+ m_flow_pkts.clear();
+}
+
+void CCapFileFlowInfo::Delete(){
+ RemoveAll();
+}
+
+void operator >> (const YAML::Node& node, mac_mapping_t &fi) {
+ utl_yaml_read_ip_addr(node,"ip", fi.ip);
+ const YAML::Node& mac_info = node["mac"];
+ for(unsigned i=0;i<mac_info.size();i++) {
+ const YAML::Node & node_2 =mac_info;
+ uint32_t value;
+ node_2[i] >> value;
+ fi.mac.mac[i] = value;
+ }
+}
+
+void operator >> (const YAML::Node& node, std::map<uint32_t, mac_addr_align_t> &mac_info) {
+ const YAML::Node& mac_node = node["items"];
+ mac_mapping_t mac_mapping;
+ for (unsigned i=0;i<mac_node.size();i++) {
+ mac_node[i] >> mac_mapping;
+ mac_info[mac_mapping.ip] = mac_mapping.mac;
+ }
+}
+
+void operator >> (const YAML::Node& node, CFlowYamlDpPkt & fi) {
+ uint32_t val;
+ node["pkt_id"] >> val;
+ fi.m_pkt_id =(uint8_t)val;
+ node["pyld_offset"] >> val;
+ fi.m_pyld_offset =(uint8_t)val;
+ node["type"] >> val;
+ fi.m_type =(uint8_t)val;
+ node["len"] >> val;
+ fi.m_len =(uint8_t)val;
+ node["mask"] >> val;
+ fi.m_pkt_mask =val;
+}
+
+void operator >> (const YAML::Node& node, CVlanYamlInfo & fi) {
+
+ uint32_t tmp;
+ try {
+ node["enable"] >> tmp ;
+ fi.m_enable=tmp;
+ }catch ( const std::exception& e ) {
+
+ }
+
+ try {
+ node["vlan0"] >> tmp;
+ fi.m_vlan_per_port[0] = tmp;
+ node["vlan1"] >> tmp;
+ fi.m_vlan_per_port[1] = tmp;
+ }catch ( const std::exception& e ) {
+ // there is a default
+
+ }
+}
+
+
+
+void operator >> (const YAML::Node& node, CFlowYamlInfo & fi) {
+ node["name"] >> fi.m_name;
+ node["cps"] >> fi.m_k_cps;
+ fi.m_k_cps = fi.m_k_cps/1000.0;
+ double t;
+ node["ipg"] >> t;
+ fi.m_ipg_sec =t/1000000.0;
+ node["rtt"] >> t;
+ fi.m_rtt_sec = t/1000000.0;
+ node["w"] >> fi.m_w;
+
+ try {
+ node["cap_ipg"] >> fi.m_cap_mode;
+ fi.m_cap_mode_was_set =true;
+ } catch ( const std::exception& e ) {
+ fi.m_cap_mode_was_set =false;
+ }
+
+ try {
+ node["wlength"] >> fi.m_wlength;
+ fi.m_wlength_set=true;
+ } catch ( const std::exception& e ) {
+ fi.m_wlength_set=false;
+ fi.m_wlength =500;
+ }
+
+ try {
+ node["limit"] >> fi.m_limit;
+ fi.m_limit_was_set = true;
+ } catch ( const std::exception& e ) {
+ fi.m_limit_was_set = false;
+ fi.m_limit = 0;
+ }
+
+ try {
+ uint32_t plugin_val;
+ node["plugin_id"] >> plugin_val;
+ fi.m_plugin_id=plugin_val;
+ } catch ( const std::exception& e ) {
+ fi.m_plugin_id=0;
+ }
+
+ fi.m_one_app_server_was_set = false;
+ fi.m_one_app_server = false;
+ if ( utl_yaml_read_ip_addr(node,
+ "server_addr",
+ fi.m_server_addr) ){
+ try {
+ node["one_app_server"] >> fi.m_one_app_server;
+ fi.m_one_app_server_was_set=true;
+ } catch ( const std::exception& e ) {
+ fi.m_one_app_server_was_set = false;
+ fi.m_one_app_server = false;
+ }
+ }
+
+
+
+ if ( ( fi.m_limit_was_set ) && (fi.m_plugin_id !=0) ){
+ fprintf(stderr," limit can't be non zero when plugin is set, you must have only one of the options set");
+ exit(-1);
+ }
+
+
+ try {
+ int i;
+ const YAML::Node& dyn_pyload = node["dyn_pyload"];
+ for(unsigned i=0;i<dyn_pyload.size();i++) {
+ CFlowYamlDpPkt fd;
+ dyn_pyload[i] >> fd;
+ if ( fi.m_dpPkt == 0 ){
+ fi.m_dpPkt = new CFlowYamlDynamicPyloadPlugin();
+ if (fi.m_plugin_id == 0) {
+ fi.m_plugin_id = mpDYN_PYLOAD;
+ }else{
+ fprintf(stderr," plugin should be zero with dynamic pyload program");
+ exit(-1);
+ }
+ }
+
+ fd.Dump(stdout);
+
+ fi.m_dpPkt->Add(fd);
+ printf(" here ");
+ }
+ } catch ( const std::exception& e ) {
+ fi.m_dpPkt=0;
+ }
+}
+
+
+
+void operator >> (const YAML::Node& node, CFlowsYamlInfo & flows_info) {
+
+ node["duration"] >> flows_info.m_duration_sec;
+
+ try {
+ node["generator"] >> flows_info.m_tuple_gen;
+ flows_info.m_tuple_gen_was_set =true;
+ } catch ( const std::exception& e ) {
+ flows_info.m_tuple_gen_was_set =false;
+ }
+
+
+ // m_ipv6_set will be true if and only if both src_ipv6
+ // and dst_ipv6 are provided. These are used to set
+ // the most significant 96-bits of the IPv6 address; the
+ // least significant 32-bits come from the ipv4 address
+ // (what is set above).
+ //
+ // If the IPv6 src/dst is not provided in the yaml file,
+ // then the most significant 96-bits will be set to 0
+ // which represents an IPv4-compatible IPv6 address.
+ //
+ // If desired, an IPv4-mapped IPv6 address can be
+ // formed by providing src_ipv6,dst_ipv6 and specifying
+ // {0,0,0,0,0,0xffff}
+ flows_info.m_ipv6_set=true;
+ try {
+ const YAML::Node& src_ipv6_info = node["src_ipv6"];
+ if (src_ipv6_info.size() == 6 ){
+ for(unsigned i=0;i<src_ipv6_info.size();i++) {
+ uint32_t fi;
+ const YAML::Node & node =src_ipv6_info;
+ node[i] >> fi;
+ flows_info.m_src_ipv6.push_back(fi);
+ }
+ }else{
+ flows_info.m_ipv6_set=false;
+ }
+ } catch ( const std::exception& e ) {
+ flows_info.m_ipv6_set=false;
+ }
+
+ try {
+ const YAML::Node& dst_ipv6_info = node["dst_ipv6"];
+ if (dst_ipv6_info.size() == 6 ){
+ for(unsigned i=0;i<dst_ipv6_info.size();i++) {
+ uint32_t fi;
+ const YAML::Node & node =dst_ipv6_info;
+ node[i] >> fi;
+ flows_info.m_dst_ipv6.push_back(fi);
+ }
+ }else{
+ flows_info.m_ipv6_set=false;
+ }
+ } catch ( const std::exception& e ) {
+ flows_info.m_ipv6_set=false;
+ }
+
+ try {
+ node["cap_ipg"] >> flows_info.m_cap_mode;
+ flows_info.m_cap_mode_set=true;
+ } catch ( const std::exception& e ) {
+ flows_info.m_cap_mode=false;
+ flows_info.m_cap_mode_set=false;
+ }
+ double t;
+
+ try {
+ node["cap_ipg_min"] >> t ;
+ flows_info.m_cap_ipg_min = t/1000000.0;
+ flows_info.m_cap_ipg_min_set=true;
+ } catch ( const std::exception& e ) {
+ flows_info.m_cap_ipg_min_set=false;
+ flows_info.m_cap_ipg_min = 20;
+ }
+
+ try {
+ node["cap_override_ipg"] >> t;
+ flows_info.m_cap_overide_ipg = t/1000000.0;
+ flows_info.m_cap_overide_ipg_set = true;
+ } catch ( const std::exception& e ) {
+ flows_info.m_cap_overide_ipg_set = false;
+ flows_info.m_cap_overide_ipg = 0;
+ }
+
+ try {
+ node["wlength"] >> flows_info.m_wlength;
+ flows_info.m_wlength_set=true;
+ } catch ( const std::exception& e ) {
+ flows_info.m_wlength_set=false;
+ flows_info.m_wlength =100;
+ }
+
+ try {
+ node["one_app_server"] >> flows_info.m_one_app_server;
+ flows_info.m_one_app_server_was_set=true;
+ } catch ( const std::exception& e ) {
+ flows_info.m_one_app_server =false;
+ flows_info.m_one_app_server_was_set=false;
+ }
+
+ try {
+ node["vlan"] >> flows_info.m_vlan_info;
+ } catch ( const std::exception& e ) {
+ }
+
+ try {
+ node["mac_override_by_ip"] >> flows_info.m_mac_replace_by_ip;
+ } catch ( const std::exception& e ) {
+ flows_info.m_mac_replace_by_ip =false;
+ }
+
+
+ const YAML::Node& mac_info = node["mac"];
+ for(unsigned i=0;i<mac_info.size();i++) {
+ uint32_t fi;
+ const YAML::Node & node =mac_info;
+ node[i] >> fi;
+ flows_info.m_mac_base.push_back(fi);
+ }
+
+ const YAML::Node& cap_info = node["cap_info"];
+ for(unsigned i=0;i<cap_info.size();i++) {
+ CFlowYamlInfo fi;
+ cap_info[i] >> fi;
+ flows_info.m_vec.push_back(fi);
+ }
+}
+
+void CVlanYamlInfo::Dump(FILE *fd){
+ fprintf(fd," vlan enable : %d \n",m_enable);
+ fprintf(fd," vlan val : %d ,%d \n",m_vlan_per_port[0],m_vlan_per_port[1]);
+}
+
+
+void CFlowsYamlInfo::Dump(FILE *fd){
+ fprintf(fd," duration : %f sec \n",m_duration_sec);
+ m_tuple_gen.Dump(fd);
+
+ fprintf(fd,"\n");
+ if (CGlobalInfo::is_ipv6_enable()) {
+ int idx;
+ fprintf(fd," src_ipv6 : ");
+ for (idx=0; idx<5; idx++){
+ fprintf(fd,"%04x:", CGlobalInfo::m_options.m_src_ipv6[idx]);
+ }
+ fprintf(fd,"%04x\n", CGlobalInfo::m_options.m_src_ipv6[5]);
+ fprintf(fd," dst_ipv6 : ");
+ for (idx=0; idx<5; idx++){
+ fprintf(fd,"%04x:", CGlobalInfo::m_options.m_dst_ipv6[idx]);
+ }
+ fprintf(fd,"%04x\n", CGlobalInfo::m_options.m_dst_ipv6[5]);
+ }
+ if ( !m_cap_mode_set ) {
+ fprintf(fd," cap_ipg : wasn't set \n");
+ }else{
+ fprintf(fd," cap_ipg : %d \n",m_cap_mode?1:0);
+ }
+
+ if ( !m_cap_ipg_min_set ){
+ fprintf(fd," cap_ipg_min : wasn't set \n");
+ }else{
+ fprintf(fd," cap_ipg_min : %f \n",m_cap_ipg_min);
+ }
+
+ if ( !m_cap_overide_ipg_set ){
+ fprintf(fd," cap_override_ipg : wasn't set \n");
+ }else{
+ fprintf(fd," cap_override_ipg : %f \n",m_cap_overide_ipg);
+ }
+
+ if ( !m_wlength_set ){
+ fprintf(fd," wlength : wasn't set \n");
+ }else{
+ fprintf(fd," m_wlength : %d \n",m_wlength);
+ }
+ fprintf(fd," one_server_for_application : %d \n",m_one_app_server?1:0);
+ fprintf(fd," one_server_for_application_was_set : %d \n",m_one_app_server_was_set?1:0);
+
+ m_vlan_info.Dump(fd);
+
+ fprintf(fd," mac base : ");
+ int i;
+ for (i=0; i<(int)m_mac_base.size(); i++) {
+ if (i< (int)(m_mac_base.size()-1) ) {
+ fprintf(fd,"0x%02x,",m_mac_base[i]);
+ }else{
+ fprintf(fd,"0x%02x",m_mac_base[i]);
+ }
+ }
+ fprintf(fd,"\n");
+
+ fprintf(fd," cap file info \n");
+ fprintf(fd," ------------- \n");
+ for (i=0; i<(int)m_vec.size(); i++) {
+ m_vec[i].Dump(fd);
+ }
+}
+
+
+/*
+
+example for YAML file
+
+- duration : 10.0
+ cap_info :
+ - name: hey1.pcap
+ cps : 12.0
+ ipg : 0.0001
+ - name: hey2.pcap
+ cps : 11.0
+ ipg : 0.0001
+
+
+*/
+
+bool CFlowsYamlInfo::verify_correctness(uint32_t num_threads) {
+ if ( m_tuple_gen_was_set ==false ){
+ printf(" ERROR there must be a generator field in YAML , the old format is deprecated \n");
+ printf(" This is not supported : \n");
+ printf(" min_src_ip : 0x10000001 \n");
+ printf(" max_src_ip : 0x50000001 \n");
+ printf(" min_dst_ip : 0x60000001 \n");
+ printf(" max_dst_ip : 0x60000010 \n");
+ printf(" This is supported : \n");
+ printf("generator : \n");
+ printf(" distribution : \"seq\" \n");
+ printf(" clients_start : \"16.0.0.1\" \n");
+ printf(" clients_end : \"16.0.1.255\" \n");
+ printf(" servers_start : \"48.0.0.1\" \n");
+ printf(" servers_end : \"48.0.0.255\" \n");
+ printf(" clients_per_gb : 201 \n");
+ printf(" min_clients : 101 \n");
+ printf(" dual_port_mask : \"1.0.0.0\" \n");
+ printf(" tcp_aging : 1 \n");
+ printf(" udp_aging : 1 \n");
+ return(false);
+ }
+ if ( !m_tuple_gen.is_valid(num_threads,is_any_plugin_configured()) ){
+ return (false);
+ }
+
+ return(true);
+}
+
+
+
+int CFlowsYamlInfo::load_from_yaml_file(std::string file_name){
+ m_vec.clear();
+
+ if ( !utl_is_file_exists (file_name) ){
+ printf(" ERROR file %s does not exist \n",file_name.c_str());
+ exit(-1);
+ }
+
+ try {
+ std::ifstream fin((char *)file_name.c_str());
+ YAML::Parser parser(fin);
+ YAML::Node doc;
+
+ parser.GetNextDocument(doc);
+ for(unsigned i=0;i<doc.size();i++) {
+ doc[i] >> *this;
+ break;
+ }
+ } catch ( const std::exception& e ) {
+ std::cout << e.what() << "\n";
+ exit(-1);
+ }
+
+ /* update from user input */
+ if (CGlobalInfo::m_options.m_duration > 0.1) {
+ m_duration_sec = CGlobalInfo::m_options.m_duration;
+ }
+ int i;
+ m_is_plugin_configured=false;
+ for (i=0; i<(int)m_vec.size(); i++) {
+ m_vec[i].m_k_cps =m_vec[i].m_k_cps*CGlobalInfo::m_options.m_factor;
+ if (( ! m_vec[i].m_cap_mode_was_set ) && (m_cap_mode_set ) ){
+ m_vec[i].m_cap_mode = m_cap_mode;
+ }
+ if (( ! m_vec[i].m_wlength_set ) && (m_wlength_set ) ){
+ m_vec[i].m_wlength = m_wlength;
+ }
+
+ if (( ! m_vec[i].m_one_app_server_was_set ) && (m_one_app_server_was_set ) ){
+ m_vec[i].m_one_app_server = m_one_app_server;
+ }
+
+ if ( m_cap_overide_ipg_set ){
+ m_vec[i].m_ipg_sec = m_cap_overide_ipg;
+ m_vec[i].m_rtt_sec = m_cap_overide_ipg;
+ }
+
+ if ( m_vec[i].m_plugin_id ){
+ m_is_plugin_configured=true;
+ }
+ }
+ return 0;
+}
+
+
+
+void CFlowStats::Clear(){
+
+ m_id=0;
+ m_name="";
+ m_pkt=0.0;
+ m_bytes=0.0;
+ m_cps=0.0;
+ m_mb_sec=0.0;
+ m_mB_sec=0.0;
+ m_c_flows=0.0;
+ m_pps =0.0;
+ m_total_Mbytes=00 ;
+ m_errors =0;
+ m_flows =0 ;
+ m_memory.clear();
+}
+
+void CFlowStats::Add(const CFlowStats & obj){
+
+ m_pkt += obj.m_pkt ;
+ m_bytes += obj.m_bytes ;
+ m_cps += obj.m_cps ;
+ m_mb_sec += obj.m_mb_sec ;
+ m_mB_sec += obj.m_mB_sec ;
+ m_c_flows += obj.m_c_flows ;
+ m_pps += obj.m_pps ;
+ m_total_Mbytes +=obj.m_total_Mbytes ;
+ m_errors +=obj.m_errors;
+ m_flows +=obj.m_flows ;
+
+ m_memory.Add(obj.m_memory);
+}
+
+
+void CFlowStats::DumpHeader(FILE *fd){
+ fprintf(fd," %2s,%-40s,%4s,%4s,%5s,%7s,%9s,%9s,%9s,%10s,%5s,%7s,%4s,%4s \n",
+ "id","name","tps","cps","f-pkts","f-bytes","duration","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows");
+}
+void CFlowStats::Dump(FILE *fd){
+ //"name","cps","f-pkts","f-bytes","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows"
+ fprintf(fd," %02d, %-40s ,%4.2f,%4.2f, %5.0f , %7.0f ,%7.2f ,%7.2f , %7.2f , %10.0f , %5.0f , %7.0f , %llu , %llu \n",
+ m_id,m_name.c_str(),m_cps,get_normal_cps(),
+ m_pkt,m_bytes,duration_sec,m_mb_sec,m_mB_sec,m_c_flows,m_pps,m_total_Mbytes,m_errors,m_flows);
+}
+
+bool CFlowGeneratorRecPerThread::Create(CTupleGeneratorSmart * global_gen,
+ CFlowYamlInfo * info,
+ CFlowsYamlInfo * yaml_flow_info,
+ CCapFileFlowInfo * flow_info,
+ uint16_t _id,
+ uint32_t thread_id ){
+
+ BP_ASSERT(info);
+ m_thread_id =thread_id ;
+
+ tuple_gen.Create(global_gen);
+ CTupleGenYamlInfo * lpt=&yaml_flow_info->m_tuple_gen;
+
+ tuple_gen.SetSingleServer(info->m_one_app_server,
+ info->m_server_addr,
+ getDualPortId(thread_id),
+ lpt->m_dual_interface_mask
+ );
+
+ tuple_gen.SetW(info->m_w);
+
+
+
+ m_id =_id;
+ m_info =info;
+ m_flows_info = yaml_flow_info;
+ // set policer give bucket size for bursts
+ m_policer.set_cir(info->m_k_cps*1000.0);
+ m_policer.set_level(0.0);
+ m_policer.set_bucket_size(100.0);
+ /* pointer to global */
+ m_flow_info = flow_info;
+ return (true);
+}
+
+
+void CFlowGeneratorRecPerThread::Delete(){
+ tuple_gen.Delete();
+}
+
+
+
+
+void CFlowGeneratorRecPerThread::Dump(FILE *fd){
+ fprintf(fd," configuration info ");
+ fprintf(fd," -----------------");
+ m_info->Dump(fd);
+ fprintf(fd," -----------------");
+ m_flow_info->Dump(fd);
+}
+
+
+void CFlowGeneratorRecPerThread::getFlowStats(CFlowStats * stats){
+
+ double t_pkt=(double)m_flow_info->Size();
+ double t_bytes=(double)m_flow_info->get_total_bytes();
+ double cps=m_info->m_k_cps *1000.0;
+ double mb_sec = (cps*t_bytes*8.0)/(_1Mb_DOUBLE);
+ double mB_sec = (cps*t_bytes)/(_1Mb_DOUBLE);
+
+ double c_flow_windows_sec=0.0;
+
+ if (m_info->m_cap_mode) {
+ c_flow_windows_sec = m_flow_info->get_cap_file_length_sec();
+ }else{
+ c_flow_windows_sec = t_pkt * m_info->m_ipg_sec;
+ }
+
+
+ double c_flows = cps*c_flow_windows_sec*m_flow_info->get_total_flows();
+ double pps =cps*t_pkt;
+ double total_Mbytes = mB_sec * m_flows_info->m_duration_sec;
+ uint64_t errors = m_flow_info->get_total_errors();
+ uint64_t flows = m_flow_info->get_total_flows();
+
+
+ stats->m_id = m_id;
+ stats->m_pkt = t_pkt;
+ stats->m_bytes = t_bytes;
+ stats->duration_sec = c_flow_windows_sec;
+ stats->m_name = m_info->m_name.c_str();
+ stats->m_cps = cps;
+ stats->m_mb_sec = mb_sec;
+ stats->m_mB_sec = mB_sec;
+ stats->m_c_flows = c_flows;
+ stats->m_pps = pps;
+ stats->m_total_Mbytes = total_Mbytes;
+ stats->m_errors = errors;
+ stats->m_flows = flows;
+}
+
+
+
+void CFlowGeneratorRec::Dump(FILE *fd){
+ fprintf(fd," configuration info ");
+ fprintf(fd," -----------------");
+ m_info->Dump(fd);
+ fprintf(fd," -----------------");
+ m_flow_info.Dump(fd);
+}
+
+
+void CFlowGeneratorRec::getFlowStats(CFlowStats * stats){
+
+ double t_pkt=(double)m_flow_info.Size();
+ double t_bytes=(double)m_flow_info.get_total_bytes();
+ double cps=m_info->m_k_cps *1000.0;
+ double mb_sec = (cps*t_bytes*8.0)/(_1Mb_DOUBLE);
+ double mB_sec = (cps*t_bytes)/(_1Mb_DOUBLE);
+
+ double c_flow_windows_sec=0.0;
+
+ if (m_info->m_cap_mode) {
+ c_flow_windows_sec = m_flow_info.get_cap_file_length_sec();
+ }else{
+ c_flow_windows_sec = t_pkt * m_info->m_ipg_sec;
+ }
+
+ m_flow_info.get_total_memory(stats->m_memory);
+
+
+ double c_flows = cps*c_flow_windows_sec;
+ double pps =cps*t_pkt;
+ double total_Mbytes = mB_sec * m_flows_info->m_duration_sec;
+ uint64_t errors = m_flow_info.get_total_errors();
+ uint64_t flows = m_flow_info.get_total_flows();
+
+
+ stats->m_id = m_id;
+ stats->m_pkt = t_pkt;
+ stats->m_bytes = t_bytes;
+ stats->duration_sec = c_flow_windows_sec;
+ stats->m_name = m_info->m_name.c_str();
+ stats->m_cps = cps;
+ stats->m_mb_sec = mb_sec;
+ stats->m_mB_sec = mB_sec;
+ stats->m_c_flows = c_flows;
+ stats->m_pps = pps;
+ stats->m_total_Mbytes = total_Mbytes;
+ stats->m_errors = errors;
+ stats->m_flows = flows;
+}
+
+
+void CFlowGeneratorRec::fixup_ipg_if_needed(void){
+ if ( m_flows_info->m_cap_mode ) {
+ m_flow_info.update_pcap_mode();
+ }
+
+ if ( (m_flows_info->m_cap_mode) &&
+ (m_flows_info->m_cap_ipg_min_set) &&
+ (m_flows_info->m_cap_overide_ipg_set)
+ ){
+ m_flow_info.update_min_ipg(m_flows_info->m_cap_ipg_min,
+ m_flows_info->m_cap_overide_ipg);
+ }
+}
+
+
+bool CFlowGeneratorRec::Create(CFlowYamlInfo * info,
+ CFlowsYamlInfo * flows_info,
+ uint16_t _id){
+ BP_ASSERT(info);
+ m_id=_id;
+ m_info=info;
+ m_flows_info=flows_info;
+ m_flow_info.Create();
+
+ // set policer give bucket size for bursts
+ m_policer.set_cir(info->m_k_cps*1000.0);
+ m_policer.set_level(0.0);
+ m_policer.set_bucket_size(100.0);
+
+ int res=m_flow_info.load_cap_file(info->m_name.c_str(),_id,m_info->m_plugin_id);
+ if ( res==0 ) {
+ fixup_ipg_if_needed();
+ std::string err;
+ /* verify that template are valid */
+ bool is_valid=m_flow_info.is_valid_template_load_time(err);
+ if (!is_valid) {
+ printf("\n ERROR template file is not valid '%s' \n",err.c_str());
+ return (false);
+ }
+ m_flow_info.update_info();
+ return (true);
+ }else{
+ return (false);
+ }
+}
+
+void CFlowGeneratorRec::Delete(){
+ m_flow_info.Delete();
+}
+
+
+void CGenNode::DumpHeader(FILE *fd){
+ fprintf(fd," pkt_id,time,fid,pkt_info,pkt,len,type,is_init,is_last,type,thread_id,src_ip,dest_ip,src_port \n");
+}
+
+void CGenNode::Dump(FILE *fd){
+ fprintf(fd,"%.6f,%llx,%p,%llu,%d,%d,%d,%d,%d,%d,%x,%x,%d\n",m_time,m_flow_id,m_pkt_info,
+ m_pkt_info->m_pkt_indication.m_packet->pkt_cnt,
+ m_pkt_info->m_pkt_indication.m_packet->pkt_len,
+ m_pkt_info->m_pkt_indication.m_desc.getId(),
+ (m_pkt_info->m_pkt_indication.m_desc.IsInitSide()?1:0),
+ m_pkt_info->m_pkt_indication.m_desc.IsLastPkt(),
+ m_type,
+ m_thread_id,
+ m_src_ip,
+ m_dest_ip,
+ m_src_port
+
+
+
+ );
+
+}
+
+void CNodeGenerator::set_vif(CVirtualIF * v_if){
+ m_v_if = v_if;
+}
+
+bool CNodeGenerator::Create(CFlowGenListPerThread * parent){
+ m_v_if =0;
+ m_parent=parent;
+ m_socket_id =0;
+ m_is_realtime =CGlobalInfo::is_realtime();
+ m_realtime_his.Create();
+ return(true);
+}
+
+void CNodeGenerator::Delete(){
+ m_realtime_his.Delete();
+}
+
+
+void CNodeGenerator::add_node(CGenNode * mynode){
+ m_p_queue.push(mynode);
+}
+
+
+
+void CNodeGenerator::remove_all(CFlowGenListPerThread * thread){
+ CGenNode *node;
+ while (!m_p_queue.empty()) {
+ node = m_p_queue.top();
+ m_p_queue.pop();
+ thread->free_node( node);
+ }
+}
+
+int CNodeGenerator::open_file(std::string file_name,
+ CPreviewMode * preview_mode){
+ BP_ASSERT(m_v_if);
+ m_preview_mode =*preview_mode;
+ /* ser preview mode */
+ m_v_if->set_review_mode(preview_mode);
+ m_v_if->open_file(file_name);
+ m_cnt = 1;
+ return (0);
+}
+
+int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
+ remove_all(thread);
+ BP_ASSERT(m_v_if);
+ m_v_if->close_file();
+ return (0);
+}
+
+int CNodeGenerator::flush_one_node_to_file(CGenNode * node){
+ BP_ASSERT(m_v_if);
+ return (m_v_if->send_node(node));
+}
+
+int CNodeGenerator::update_stats(CGenNode * node){
+ if ( m_preview_mode.getVMode() >2 ){
+ fprintf(stdout," %llu ,",m_cnt);
+ node->Dump(stdout);
+ m_cnt++;
+ }
+ return (0);
+}
+
+
+bool CFlowGenListPerThread::Create(uint32_t thread_id,
+ uint32_t core_id,
+ CFlowGenList * flow_list,
+ uint32_t max_threads){
+
+
+ m_flow_list =flow_list;
+ m_core_id= core_id;
+ m_tcp_dpc= 0;
+ m_udp_dpc=0;
+ m_max_threads=max_threads;
+ m_thread_id=thread_id;
+
+ m_cpu_cp_u.Create(&m_cpu_dp_u);
+
+ uint32_t socket_id=rte_lcore_to_socket_id(m_core_id);
+
+ char name[100];
+ sprintf(name,"nodes-%d",m_core_id);
+ printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id);
+ m_node_pool = utl_rte_mempool_create_non_pkt(name,
+ CGlobalInfo::m_memory_cfg.get_each_core_dp_flows(),
+ sizeof(CGenNode),
+ 128,
+ 0 ,
+ socket_id);
+
+ printf(" pool %p \n",m_node_pool);
+ m_node_gen.Create(this);
+ m_flow_id_to_node_lookup.Create();
+
+ /* split the clients to threads */
+ CTupleGenYamlInfo * tuple_gen = &m_flow_list->m_yaml_info.m_tuple_gen;
+
+ /* split the clients to threads using the mask */
+ CClientPortion portion;
+ split_clients(m_thread_id,
+ m_max_threads,
+ getDualPortId(),
+ *tuple_gen,
+ portion);
+
+ init_from_global(portion);
+ m_smart_gen.Create(0,m_thread_id,
+ cdSEQ_DIST,
+ portion.m_client_start,
+ portion.m_client_end,
+ portion.m_server_start,
+ portion.m_server_end,
+ get_longest_flow(),
+ get_total_kcps()*1000,
+ m_flow_list);
+
+
+ CMessagingManager * rx_dp=CMsgIns::Ins()->getRxDp();
+
+ m_ring_from_rx = rx_dp->getRingCpToDp(thread_id);
+ m_ring_to_rx =rx_dp->getRingDpToCp(thread_id);
+
+ assert(m_ring_from_rx);
+ assert(m_ring_to_rx);
+ return (true);
+}
+
+/* return the client ip , port */
+FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job(CGenNode *p){
+ CGenNodeDeferPort * defer=(CGenNodeDeferPort *)p;
+ int i;
+ for (i=0; i<defer->m_cnt; i++) {
+ m_smart_gen.FreePort(defer->m_clients[i],defer->m_ports[i]);
+ }
+}
+
+FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job_flush(void){
+ /* flush the pending job of free ports */
+ if (m_tcp_dpc) {
+ handler_defer_job((CGenNode *)m_tcp_dpc);
+ free_node((CGenNode *)m_tcp_dpc);
+ m_tcp_dpc=0;
+ }
+ if (m_udp_dpc) {
+ handler_defer_job((CGenNode *)m_udp_dpc);
+ free_node((CGenNode *)m_udp_dpc);
+ m_udp_dpc=0;
+ }
+}
+
+
+void CFlowGenListPerThread::defer_client_port_free(bool is_tcp,
+ uint32_t c_ip,
+ uint16_t port){
+ /* free is not required in this case */
+ if (!m_smart_gen.IsFreePortRequired() ){
+ return;
+ }
+ CGenNodeDeferPort * defer;
+ if (is_tcp) {
+ if (CGlobalInfo::m_options.m_tcp_aging==0) {
+ m_smart_gen.FreePort(c_ip,port);
+ return;
+ }
+ defer=get_tcp_defer();
+ }else{
+ if (CGlobalInfo::m_options.m_udp_aging==0) {
+ m_smart_gen.FreePort(c_ip,port);
+ return;
+ }
+ defer=get_udp_defer();
+ }
+ if ( defer->add_client(c_ip,port) ){
+ if (is_tcp) {
+ m_node_gen.schedule_node((CGenNode *)defer,CGlobalInfo::m_options.m_tcp_aging);
+ m_tcp_dpc=0;
+ }else{
+ m_node_gen.schedule_node((CGenNode *)defer,CGlobalInfo::m_options.m_udp_aging);
+ m_udp_dpc=0;
+ }
+ }
+}
+
+
+void CFlowGenListPerThread::defer_client_port_free(CGenNode *p){
+ defer_client_port_free(p->m_pkt_info->m_pkt_indication.m_desc.IsTcp(),p->m_src_ip,p->m_src_port);
+}
+
+
+
+/* copy all info from global and div by num of threads */
+void CFlowGenListPerThread::init_from_global(CClientPortion& portion){
+ /* copy generator , it is the same */
+ m_yaml_info =m_flow_list->m_yaml_info;
+
+ /* copy first the flow info */
+ int i;
+ for (i=0; i<(int)m_flow_list->m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_flow_list->m_cap_gen[i];
+ CFlowGeneratorRecPerThread * lp_thread=new CFlowGeneratorRecPerThread();
+ /* TBD leak of memory */
+ CFlowYamlInfo * yaml_info =new CFlowYamlInfo();
+
+ yaml_info->m_name = lp->m_info->m_name;
+ yaml_info->m_k_cps = lp->m_info->m_k_cps/(double)m_max_threads;
+ yaml_info->m_ipg_sec = lp->m_info->m_ipg_sec;
+ yaml_info->m_rtt_sec = lp->m_info->m_rtt_sec;
+ yaml_info->m_w = lp->m_info->m_w;
+ yaml_info->m_cap_mode =lp->m_info->m_cap_mode;
+ yaml_info->m_wlength =lp->m_info->m_wlength;
+ yaml_info->m_plugin_id = lp->m_info->m_plugin_id;
+ yaml_info->m_one_app_server = lp->m_info->m_one_app_server;
+ yaml_info->m_server_addr = lp->m_info->m_server_addr;
+ yaml_info->m_dpPkt =lp->m_info->m_dpPkt;
+
+ /* fix this */
+ assert(m_max_threads>0);
+ if ( m_max_threads == 1 ) {
+ /* we have one thread the limit */
+ yaml_info->m_limit = lp->m_info->m_limit;
+ }else{
+ yaml_info->m_limit = lp->m_info->m_limit/m_max_threads;
+ /* thread is zero base */
+ if ( m_thread_id == 0){
+ yaml_info->m_limit += lp->m_info->m_limit % m_max_threads;
+ }
+ if (yaml_info->m_limit==0) {
+ yaml_info->m_limit=1;
+ }
+ }
+
+ yaml_info->m_limit_was_set = lp->m_info->m_limit_was_set;
+ yaml_info->m_flowcnt = 0;
+ yaml_info->m_restart_time = ( yaml_info->m_limit_was_set ) ?
+ (yaml_info->m_limit / (yaml_info->m_k_cps * 1000.0)) : 0;
+
+
+ lp_thread->Create( &m_smart_gen,
+ yaml_info,
+ lp->m_flows_info,
+ &lp->m_flow_info,
+ lp->m_id,
+ m_thread_id);
+
+ m_cap_gen.push_back(lp_thread);
+ }
+}
+
+static void free_map_flow_id_to_node(CGenNode *p){
+ CGlobalInfo::free_node(p);
+}
+
+
+void CFlowGenListPerThread::Delete(){
+
+ // free all current maps
+ m_flow_id_to_node_lookup.remove_all(free_map_flow_id_to_node);
+ // free object
+ m_flow_id_to_node_lookup.Delete();
+
+ m_smart_gen.Delete();
+ m_node_gen.Delete();
+ Clean();
+ m_cpu_cp_u.Delete();
+}
+
+
+
+void CFlowGenListPerThread::Clean(){
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ lp->Delete();
+ delete lp;
+ }
+ m_cap_gen.clear();
+}
+
+//uint64_t _start_time;
+
+void CNodeGenerator::dump_json(std::string & json){
+
+ json="{\"name\":\"tx-gen\",\"type\":0,\"data\":{";
+ m_realtime_his.dump_json("realtime-hist",json);
+ json+="\"unknown\":0}}" ;
+}
+
+
+int CNodeGenerator::flush_file(dsec_t max_time,
+ dsec_t d_time,
+ bool always,
+ CFlowGenListPerThread * thread,
+ double &old_offset){
+ CGenNode * node;
+ dsec_t flush_time=now_sec();
+ dsec_t offset=0.0;
+ dsec_t n_time;
+ if (always) {
+ offset=old_offset;
+ }
+ uint32_t events=0;
+ bool done=false;
+
+ thread->m_cpu_dp_u.start_work();
+ while (!m_p_queue.empty()) {
+ node = m_p_queue.top();
+ n_time = node->m_time+ offset;
+
+ if (( (n_time) > max_time ) &&
+ (always==false) ) {
+ /* nothing to do */
+ break;
+ }
+ events++;
+/*#ifdef VALG
+ if (events > 1 ) {
+ CALLGRIND_START_INSTRUMENTATION;
+ }
+#endif*/
+
+ if ( likely ( m_is_realtime ) ){
+ dsec_t dt ;
+ thread->m_cpu_dp_u.commit();
+ bool once=false;
+
+ while ( true ) {
+ dt = now_sec() - n_time ;
+
+ if (dt> (-0.00003)) {
+ break;
+ }
+
+ if (!once) {
+ /* check the msg queue once */
+ thread->check_msgs();
+ once=true;
+ }
+
+ rte_pause();
+ }
+ thread->m_cpu_dp_u.start_work();
+
+ /* add offset in case of faliures more than 100usec */
+ if ( unlikely( dt > 0.000100 ) ) {
+ offset += dt;
+ }
+ /* update histogram */
+ if ( unlikely( events % 16 ) ==0 ) {
+ m_realtime_his.Add(dt);
+ }
+ /* flush evey 10 usec */
+ if ( now_sec() - flush_time > 0.00001 ){
+ m_v_if->flush_tx_queue();
+ flush_time=now_sec();
+ }
+ }
+ #ifndef RTE_DPDK
+ thread->check_msgs();
+ #endif
+
+ uint8_t type=node->m_type;
+
+ if ( likely( type == CGenNode::FLOW_PKT ) ) {
+ /* PKT */
+ if ( !(node->is_repeat_flow()) || (always==false)) {
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
+ }
+ m_p_queue.pop();
+ if ( node->is_last_in_flow() ) {
+ if ((node->is_repeat_flow()) && (always==false)) {
+ /* Flow is repeated, reschedule it */
+ thread->reschedule_flow( node);
+ }else{
+ /* Flow will not be repeated, so free node */
+ thread->free_last_flow_node( node);
+ }
+ }else{
+ node->update_next_pkt_in_flow();
+ m_p_queue.push(node);
+ }
+ }else{
+ if ((type == CGenNode::FLOW_FIF)) {
+ /* callback to our method */
+ m_p_queue.pop();
+ if ( always == false) {
+ thread->m_cur_time_sec = node->m_time ;
+
+ if ( thread->generate_flows_roundrobin(&done) <0){
+ break;
+ }
+ if (!done) {
+ node->m_time +=d_time;
+ m_p_queue.push(node);
+ }else{
+ thread->free_node(node);
+ }
+ }else{
+ thread->free_node(node);
+ }
+
+ }else{
+ handle_slow_messages(type,node,thread,always);
+ }
+ }
+ }
+
+
+ if (!always) {
+ old_offset =offset;
+ }else{
+ // free the left other
+ thread->handler_defer_job_flush();
+ }
+ return (0);
+}
+
+void CNodeGenerator::handle_slow_messages(uint8_t type,
+ CGenNode * node,
+ CFlowGenListPerThread * thread,
+ bool always){
+
+ if (unlikely (type == CGenNode::FLOW_DEFER_PORT_RELEASE) ) {
+ m_p_queue.pop();
+ thread->handler_defer_job(node);
+ thread->free_node(node);
+ }else{
+ if (type == CGenNode::FLOW_PKT_NAT) {
+ /*repeat and NAT is not supported */
+ if ( node->is_nat_first_state() ){
+ node->set_nat_wait_state();
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
+ }else{
+ if ( node->is_nat_wait_state() ) {
+ if (node->is_responder_pkt()) {
+ m_p_queue.pop();
+ /* time out, need to free the flow and remove the association , we didn't get convertion yet*/
+ thread->terminate_nat_flows(node);
+ return;
+
+ }else{
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
+ }
+ }else{
+ assert(0);
+ }
+ }
+ m_p_queue.pop();
+ if ( node->is_last_in_flow() ) {
+ thread->free_last_flow_node( node);
+ }else{
+ node->update_next_pkt_in_flow();
+ m_p_queue.push(node);
+ }
+
+ }else{
+ if ( type == CGenNode::FLOW_SYNC ){
+ thread->check_msgs(); /* check messages */
+ m_v_if->flush_tx_queue(); /* flush pkt each timeout */
+ m_p_queue.pop();
+ if ( always == false) {
+ node->m_time += SYNC_TIME_OUT;
+ m_p_queue.push(node);
+ }else{
+ thread->free_node(node);
+ }
+
+ }else{
+ printf(" ERROR type is not valid %d \n",type);
+ assert(0);
+ }
+ }
+ }
+}
+
+
+
+
+void CFlowGenListPerThread::Dump(FILE *fd){
+ fprintf(fd,"yaml info ");
+ m_yaml_info.Dump(fd);
+
+ fprintf(fd,"\n");
+ fprintf(fd,"cap file info");
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ lp->Dump(stdout);
+ }
+}
+
+
+void CFlowGenListPerThread::DumpStats(FILE *fd){
+ m_stats.dump(fd);
+}
+
+
+void CFlowGenListPerThread::DumpCsv(FILE *fd){
+ CFlowStats::DumpHeader(fd);
+
+ CFlowStats stats;
+ CFlowStats sum;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ stats.Dump(fd);
+ sum.Add(stats);
+ }
+ fprintf(fd,"\n");
+ sum.m_name= "sum";
+ sum.Dump(fd);
+}
+
+
+uint32_t CFlowGenListPerThread::getDualPortId(){
+ return ( ::getDualPortId(m_thread_id) );
+}
+
+double CFlowGenListPerThread::get_longest_flow(){
+ int i;
+ double longest_flow = 0.0;
+ for (i=0;i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ double tmp_len;
+ tmp_len = lp->m_flow_info->get_cap_file_length_sec();
+ if (longest_flow < tmp_len ) {
+ longest_flow = tmp_len;
+ }
+ }
+ return longest_flow;
+}
+
+double CFlowGenListPerThread::get_total_kcps(){
+ int i;
+ double total=0.0;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ total +=lp->m_info->m_k_cps;
+ }
+ return (total);
+}
+
+double CFlowGenListPerThread::get_delta_flow_is_sec(){
+ return (1.0/(1000.0*get_total_kcps()));
+}
+
+
+void CFlowGenListPerThread::inc_current_template(void){
+ m_cur_template++;
+ if (m_cur_template == m_cap_gen.size()) {
+ m_cur_template=0;
+ }
+}
+
+
+int CFlowGenListPerThread::generate_flows_roundrobin(bool *done){
+ // round robin
+
+ CFlowGeneratorRecPerThread * cur;
+ bool found=false;
+ // try current
+ int i;
+ *done = true;
+ for (i=0;i<(int)m_cap_gen.size();i++ ) {
+ cur=m_cap_gen[m_cur_template];
+ if (!(cur->m_info->m_limit_was_set) ||
+ (cur->m_info->m_flowcnt < cur->m_info->m_limit)) {
+ *done = false;
+ if ( cur->m_policer.update(1.0,m_cur_time_sec) ){
+ cur->m_info->m_flowcnt++;
+ found=true;
+ break;
+ }
+ }
+ inc_current_template();
+ }
+
+ if (found) {
+ /* generate the flow into the generator*/
+ CGenNode * node= create_node() ;
+
+ cur->generate_flow(&m_node_gen,m_cur_time_sec,m_cur_flow_id,node);
+ m_cur_flow_id++;
+
+ /* this is estimation */
+ m_stats.m_total_open_flows += cur->m_flow_info->get_total_flows();
+ m_stats.m_total_bytes += cur->m_flow_info->get_total_bytes();
+ m_stats.m_total_pkt += cur->m_flow_info->Size();
+ inc_current_template();
+ }
+ return (0);
+}
+
+
+int CFlowGenListPerThread::reschedule_flow(CGenNode *node){
+
+ // Re-schedule the node
+ node->reset_pkt_in_flow();
+ node->m_time += node->m_template_info->m_restart_time;
+ m_node_gen.add_node(node);
+
+ m_stats.m_total_bytes += node->m_flow_info->get_total_bytes();
+ m_stats.m_total_pkt += node->m_flow_info->Size();
+
+ return (0);
+}
+
+void CFlowGenListPerThread::terminate_nat_flows(CGenNode *p){
+ m_stats.m_nat_flow_timeout++;
+ m_stats.m_nat_lookup_remove_flow_id++;
+ m_flow_id_to_node_lookup.remove_no_lookup(p->get_short_fid());
+ free_last_flow_node( p);
+}
+
+
+void CFlowGenListPerThread::handel_latecy_pkt_msg(CGenNodeLatencyPktInfo * msg){
+ /* send the packet */
+ #ifdef LATENCY_QUEUE_TRACE_
+ printf(" latency msg dir %d\n",msg->m_dir);
+ struct rte_mbuf * m;
+ m=msg->m_pkt;
+ rte_pktmbuf_dump(stdout,m, rte_pktmbuf_pkt_len(m));
+ #endif
+
+ /* update timestamp */
+ struct rte_mbuf * m;
+ m=msg->m_pkt;
+ uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
+ latency_header * h=(latency_header *)(p+msg->m_latency_offset);
+ h->time_stamp = os_get_hr_tick_64();
+
+ m_node_gen.m_v_if->send_one_pkt((pkt_dir_t)msg->m_dir,msg->m_pkt);
+}
+
+
+void CFlowGenListPerThread::handel_nat_msg(CGenNodeNatInfo * msg){
+ int i;
+ for (i=0; i<msg->m_cnt; i++) {
+ CNatFlowInfo * nat_msg=&msg->m_data[i];
+ CGenNode * node=m_flow_id_to_node_lookup.lookup(nat_msg->m_fid);
+ if (!node) {
+ /* this should be move to a notification module */
+ #ifdef NAT_TRACE_
+ printf(" ERORR not valid flow_id %d probably flow was aged \n",nat_msg->m_fid);
+ #endif
+ m_stats.m_nat_lookup_no_flow_id++;
+ continue;
+ }
+ #ifdef NAT_TRACE_
+ printf(" %.03f RX :set node %p:%x %x:%x:%x \n",now_sec() ,node,nat_msg->m_fid,nat_msg->m_external_ip,nat_msg->m_external_ip_server,nat_msg->m_external_port);
+ #endif
+ node->set_nat_ipv4_addr(nat_msg->m_external_ip);
+ node->set_nat_ipv4_port(nat_msg->m_external_port);
+ node->set_nat_ipv4_addr_server(nat_msg->m_external_ip_server);
+
+ assert(node->is_nat_wait_state());
+ if ( CGlobalInfo::is_learn_verify_mode() ){
+ if (!node->is_external_is_eq_to_internal_ip() ){
+ m_stats.m_nat_flow_learn_error++;
+ }
+ }
+ node->set_nat_learn_state();
+ /* remove from the hash */
+ m_flow_id_to_node_lookup.remove_no_lookup(nat_msg->m_fid);
+ m_stats.m_nat_lookup_remove_flow_id++;
+
+ }
+}
+
+
+void CFlowGenListPerThread::check_msgs(void){
+ if ( likely ( m_ring_from_rx->isEmpty() ) ){
+ return;
+ }
+ #ifdef NAT_TRACE_
+ printf(" %.03f got message from RX \n",now_sec());
+ #endif
+ while ( true ) {
+ CGenNode * node;
+ if ( m_ring_from_rx->Dequeue(node)!=0 ){
+ break;
+ }
+ assert(node);
+ //printf ( " message: thread %d, node->m_flow_id : %d \n", m_thread_id,node->m_flow_id);
+ /* only one message is supported right now */
+
+ CGenNodeMsgBase * msg=(CGenNodeMsgBase *)node;
+
+ uint8_t msg_type = msg->m_msg_type;
+ switch (msg_type ) {
+ case CGenNodeMsgBase::NAT_FIRST:
+ handel_nat_msg((CGenNodeNatInfo * )msg);
+ break;
+ case CGenNodeMsgBase::LATENCY_PKT:
+ handel_latecy_pkt_msg((CGenNodeLatencyPktInfo *) msg);
+ break;
+ default:
+ printf("ERROR pkt-thread message type is not valid %d \n",msg_type);
+ assert(0);
+ }
+
+ CGlobalInfo::free_node(node);
+ }
+}
+
+
+void CFlowGenListPerThread::generate_erf(std::string erf_file_name,
+ CPreviewMode & preview){
+ /* now we are ready to generate*/
+ if ( m_cap_gen.size()==0 ){
+ fprintf(stderr," nothing to generate no template loaded \n");
+ return;
+ }
+ m_preview_mode = preview;
+ m_node_gen.open_file(erf_file_name,&m_preview_mode);
+ dsec_t d_time_flow=get_delta_flow_is_sec();
+ m_cur_time_sec = 0.01+m_thread_id*m_flow_list->get_delta_flow_is_sec();
+ if ( CGlobalInfo::is_realtime() ){
+ m_cur_time_sec += now_sec() + 0.5 ;
+ }
+ dsec_t c_stop_sec = m_cur_time_sec + m_yaml_info.m_duration_sec;
+ m_stop_time_sec =c_stop_sec;
+ m_cur_flow_id =1;
+ m_cur_template =(m_thread_id % m_cap_gen.size());
+ m_stats.clear();
+
+ fprintf(stdout," Generating erf file ... \n");
+ CGenNode * node= create_node() ;
+ /* add periodic */
+ node->m_type = CGenNode::FLOW_FIF;
+ node->m_time = m_cur_time_sec;
+ m_node_gen.add_node(node);
+
+ double old_offset=0.0;
+
+ node= create_node() ;
+ node->m_type = CGenNode::FLOW_SYNC;
+ node->m_time = m_cur_time_sec + SYNC_TIME_OUT ;
+ m_node_gen.add_node(node);
+
+ #ifdef _DEBUG
+ if ( m_preview_mode.getVMode() >2 ){
+
+ CGenNode::DumpHeader(stdout);
+ }
+ #endif
+
+ m_node_gen.flush_file(c_stop_sec,d_time_flow, false,this,old_offset);
+#ifdef VALG
+ CALLGRIND_STOP_INSTRUMENTATION;
+ printf (" %llu \n",os_get_hr_tick_64()-_start_time);
+#endif
+ if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() ){
+ /* clean close */
+ m_node_gen.flush_file(m_cur_time_sec, d_time_flow, true,this,old_offset);
+ }
+
+ if (m_preview_mode.getVMode() > 1 ) {
+ fprintf(stdout,"\n\n");
+ fprintf(stdout,"\n\n");
+ fprintf(stdout,"file stats \n");
+ fprintf(stdout,"=================\n");
+ m_stats.dump(stdout);
+ }
+ m_node_gen.close_file(this);
+}
+
+
+bool CFlowGenList::Create(){
+ check_objects_sizes();
+ CPluginCallback::callback= new CPluginCallbackSimple();
+ return (true);
+}
+
+
+void CFlowGenList::generate_p_thread_info(uint32_t num_threads){
+ clean_p_thread_info();
+ BP_ASSERT(num_threads < 64);
+ int i;
+ for (i=0; i<(int)num_threads; i++) {
+ CFlowGenListPerThread * lp= new CFlowGenListPerThread();
+ lp->Create(i,i,this,num_threads);
+ m_threads_info.push_back(lp);
+ }
+}
+
+
+void CFlowGenList::clean_p_thread_info(void){
+ int i;
+ for (i=0; i<(int)m_threads_info.size(); i++) {
+ CFlowGenListPerThread * lp=m_threads_info[i];
+ lp->Delete();
+ delete lp;
+ }
+ m_threads_info.clear();
+}
+
+
+void CFlowGenList::Delete(){
+ clean_p_thread_info();
+ Clean();
+}
+
+int CFlowGenList::load_from_mac_file(std::string file_name) {
+ if ( !utl_is_file_exists (file_name) ){
+ printf(" ERROR no mac_file is set, file %s does not exist \n",file_name.c_str());
+ exit(-1);
+ }
+ is_mac_info_configured = true;
+
+ try {
+ std::ifstream fin((char *)file_name.c_str());
+ YAML::Parser parser(fin);
+ YAML::Node doc;
+
+ parser.GetNextDocument(doc);
+ doc[0] >> m_mac_info;
+ } catch ( const std::exception& e ) {
+ std::cout << e.what() << "\n";
+ m_mac_info.clear();
+ exit(-1);
+ }
+
+}
+
+
+int CFlowGenList::load_from_yaml(std::string file_name,
+ uint32_t num_threads){
+ is_mac_info_configured = false;
+ uint8_t idx;
+ m_yaml_info.load_from_yaml_file(file_name);
+ if (m_yaml_info.verify_correctness(num_threads) ==false){
+ exit(0);
+ }
+
+ /* move it to global info, better CPU D-cache usage */
+ CGlobalInfo::m_options.preview.set_vlan_mode_enable(m_yaml_info.m_vlan_info.m_enable);
+ CGlobalInfo::m_options.m_vlan_port[0] = m_yaml_info.m_vlan_info.m_vlan_per_port[0];
+ CGlobalInfo::m_options.m_vlan_port[1] = m_yaml_info.m_vlan_info.m_vlan_per_port[1];
+ CGlobalInfo::m_options.preview.set_mac_ip_overide_enable(m_yaml_info.m_mac_replace_by_ip);
+ CGlobalInfo::m_options.m_tcp_aging = m_yaml_info.m_tuple_gen.m_tcp_aging_sec;
+ CGlobalInfo::m_options.m_udp_aging = m_yaml_info.m_tuple_gen.m_udp_aging_sec;
+
+
+ if ( m_yaml_info.m_mac_base.size() != 6 ){
+ printf(" mac addr is not valid \n");
+ exit(0);
+ }
+
+ if (m_yaml_info.m_ipv6_set == true) {
+ // Copy the most significant 96-bits from yaml data
+ for (idx=0; idx<6; idx++){
+ CGlobalInfo::m_options.m_src_ipv6[idx] = m_yaml_info.m_src_ipv6[idx];
+ CGlobalInfo::m_options.m_dst_ipv6[idx] = m_yaml_info.m_dst_ipv6[idx];
+ }
+ }else{
+ // Set the most signifcant 96-bits to zero which represents an
+ // IPv4-compatible IPv6 address
+ for (idx=0; idx<6; idx++){
+ CGlobalInfo::m_options.m_src_ipv6[idx] = 0;
+ CGlobalInfo::m_options.m_dst_ipv6[idx] = 0;
+ }
+ }
+
+ int i=0;
+ Clean();
+ bool all_template_has_one_direction=true;
+ for (i=0; i<(int)m_yaml_info.m_vec.size(); i++) {
+ CFlowGeneratorRec * lp=new CFlowGeneratorRec();
+ if ( lp->Create(&m_yaml_info.m_vec[i],&m_yaml_info,i) == false){
+ fprintf(stdout,"\n ERROR reading YAML template files, please verify that they are valid \n\n");
+ exit(-1);
+ return (-1);
+ }
+ m_cap_gen.push_back(lp);
+
+ if (lp->m_flow_info.GetPacket(0)->m_pkt_indication.m_desc.IsBiDirectionalFlow() ) {
+ all_template_has_one_direction=false;
+ }
+ }
+
+ if ( CGlobalInfo::is_learn_mode() && all_template_has_one_direction ) {
+ fprintf(stdout,"\n Warning --learn mode has nothing to do when all templates are one directional, please remove it \n");
+ }
+ return (0);
+}
+
+
+
+void CFlowGenList::Clean(){
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->Delete();
+ delete lp;
+ }
+ m_cap_gen.clear();
+}
+
+double CFlowGenList::GetCpuUtil(){
+ int i;
+ double c=0.0;
+ for (i=0; i<(int)m_threads_info.size(); i++) {
+ CFlowGenListPerThread * lp=m_threads_info[i];
+ c+=lp->m_cpu_cp_u.GetVal();
+ }
+ return (c/m_threads_info.size());
+}
+
+
+void CFlowGenList::Update(){
+
+ int i;
+ for (i=0; i<(int)m_threads_info.size(); i++) {
+ CFlowGenListPerThread * lp=m_threads_info[i];
+ lp->Update();
+ }
+}
+
+
+
+void CFlowGenList::Dump(FILE *fd){
+ fprintf(fd,"yaml info \n");
+ fprintf(fd,"--------------\n");
+ m_yaml_info.Dump(fd);
+
+ fprintf(fd,"\n");
+ fprintf(fd,"cap file info \n");
+ fprintf(fd,"----------------------\n");
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->Dump(stdout);
+ }
+}
+
+
+void CFlowGenList::DumpPktSize(){
+
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->m_flow_info.dump_pkt_sizes();
+ }
+}
+
+
+void CFlowGenList::DumpCsv(FILE *fd){
+ CFlowStats::DumpHeader(fd);
+
+ CFlowStats stats;
+ CFlowStats sum;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ stats.Dump(fd);
+ sum.Add(stats);
+ }
+ fprintf(fd,"\n");
+ sum.m_name= "sum";
+ sum.Dump(fd);
+ sum.m_memory.dump(fd);
+}
+
+
+uint32_t CFlowGenList::get_total_repeat_flows(){
+ uint32_t flows=0;
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ flows+=lp->m_info->m_limit ;
+ }
+ return (flows);
+}
+
+
+double CFlowGenList::get_total_tx_bps(){
+ CFlowStats stats;
+ double total=0.0;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ total+=(stats.m_mb_sec);
+ }
+ return (_1Mb_DOUBLE*total);
+}
+
+double CFlowGenList::get_total_pps(){
+
+ CFlowStats stats;
+ double total=0.0;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ total+=stats.m_pps;
+ }
+ return (total);
+}
+
+
+double CFlowGenList::get_total_kcps(){
+
+ CFlowStats stats;
+ double total=0.0;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ total+= stats.get_normal_cps();
+ }
+ return ((total/1000.0));
+}
+
+double CFlowGenList::get_delta_flow_is_sec(){
+ return (1.0/(1000.0*get_total_kcps()));
+}
+
+
+
+bool CPolicer::update(double dsize,double now_sec){
+ if ( m_last_time ==0.0 ) {
+ /* first time */
+ m_last_time = now_sec;
+ return (true);
+ }
+ if (m_cir == 0.0) {
+ return (false);
+ }
+
+ // check if there is a need to add tokens
+ if(now_sec > m_last_time) {
+ dsec_t dtime=(now_sec - m_last_time);
+ dsec_t dsize =dtime*m_cir;
+ m_level +=dsize;
+ if (m_level > m_bucket_size) {
+ m_level = m_bucket_size;
+ }
+ m_last_time = now_sec;
+ }
+
+ if (m_level > dsize) {
+ m_level -= dsize;
+ return (true);
+ }else{
+ return (false);
+ }
+}
+
+
+float CPPSMeasure::add(uint64_t pkts){
+ if ( false == m_start ){
+ m_start=true;
+ m_last_time_msec = os_get_time_msec() ;
+ m_last_pkts=pkts;
+ return (0.0);
+ }
+
+ uint32_t ctime=os_get_time_msec();
+ if ((ctime - m_last_time_msec) <os_get_time_freq() ) {
+ return (m_last_result);
+ }
+
+ uint32_t dtime_msec = ctime-m_last_time_msec;
+ uint32_t dpkts = (pkts - m_last_pkts);
+
+ m_last_time_msec = ctime;
+ m_last_pkts = pkts;
+
+ m_last_result= 0.5*calc_pps(dtime_msec,dpkts) +0.5*(m_last_result);
+ return ( m_last_result );
+}
+
+
+
+CBwMeasure::CBwMeasure() {
+ reset();
+}
+
+void CBwMeasure::reset(void) {
+ m_start=false;
+ m_last_time_msec=0;
+ m_last_bytes=0;
+ m_last_result=0.0;
+};
+
+double CBwMeasure::calc_MBsec(uint32_t dtime_msec,
+ uint64_t dbytes){
+ double rate=0.000008*( ( (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) );
+ return (rate);
+}
+
+double CBwMeasure::add(uint64_t size) {
+ if ( false == m_start ){
+ m_start=true;
+ m_last_time_msec = os_get_time_msec() ;
+ m_last_bytes=size;
+ return (0.0);
+ }
+
+ uint32_t ctime=os_get_time_msec();
+ if ((ctime - m_last_time_msec) <os_get_time_freq() ) {
+ return (m_last_result);
+ }
+
+ uint32_t dtime_msec = ctime-m_last_time_msec;
+ uint64_t dbytes = size - m_last_bytes;
+
+ m_last_time_msec = ctime;
+ m_last_bytes = size;
+
+ m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result);
+ return ( m_last_result );
+}
+
+
+
+
+
+
+void CParserOption::dump(FILE *fd){
+ preview.Dump(fd);
+ fprintf(fd," cfg file : %s \n",cfg_file.c_str());
+ fprintf(fd," mac file : %s \n",mac_file.c_str());
+ fprintf(fd," out file : %s \n",out_file.c_str());
+ fprintf(fd," duration : %.0f \n",m_duration);
+ fprintf(fd," factor : %.0f \n",m_factor);
+ fprintf(fd," latency : %d pkt/sec \n",m_latency_rate);
+ fprintf(fd," zmq_port : %d \n",m_zmq_port);
+ fprintf(fd," telnet_port : %d \n",m_telnet_port);
+ fprintf(fd," expected_ports : %d \n",m_expected_portd);
+ if (preview.get_vlan_mode_enable() ) {
+ fprintf(fd," vlans : [%d,%d] \n",m_vlan_port[0],m_vlan_port[1]);
+ }
+ fprintf(fd," mac spreading: %d \n",(int)m_mac_splitter);
+
+
+ int i;
+ for (i=0; i<MAX_LATENCY_PORTS; i++) {
+ fprintf(fd," port : %d dst:",i);
+ CMacAddrCfg * lp=&m_mac_addr[i];
+ dump_mac_addr(fd,lp->u.m_mac.dest);
+ fprintf(fd," src:");
+ dump_mac_addr(fd,lp->u.m_mac.src);
+ fprintf(fd,"\n");
+ }
+}
+
+#if 0
+
+void CTupleGlobalGenerator::Dump(FILE *fd){
+ fprintf(fd," src:%x dest: %x \n",m_result_src_ip,m_result_dest_ip);
+}
+
+bool CTupleGlobalGenerator::Create(){
+ was_generated=false;
+ return (true);
+}
+
+
+void CTupleGlobalGenerator::Copy(CTupleGlobalGenerator * gen){
+ was_generated=false;
+ m_min_src_ip = gen->m_min_src_ip;
+ m_max_src_ip = gen->m_max_src_ip;
+ m_min_dest_ip = gen->m_min_dest_ip;
+ m_max_dest_ip = gen->m_max_dest_ip;
+}
+
+
+void CTupleGlobalGenerator::Delete(){
+ was_generated=false;
+}
+
+#endif
+
+static uint32_t get_rand_32(uint32_t MinimumRange ,
+ uint32_t MaximumRange );
+
+
+#if 0
+void CTupleGlobalGenerator::Generate(uint32_t thread_id,
+ uint32_t num_addr ){
+ if ( was_generated == false) {
+ /* first time */
+ was_generated = true;
+ cur_src_ip = m_min_src_ip;
+ cur_dst_ip = m_min_dest_ip;
+ }
+
+ if ( ( cur_src_ip + num_addr ) > m_max_src_ip ) {
+ cur_src_ip = m_min_src_ip;
+ }
+
+ /* copy the results */
+ m_result_src_ip = cur_src_ip;
+ m_result_dest_ip = cur_dst_ip;
+ cur_src_ip += num_addr;
+ cur_dst_ip += 1;
+ if (cur_dst_ip > m_max_dest_ip ) {
+ cur_dst_ip = m_min_dest_ip;
+ }
+}
+
+
+
+
+void CTupleTemplateGenerator::Dump(FILE *fd){
+ fprintf(fd," id: %x, %x:%x - %x \n",m_id,m_result_src_ip,m_result_dest_ip,m_result_src_port);
+}
+
+
+bool CTupleTemplateGenerator::Create(CTupleGlobalGenerator * global_gen,
+ uint16_t w,
+ uint16_t wlength,
+ uint32_t _id,
+ uint32_t thread_id){
+ m_was_generated = false;
+ m_thread_id = thread_id;
+ m_lp_global_gen = global_gen;
+ BP_ASSERT(m_lp_global_gen);
+ m_cur_src_port = 1;
+ m_cur_src_port_cnt=0;
+
+ m_w = w;
+ m_wlength = wlength;
+
+ m_id = _id;
+ m_was_init=true;
+ return(true);
+}
+
+void CTupleTemplateGenerator::Delete(){
+ m_was_generated = false;
+ m_was_init=false;
+}
+
+void CTupleTemplateGenerator::Generate_src_dest(){
+ /* TBD need to fix the 100*/
+ m_lp_global_gen->Generate(m_thread_id,m_wlength);
+ m_result_src_ip = m_lp_global_gen->m_result_src_ip;
+
+ m_dest_ip = m_lp_global_gen->m_result_dest_ip;
+ m_result_dest_ip = update_dest_ip(m_dest_ip );
+ m_cnt=0;
+}
+
+uint16_t CTupleTemplateGenerator::GenerateOneSourcePort(){
+ /* handle port */
+ m_cur_src_port++;
+ /* do not use port zero */
+ if (m_cur_src_port == 0) {
+ m_cur_src_port=1;
+ }
+ m_result_src_port=m_cur_src_port;
+ return (m_cur_src_port);
+}
+
+void CTupleTemplateGenerator::Generate(){
+ BP_ASSERT(m_was_init);
+ if ( m_was_generated == false ) {
+ /* first time */
+ Generate_src_dest();
+ m_was_generated = true;
+ }else{
+ /* ip+cnt,dest+cnt*/
+ m_cnt++;
+ if ( m_cnt >= m_wlength ) {
+ m_cnt =0;
+ m_result_src_ip -=m_wlength;
+ m_result_dest_ip = m_dest_ip;
+ m_cur_src_port_cnt++;
+ if (m_cur_src_port_cnt >= m_w ) {
+ Generate_src_dest();
+ m_cur_src_port_cnt=0;
+ }
+ }
+ m_result_src_ip += 1;
+ m_result_dest_ip = update_dest_ip(m_dest_ip +m_cnt );
+ }
+
+
+ /* handle port */
+ m_cur_src_port++;
+ /* do not use port zero */
+ if (m_cur_src_port == 0) {
+ m_cur_src_port=1;
+ }
+ m_result_src_ip =update_src_ip( m_result_src_ip );
+ m_result_src_port=m_cur_src_port;
+}
+
+#endif
+
+
+static uint32_t get_rand_32(uint32_t MinimumRange ,
+ uint32_t MaximumRange ){
+ enum {RANDS_NUM = 2 , RAND_MAX_BITS = 0xf , UNSIGNED_INT_BITS = 0x20 , TWO_BITS_MASK = 0x3};
+
+ const double TWO_POWER_32_BITS = 0x10000000 * (double)0x10;
+
+ uint32_t RandomNumber = 0;
+ for (int i = 0 ; i < RANDS_NUM;i++) {
+ RandomNumber = (RandomNumber<<RAND_MAX_BITS) + rand();
+ }
+
+ RandomNumber = (RandomNumber<<(UNSIGNED_INT_BITS - RAND_MAX_BITS * RANDS_NUM)) + (rand() | TWO_BITS_MASK);
+
+ uint32_t Range;
+ if ((Range = MaximumRange - MinimumRange) == 0xffffffff) {
+ return RandomNumber;
+ }
+ return (uint32_t)(((Range + 1) / TWO_POWER_32_BITS * RandomNumber) + MinimumRange );
+}
+
+
+
+int CNullIF::send_node(CGenNode * node){
+ #if 0
+ CFlowPktInfo * lp=node->m_pkt_info;
+ rte_mbuf_t * buf=lp->generate_new_mbuf(node);
+ //rte_pktmbuf_dump(buf, buf->pkt_len);
+ //sending it ??
+ // free it here as if driver does
+ rte_pktmbuf_free(buf);
+ #endif
+ return (0);
+}
+
+
+int CErfIF::send_node(CGenNode * node){
+ if ( m_preview_mode->getFileWrite() ){
+
+ CFlowPktInfo * lp=node->m_pkt_info;
+ rte_mbuf_t * m=lp->generate_new_mbuf(node);
+ fill_pkt(m_raw,m);
+ CPktNsecTimeStamp t_c(node->m_time);
+ m_raw->time_nsec = t_c.m_time_nsec;
+ m_raw->time_sec = t_c.m_time_sec;
+
+ pkt_dir_t dir=node->cur_interface_dir();
+ uint8_t p_id = (uint8_t)dir;
+
+ m_raw->setInterface(p_id);
+
+ /* update mac addr dest/src 12 bytes */
+ uint8_t *p=(uint8_t *)m_raw->raw;
+ memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(p_id),12);
+
+ /* If vlan is enabled, add vlan header */
+ if ( unlikely( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ) ){
+ /* retrieve vlan ID and form vlan tag */
+ uint8_t vlan_port = (node->m_src_ip &1);
+ uint16_t vlan_protocol = EthernetHeader::Protocol::VLAN;
+ uint16_t vlan_id = CGlobalInfo::m_options.m_vlan_port[vlan_port];
+ uint32_t vlan_tag = (vlan_protocol << 16) | vlan_id;
+ vlan_tag = PKT_HTONL(vlan_tag);
+
+ /* insert vlan tag and adjust packet size */
+ memcpy(cbuff+4, p+12, m_raw->pkt_len-12);
+ memcpy(cbuff, &vlan_tag, 4);
+ memcpy(p+12, cbuff, m_raw->pkt_len-8);
+ m_raw->pkt_len += 4;
+ }
+
+ //utl_DumpBuffer(stdout,p, 12,0);
+
+ BP_ASSERT(m_writer);
+
+ bool res=m_writer->write_packet(m_raw);
+
+ //utl_DumpBuffer(stdout,m_raw->raw,m_raw->pkt_len,0);
+
+ BP_ASSERT(res);
+ rte_pktmbuf_free(m);
+ }
+ return (0);
+}
+
+
+int CErfIF::flush_tx_queue(void){
+ return (0);
+}
+
+
+
+const uint8_t sctp_pkt[]={
+
+ 0x00,0x04,0x96,0x08,0xe0,0x40,
+ 0x00,0x0e,0x2e,0x24,0x37,0x5f,
+ 0x08,0x00,
+
+ 0x45,0x02,0x00,0x30,
+ 0x00,0x00,0x40,0x00,
+ 0x40,0x84,0xbd,0x04,
+ 0x9b,0xe6,0x18,0x9b, //sIP
+ 0xcb,0xff,0xfc,0xc2, //DIP
+
+ 0x80,0x44,//SPORT
+ 0x00,0x50,//DPORT
+
+ 0x00,0x00,0x00,0x00, //checksum
+
+ 0x11,0x22,0x33,0x44, // magic
+ 0x00,0x00,0x00,0x00, //64 bit counter
+ 0x00,0x00,0x00,0x00,
+ 0x00,0x01,0xa0,0x00, //seq
+ 0x00,0x00,0x00,0x00,
+
+};
+
+// 20+8+20`
+
+void CLatencyPktInfo::Create(){
+ m_packet = new CCapPktRaw( sizeof(sctp_pkt) );
+ m_packet->pkt_cnt=0;
+ m_packet->time_sec=0;
+ m_packet->time_nsec=0;
+ memcpy(m_packet->raw,sctp_pkt,sizeof(sctp_pkt));
+ m_packet->pkt_len=sizeof(sctp_pkt);
+
+ m_pkt_indication.m_packet =m_packet;
+
+ m_pkt_indication.m_ether = (EthernetHeader *)m_packet->raw;
+ m_pkt_indication.l3.m_ipv4=(IPHeader *)(m_packet->raw+14);
+ m_pkt_indication.m_is_ipv6 = false;
+ m_pkt_indication.l4.m_udp=(UDPHeader *)m_packet->raw+14+20;
+ m_pkt_indication.m_payload=(uint8_t *)m_packet->raw+14+20+16;
+ m_pkt_indication.m_payload_len=0;
+ m_pkt_indication.m_packet_padding=4;
+
+
+ m_pkt_indication.m_ether_offset =0;
+ m_pkt_indication.m_ip_offset =14;
+ m_pkt_indication.m_udp_tcp_offset = 34;
+ m_pkt_indication.m_payload_offset = 34+8;
+
+ CPacketDescriptor * lpd=&m_pkt_indication.m_desc;
+ lpd->Clear();
+ lpd->SetInitSide(true);
+ lpd->SetSwapTuple(false);
+ lpd->SetIsValidPkt(true);
+ lpd->SetIsUdp(true);
+ lpd->SetIsLastPkt(true);
+ m_pkt_info.Create(&m_pkt_indication);
+
+ memset(&m_dummy_node,0,sizeof(m_dummy_node));
+
+ m_dummy_node.set_socket_id( CGlobalInfo::m_socket.port_to_socket(0) );
+
+ m_dummy_node.m_time =0.1;
+ m_dummy_node.m_pkt_info = &m_pkt_info;
+ m_dummy_node.m_dest_ip = 0;
+ m_dummy_node.m_src_ip = 0;
+ m_dummy_node.m_src_port = 0x11;
+ m_dummy_node.m_flow_id =0;
+ m_dummy_node.m_flags =CGenNode::NODE_FLAGS_LATENCY;
+
+}
+
+
+rte_mbuf_t * CLatencyPktInfo::generate_pkt(int port_id,uint32_t extern_ip){
+
+ bool is_client_to_serever=(port_id%2==0)?true:false;
+
+ int dual_port_index=(port_id>>1);
+ uint32_t c=m_client_ip.v4;
+ uint32_t s=m_server_ip.v4;
+ if ( extern_ip ){
+ c=extern_ip;
+ }
+
+ if (!is_client_to_serever) {
+ /*swap */
+ uint32_t t=c;
+ c=s;
+ s=t;
+ }
+ uint32_t mask=dual_port_index*m_dual_port_mask;
+ if ( extern_ip==0 ){
+ c+=mask;
+ }
+ s+=mask;
+ m_dummy_node.m_src_ip = c;
+ m_dummy_node.m_dest_ip = s;
+
+ rte_mbuf_t * m=m_pkt_info.generate_new_mbuf(&m_dummy_node);
+ return (m);
+
+
+}
+
+
+void CLatencyPktInfo::set_ip(uint32_t src,
+ uint32_t dst,
+ uint32_t dual_port_mask){
+
+ m_client_ip.v4=src;
+ m_server_ip.v4=dst;
+ m_dual_port_mask=dual_port_mask;
+
+}
+
+
+void CLatencyPktInfo::Delete(){
+ m_pkt_info.Delete();
+ delete m_packet;
+}
+
+void CCPortLatency::reset(){
+ m_rx_seq =m_tx_seq;
+ m_pad = 0;
+
+ m_tx_pkt_err=0;
+ m_tx_pkt_ok =0;
+ m_pkt_ok=0;
+ m_no_magic=0;
+ m_unsup_prot=0;
+ m_no_id=0;
+ m_seq_error=0;
+ m_length_error=0;
+ m_no_ipv4_option=0;
+ m_hist.Reset();
+}
+
+
+static uint8_t nat_is_port_can_send(uint8_t port_id){
+ uint8_t offset= ((port_id>>1)<<1);
+ uint8_t client_index = (port_id %2);
+ return (client_index ==0 ?1:0);
+}
+
+
+bool CCPortLatency::Create(CLatencyManager * parent,
+ uint8_t id,
+ uint16_t offset,
+ uint16_t pkt_size,
+ CCPortLatency * rx_port){
+ m_parent = parent;
+ m_id = id;
+ m_tx_seq =0x12345678;
+ m_offset = offset;
+ m_pkt_size = pkt_size;
+ m_rx_port = rx_port;
+ m_nat_can_send = nat_is_port_can_send(m_id);
+ m_nat_learn = m_nat_can_send;
+ m_nat_external_ip=0;
+
+ m_hist.Create();
+ reset();
+ return (true);
+}
+
+void CCPortLatency::Delete(){
+ m_hist.Delete();
+}
+
+void CCPortLatency::update_packet(rte_mbuf_t * m){
+ uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
+ /* update mac addr dest/src 12 bytes */
+ memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(m_id),12);
+
+ latency_header * h=(latency_header *)(p+m_offset);
+ h->magic = LATENCY_MAGIC | m_id ;
+ h->time_stamp = os_get_hr_tick_64();
+ h->seq = m_tx_seq;
+ m_tx_seq++;
+}
+
+
+void CCPortLatency::DumpShortHeader(FILE *fd){
+
+
+ fprintf(fd," if| tx_ok , rx_ok , rx ,error, average , max , Jitter , max window \n");
+ fprintf(fd," | , , check, , latency(usec),latency (usec) ,(usec) , \n");
+ fprintf(fd," ---------------------------------------------------------------------------------------------------------------- \n");
+}
+
+
+
+std::string CCPortLatency::get_field(std::string name,float f){
+ char buff[200];
+ sprintf(buff,"\"%s-%d\":%.1f,",name.c_str(),m_id,f);
+ return (std::string(buff));
+}
+
+
+void CCPortLatency::dump_json_v2(std::string & json ){
+ char buff[200];
+ sprintf(buff,"\"port-%d\": {",m_id);
+ json+=std::string(buff);
+ m_hist.dump_json("hist",json);
+ dump_counters_json(json);
+ json+="},";
+}
+
+void CCPortLatency::dump_json(std::string & json ){
+ json += get_field("avg",m_hist.get_average_latency() );
+ json += get_field("max",m_hist.get_max_latency() );
+ json += get_field("c-max",m_hist.get_max_latency_last_update() );
+ json += get_field("error",(float)(m_unsup_prot+m_no_magic+m_no_id+m_seq_error+m_length_error) );
+ json += get_field("jitter",(float)get_jitter_usec() );
+}
+
+
+void CCPortLatency::DumpShort(FILE *fd){
+
+ m_hist.update();
+ fprintf(fd,"%8lu,%8lu,%10lu,%4lu,",
+ m_tx_pkt_ok,
+ m_pkt_ok,
+ m_rx_check,
+ m_unsup_prot+m_no_magic+m_no_id+m_seq_error+m_length_error+m_no_ipv4_option+m_tx_pkt_err
+ );
+
+ fprintf(fd," %8.0f ,%8.0f,%8d ",
+ m_hist.get_average_latency(),
+ m_hist.get_max_latency(),
+ get_jitter_usec()
+ );
+ fprintf(fd," | ");
+ m_hist.DumpWinMax(fd);
+
+}
+
+#define DPL_J(f) json+=add_json(#f,f);
+#define DPL_J_LAST(f) json+=add_json(#f,f,true);
+
+void CCPortLatency::dump_counters_json(std::string & json ){
+
+ json+="\"stats\" : {";
+ DPL_J(m_tx_pkt_ok);
+ DPL_J(m_tx_pkt_err);
+ DPL_J(m_pkt_ok);
+ DPL_J(m_unsup_prot);
+ DPL_J(m_no_magic);
+ DPL_J(m_no_id);
+ DPL_J(m_seq_error);
+ DPL_J(m_length_error);
+ DPL_J(m_no_ipv4_option);
+ json+=add_json("m_jitter",get_jitter_usec());
+ /* must be last */
+ DPL_J_LAST(m_rx_check);
+ json+="}";
+
+
+}
+
+void CCPortLatency::DumpCounters(FILE *fd){
+ #define DP_A1(f) if (f) fprintf(fd," %-40s : %llu \n",#f,f)
+
+ fprintf(fd," counter \n");
+ fprintf(fd," -----------\n");
+
+ DP_A1(m_tx_pkt_err);
+ DP_A1(m_tx_pkt_ok);
+ DP_A1(m_pkt_ok);
+ DP_A1(m_unsup_prot);
+ DP_A1(m_no_magic);
+ DP_A1(m_no_id);
+ DP_A1(m_seq_error);
+ DP_A1(m_length_error);
+ DP_A1(m_rx_check);
+ DP_A1(m_no_ipv4_option);
+
+
+ fprintf(fd," -----------\n");
+ m_hist.Dump(fd);
+ fprintf(fd," %-40s : %llu \n","jitter",get_jitter_usec());
+}
+
+bool CCPortLatency::dump_packet(rte_mbuf_t * m){
+ fprintf(stdout," %f.03 dump packet ..\n",now_sec());
+ uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
+ uint16_t pkt_size=rte_pktmbuf_pkt_len(m);
+ utl_DumpBuffer(stdout,p,pkt_size,0);
+ return (0);
+
+
+
+ if (pkt_size < ( sizeof(CRx_check_header)+14+20) ) {
+ assert(0);
+ }
+ CRx_check_header * lp=(CRx_check_header *)(p+pkt_size-sizeof(CRx_check_header));
+
+ lp->dump(stdout);
+
+
+ uint16_t vlan_offset=0;
+ if ( unlikely( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ) ){
+ vlan_offset=4;
+ }
+// utl_DumpBuffer(stdout,p,pkt_size,0);
+ return (0);
+
+}
+
+bool CCPortLatency::check_rx_check(rte_mbuf_t * m){
+ m_rx_check++;
+ return (true);
+}
+
+bool CCPortLatency::do_learn(uint32_t external_ip){
+ m_nat_learn=true;
+ m_nat_can_send=true;
+ m_nat_external_ip=external_ip;
+ return (true);
+}
+
+bool CCPortLatency::check_packet(rte_mbuf_t * m,CRx_check_header * & rx_p){
+
+ CSimplePacketParser parser(m);
+ if ( !parser.Parse() ){
+ m_unsup_prot++; // Unsupported protocol
+ return (false);
+ }
+
+ uint16_t pkt_size=rte_pktmbuf_pkt_len(m);
+ /* check if CRC was extracted */
+ if ( parser.getPktSize() == pkt_size-4) {
+ // CRC was not extracted by driver (VM E1000 driver issue) extract it
+ pkt_size=pkt_size-4;
+ }
+
+ uint16_t vlan_offset=parser.m_vlan_offset;
+ uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
+
+ rx_p=(CRx_check_header *)0;
+ bool managed_by_ip_options=false;
+ bool is_rx_check=true;
+
+ if ( !parser.IsLatencyPkt() ){
+
+ #ifdef NAT_TRACE_
+ printf(" %.3f RX : got packet !!! \n",now_sec() );
+ #endif
+
+ /* ipv6+rx-check */
+ if ( parser.m_ipv6 ) {
+ /* if we have ipv6 packet */
+ if (parser.m_protocol == RX_CHECK_V6_OPT_TYPE) {
+ if ( get_is_rx_check_mode() ){
+ m_rx_check++;
+ rx_p=(CRx_check_header *)((uint8_t*)parser.m_ipv6 +IPv6Header::DefaultSize);
+ return (true);
+ }
+
+ }
+ m_seq_error++;
+ return (false);
+ }
+
+ uint8_t opt_len = parser.m_ipv4->getOptionLen();
+ uint8_t *opt_ptr = parser.m_ipv4->getOption();
+ /* Process IP option header(s) */
+ while ( opt_len != 0 ) {
+ switch (*opt_ptr) {
+ case RX_CHECK_V4_OPT_TYPE:
+ /* rx-check option header */
+ if ( ( !get_is_rx_check_mode() ) ||
+ (opt_len < RX_CHECK_LEN) ) {
+ m_seq_error++;
+ return (false);
+ }
+ m_rx_check++;
+ rx_p=(CRx_check_header *)opt_ptr;
+ opt_len -= RX_CHECK_LEN;
+ opt_ptr += RX_CHECK_LEN;
+ break;
+ case CNatOption::noIPV4_OPTION:
+ /* NAT learn option header */
+ CNatOption *lp;
+ if ( ( !CGlobalInfo::is_learn_mode() ) ||
+ (opt_len < CNatOption::noOPTION_LEN) ) {
+ m_seq_error++;
+ return (false);
+ }
+ lp = (CNatOption *)opt_ptr;
+ if ( !lp->is_valid_ipv4_magic() ) {
+ m_no_ipv4_option++;
+ return (false);
+ }
+ m_parent->get_nat_manager()->handle_packet_ipv4(lp,parser.m_ipv4);
+ opt_len -= CNatOption::noOPTION_LEN;
+ opt_ptr += CNatOption::noOPTION_LEN;
+ break;
+ default:
+ m_seq_error++;
+ return (false);
+ } // End of switch
+ } // End of while
+
+ return (true);
+ } // End of check for non-latency packet
+
+ if ( CGlobalInfo::is_learn_mode() && (m_nat_learn ==false) ) {
+ do_learn(parser.m_ipv4->getSourceIp());
+ }
+
+ if ( (pkt_size-vlan_offset) != m_pkt_size ) {
+ m_length_error++;
+ return (false);
+ }
+
+ latency_header * h=(latency_header *)(p+m_offset+vlan_offset);
+
+ if ( (h->magic & 0xffffff00) != LATENCY_MAGIC ){
+ m_no_magic++;
+ return (false);
+ }
+
+ if ( h->seq != m_rx_seq ){
+ m_seq_error++;
+ m_rx_seq =h->seq +1;
+ return (false);
+ }else{
+ m_rx_seq++;
+ }
+ m_pkt_ok++;
+ uint64_t d = (os_get_hr_tick_64() - h->time_stamp );
+ dsec_t ctime=ptime_convert_hr_dsec(d);
+ m_hist.Add(ctime);
+ m_jitter.calc(ctime);
+ return (true);
+}
+
+void CLatencyManager::Delete(){
+ m_pkt_gen.Delete();
+
+ if ( get_is_rx_check_mode() ) {
+ m_rx_check_manager.Delete();
+ }
+ if ( CGlobalInfo::is_learn_mode() ){
+ m_nat_check_manager.Delete();
+ }
+ m_cpu_cp_u.Delete();
+}
+
+/* 0->1
+ 1->0
+ 2->3
+ 3->2
+*/
+static uint8_t swap_port(uint8_t port_id){
+ uint8_t offset= ((port_id>>1)<<1);
+ uint8_t client_index = (port_id %2);
+ return (offset+client_index^1);
+}
+
+
+
+bool CLatencyManager::Create(CLatencyManagerCfg * cfg){
+ m_max_ports=cfg->m_max_ports;
+ assert (m_max_ports<=MAX_LATENCY_PORTS);
+ assert ((m_max_ports%2)==0);
+ m_port_mask =0xffffffff;
+ m_do_stop =false;
+ m_is_active =false;
+ m_pkt_gen.Create();
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ CCPortLatency * lpo=&m_ports[swap_port(i)].m_port;
+
+ lp->m_io=cfg->m_ports[i];
+ lp->m_port.Create(this,
+ i,
+ m_pkt_gen.get_payload_offset(),
+ m_pkt_gen.get_pkt_size(),lpo );
+ }
+ m_cps= cfg->m_cps;
+ m_d_time =ptime_convert_dsec_hr((1.0/m_cps));
+ m_delta_sec =(1.0/m_cps);
+
+
+ if ( get_is_rx_check_mode() ) {
+ assert(m_rx_check_manager.Create());
+ m_rx_check_manager.m_cur_time= now_sec();
+ }
+
+
+ m_pkt_gen.set_ip(cfg->m_client_ip.v4,cfg->m_server_ip.v4,cfg->m_dual_port_mask);
+ m_cpu_cp_u.Create(&m_cpu_dp_u);
+ if ( CGlobalInfo::is_learn_mode() ){
+ m_nat_check_manager.Create();
+ }
+ return (true);
+}
+
+
+void CLatencyManager::send_pkt_all_ports(){
+ m_start_time = os_get_hr_tick_64();
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ if ( m_port_mask & (1<<i) ){
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ if (lp->m_port.can_send_packet() ){
+ rte_mbuf_t * m=m_pkt_gen.generate_pkt(i,lp->m_port.external_nat_ip());
+ lp->m_port.update_packet(m);
+ if ( lp->m_io->tx(m) == 0 ){
+ lp->m_port.m_tx_pkt_ok++;
+ }else{
+ lp->m_port.m_tx_pkt_err++;
+ }
+
+ }
+ }
+ }
+}
+
+
+void CLatencyManager::wait_for_rx_dump(){
+ rte_mbuf_t * rx_pkts[64];
+ int i;
+ while ( true ) {
+ rte_pause();
+ rte_pause();
+ rte_pause();
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ rte_mbuf_t * m;
+ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
+ if (cnt_p) {
+ int j;
+ for (j=0; j<cnt_p; j++) {
+ m=rx_pkts[j] ;
+ lp->m_port.dump_packet( m);
+ rte_pktmbuf_free(m);
+ }
+ } /*cnt_p*/
+ }/* for*/
+ }
+}
+
+
+void CLatencyManager::handle_rx_pkt(CLatencyManagerPerPort * lp,
+ rte_mbuf_t * m){
+ CRx_check_header *rxc;
+ lp->m_port.check_packet(m,rxc);
+ if ( unlikely(rxc!=NULL) ){
+ m_rx_check_manager.handle_packet(rxc);
+ }
+ rte_pktmbuf_free(m);
+}
+
+void CLatencyManager::handle_latecy_pkt_msg(uint8_t thread_id,
+ CGenNodeLatencyPktInfo * msg){
+
+ assert(msg->m_latency_offset==0xdead);
+
+ uint8_t rx_port_index=(thread_id<<1)+(msg->m_dir&1);
+ assert( rx_port_index <m_max_ports ) ;
+ CLatencyManagerPerPort * lp=&m_ports[rx_port_index];
+ handle_rx_pkt(lp,(rte_mbuf_t *)msg->m_pkt);
+}
+
+
+void CLatencyManager::run_rx_queue_msgs(uint8_t thread_id,
+ CNodeRing * r){
+
+ while ( true ) {
+ CGenNode * node;
+ if ( r->Dequeue(node)!=0 ){
+ break;
+ }
+ assert(node);
+
+ CGenNodeMsgBase * msg=(CGenNodeMsgBase *)node;
+
+ CGenNodeLatencyPktInfo * msg1=(CGenNodeLatencyPktInfo *)msg;
+
+ uint8_t msg_type = msg->m_msg_type;
+ switch (msg_type ) {
+ case CGenNodeMsgBase::LATENCY_PKT:
+ handle_latecy_pkt_msg(thread_id,(CGenNodeLatencyPktInfo *) msg);
+ break;
+ default:
+ printf("ERROR latency-thread message type is not valid %d \n",msg_type);
+ assert(0);
+ }
+
+ CGlobalInfo::free_node(node);
+ }
+}
+
+void CLatencyManager::try_rx_queues(){
+
+ CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
+ uint8_t threads=CMsgIns::Ins()->get_num_threads();
+ int ti;
+ for (ti=0; ti<(int)threads; ti++) {
+ CNodeRing * r = rx_dp->getRingDpToCp(ti);
+ if ( !r->isEmpty() ){
+ run_rx_queue_msgs((uint8_t)ti,r);
+ }
+ }
+}
+
+
+void CLatencyManager::try_rx(){
+ rte_mbuf_t * rx_pkts[64];
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ rte_mbuf_t * m;
+ m_cpu_dp_u.start_work();
+ /* try to read 64 packets clean up the queue */
+ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
+ if (cnt_p) {
+ int j;
+ for (j=0; j<cnt_p; j++) {
+ m=rx_pkts[j] ;
+ handle_rx_pkt(lp,m);
+ }
+ /* commit only if there was work to do ! */
+ m_cpu_dp_u.commit();
+ }/* if work */
+ }// all ports
+}
+
+
+void CLatencyManager::reset(){
+
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ lp->m_port.reset();
+ }
+
+}
+
+void CLatencyManager::start(int iter){
+ m_do_stop =false;
+ m_is_active =false;
+ int cnt=0;
+
+ double n_time;
+ CGenNode * node = new CGenNode();
+ node->m_type = CGenNode::FLOW_SYNC; /* general stuff */
+ node->m_time = now_sec()+0.007;
+ m_p_queue.push(node);
+
+ node = new CGenNode();
+ node->m_type = CGenNode::FLOW_PKT; /* latency */
+ node->m_time = now_sec(); /* 1/cps rate */
+ m_p_queue.push(node);
+ bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable()?true:false;
+
+
+ while ( !m_p_queue.empty() ) {
+ node = m_p_queue.top();
+ n_time = node->m_time;
+
+ /* wait for event */
+ while ( true ) {
+ double dt = now_sec() - n_time ;
+ if (dt> (0.0)) {
+ break;
+ }
+ if (do_try_rx_queue){
+ try_rx_queues();
+ }
+ try_rx();
+ rte_pause();
+ }
+
+ switch (node->m_type) {
+ case CGenNode::FLOW_SYNC:
+ if ( CGlobalInfo::is_learn_mode() ) {
+ m_nat_check_manager.handle_aging();
+ }
+
+ m_p_queue.pop();
+ node->m_time += SYNC_TIME_OUT;
+ m_p_queue.push(node);
+
+ break;
+ case CGenNode::FLOW_PKT:
+ m_cpu_dp_u.start_work();
+ send_pkt_all_ports();
+ m_p_queue.pop();
+ node->m_time += m_delta_sec;
+ m_p_queue.push(node);
+ m_cpu_dp_u.commit();
+ break;
+ }
+
+ /* this will be called every sync which is 1msec */
+ if ( m_do_stop ) {
+ break;
+ }
+ if ( iter>0 ){
+ if ( ( cnt>iter) ){
+ printf("stop due iter %d %d \n",iter);
+ break;
+ }
+ }
+ cnt++;
+ }
+
+ /* free all nodes in the queue */
+ while (!m_p_queue.empty()) {
+ node = m_p_queue.top();
+ m_p_queue.pop();
+ delete node;
+ }
+
+ printf(" latency daemon has stopped\n");
+ if ( get_is_rx_check_mode() ) {
+ m_rx_check_manager.tw_drain();
+ }
+
+}
+
+void CLatencyManager::stop(){
+ m_do_stop =true;
+}
+
+bool CLatencyManager::is_active(){
+ return (m_is_active);
+}
+
+
+double CLatencyManager::get_max_latency(){
+ double l=0.0;
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ if ( l <lp->m_port.m_hist.get_max_latency() ){
+ l=lp->m_port.m_hist.get_max_latency();
+ }
+ }
+ return (l);
+}
+
+double CLatencyManager::get_avr_latency(){
+ double l=0.0;
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ if ( l <lp->m_port.m_hist.get_average_latency() ){
+ l=lp->m_port.m_hist.get_average_latency();
+ }
+ }
+ return (l);
+}
+
+uint64_t CLatencyManager::get_total_pkt(){
+ int i;
+ uint64_t t=0;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ t+=lp->m_port.m_tx_pkt_ok ;
+ }
+ return t;
+}
+
+uint64_t CLatencyManager::get_total_bytes(){
+ int i;
+ uint64_t t=0;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ t+=lp->m_port.m_tx_pkt_ok* (m_pkt_gen.get_pkt_size()+4);
+ }
+ return t;
+
+}
+
+
+bool CLatencyManager::is_any_error(){
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ if ( lp->m_port.is_any_err() ){
+ return (true);
+ }
+ }
+ return (false);
+}
+
+
+void CLatencyManager::dump_json(std::string & json ){
+ json="{\"name\":\"trex-latecny\",\"type\":0,\"data\":{";
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ lp->m_port.dump_json(json);
+ }
+
+ json+="\"unknown\":0}}" ;
+
+}
+
+void CLatencyManager::dump_json_v2(std::string & json ){
+ json="{\"name\":\"trex-latecny-v2\",\"type\":0,\"data\":{";
+ json+=add_json("cpu_util",m_cpu_cp_u.GetVal());
+
+ int i;
+ for (i=0; i<m_max_ports; i++) {
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ lp->m_port.dump_json_v2(json);
+ }
+
+ json+="\"unknown\":0}}" ;
+
+}
+
+void CLatencyManager::DumpRxCheck(FILE *fd){
+ if ( get_is_rx_check_mode() ) {
+ fprintf(fd," rx checker : \n");
+ m_rx_check_manager.DumpShort(fd);
+ m_rx_check_manager.Dump(fd);
+ }
+}
+
+void CLatencyManager::DumpShortRxCheck(FILE *fd){
+ if ( get_is_rx_check_mode() ) {
+ m_rx_check_manager.DumpShort(fd);
+ }
+}
+
+void CLatencyManager::rx_check_dump_json(std::string & json){
+ if ( get_is_rx_check_mode() ) {
+ m_rx_check_manager.dump_json(json );
+ }
+}
+
+void CLatencyManager::update(){
+ m_cpu_cp_u.Update() ;
+}
+
+void CLatencyManager::DumpShort(FILE *fd){
+ int i;
+ fprintf(fd," Cpu Utilization : %2.1f %% \n",m_cpu_cp_u.GetVal());
+ CCPortLatency::DumpShortHeader(fd);
+ for (i=0; i<m_max_ports; i++) {
+ fprintf(fd," %d | ",i);
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ lp->m_port.DumpShort(fd);
+ fprintf(fd,"\n");
+ }
+
+
+}
+
+void CLatencyManager::Dump(FILE *fd){
+ int i;
+ fprintf(fd," cpu : %2.1f %% \n",m_cpu_cp_u.GetVal());
+ for (i=0; i<m_max_ports; i++) {
+ fprintf(fd," port %d \n",i);
+ fprintf(fd," -----------------\n");
+ CLatencyManagerPerPort * lp=&m_ports[i];
+ lp->m_port.DumpCounters(fd);
+ }
+}
+
+void CLatencyManager::DumpRxCheckVerification(FILE *fd,
+ uint64_t total_tx_rx_check){
+ if ( !get_is_rx_check_mode() ) {
+ fprintf(fd," rx_checker is disabled \n");
+ return;
+ }
+ fprintf(fd," rx_check Tx : %u \n",total_tx_rx_check);
+ fprintf(fd," rx_check Rx : %u \n",m_rx_check_manager.getTotalRx() );
+ fprintf(fd," rx_check verification :" );
+ if (m_rx_check_manager.getTotalRx() == total_tx_rx_check) {
+ fprintf(fd," OK \n" );
+ }else{
+ fprintf(fd," FAIL \n" );
+ }
+}
+
+
+
+void CTcpSeq::update(uint8_t *p, CFlowPktInfo *pkt_info, int16_t s_size){
+ TCPHeader *tcp= (TCPHeader *)(p+pkt_info->m_pkt_indication.getFastTcpOffset());
+ uint32_t seqnum, acknum;
+
+ // This routine will adjust the TCP segment size for packets
+ // based on the modifications made by the plugins.
+ // Basically it will keep track of the size changes
+ // and adjust the TCP sequence numbers accordingly.
+
+ bool is_init=pkt_info->m_pkt_indication.m_desc.IsInitSide();
+
+ // Update TCP seq number
+ seqnum = tcp->getSeqNumber();
+ acknum = tcp->getAckNumber();
+ if (is_init) {
+ // Packet is from client
+ seqnum += client_seq_delta;
+ acknum += server_seq_delta;
+ } else {
+ // Packet is from server
+ seqnum += server_seq_delta;
+ acknum += client_seq_delta;
+ }
+ tcp->setSeqNumber(seqnum);
+ tcp->setAckNumber(acknum);
+
+ // Adjust delta being tracked
+ if (is_init) {
+ client_seq_delta += s_size;
+ } else {
+ server_seq_delta += s_size;
+ }
+}
+
+
+void on_node_first(uint8_t plugin_id,CGenNode * node,
+ CFlowYamlInfo * template_info,
+ CTupleTemplateGeneratorSmart * tuple_gen,
+ CFlowGenListPerThread * flow_gen){
+
+ if (CPluginCallback::callback) {
+ CPluginCallback::callback->on_node_first(plugin_id,node,template_info, tuple_gen,flow_gen);
+ }
+}
+
+void on_node_last(uint8_t plugin_id,CGenNode * node){
+ if (CPluginCallback::callback) {
+ CPluginCallback::callback->on_node_last(plugin_id,node);
+ }
+
+}
+
+rte_mbuf_t * on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
+ if (CPluginCallback::callback) {
+ CPluginCallback::callback->on_node_generate_mbuf(plugin_id,node,pkt_info);
+ }
+
+}
+
+
+class CPlugin_rtsp : public CTcpSeq {
+public:
+ void * m_gen;
+ uint16_t rtp_client_0;
+ uint16_t rtp_client_1;
+};
+
+
+void CPluginCallbackSimple::on_node_first(uint8_t plugin_id,
+ CGenNode * node,
+ CFlowYamlInfo * template_info,
+ CTupleTemplateGeneratorSmart * tuple_gen,
+ CFlowGenListPerThread * flow_gen ){
+ //printf(" on on_node_first callback %d node %x! \n",(int)plugin_id,node);
+ /* generate 2 ports from client side */
+
+ if ( (plugin_id == mpRTSP) || (plugin_id == mpSIP_VOICE) ) {
+ CPlugin_rtsp * lpP=new CPlugin_rtsp();
+ assert(lpP);
+
+ /* TBD need to be fixed using new API */
+ lpP->rtp_client_0 = tuple_gen->GenerateOneSourcePort();
+ lpP->rtp_client_1 = tuple_gen->GenerateOneSourcePort();
+ lpP->m_gen=flow_gen;
+ node->m_plugin_info = (void *)lpP;
+ }else{
+ if (plugin_id ==mpDYN_PYLOAD) {
+ /* nothing to do */
+ }else{
+ if (plugin_id ==mpAVL_HTTP_BROWSIN) {
+ CTcpSeq * lpP=new CTcpSeq();
+ assert(lpP);
+ node->m_plugin_info = (void *)lpP;
+ }else{
+ /* do not support this */
+ assert(0);
+ }
+ }
+ }
+}
+
+void CPluginCallbackSimple::on_node_last(uint8_t plugin_id,CGenNode * node){
+ //printf(" on on_node_last callback %d %x! \n",(int)plugin_id,node);
+ if ( (plugin_id == mpRTSP) || (plugin_id == mpSIP_VOICE) ) {
+ CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
+ /* free the ports */
+ CFlowGenListPerThread * flow_gen=(CFlowGenListPerThread *) lpP->m_gen;
+ bool is_tcp=node->m_pkt_info->m_pkt_indication.m_desc.IsTcp();
+ flow_gen->defer_client_port_free(is_tcp,node->m_src_ip,lpP->rtp_client_0);
+ flow_gen->defer_client_port_free(is_tcp,node->m_src_ip,lpP->rtp_client_1);
+ assert(lpP);
+ delete lpP;
+ node->m_plugin_info=0;
+ }else{
+ if (plugin_id ==mpDYN_PYLOAD) {
+ /* nothing to do */
+ }else{
+ if (plugin_id ==mpAVL_HTTP_BROWSIN) {
+ /* nothing to do */
+ CTcpSeq * lpP=(CTcpSeq * )node->m_plugin_info;
+ delete lpP;
+ node->m_plugin_info=0;
+ }else{
+ /* do not support this */
+ assert(0);
+ }
+ }
+ }
+}
+
+rte_mbuf_t * CPluginCallbackSimple::http_plugin(uint8_t plugin_id,
+ CGenNode * node,
+ CFlowPktInfo * pkt_info){
+ CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
+ assert(lpd->getFlowId()==0); /* only one flow */
+ CMiniVMCmdBase * program[2];
+ CMiniVMReplaceIP replace_cmd;
+ CMiniVMCmdBase eop_cmd;
+ CTcpSeq * lpP=(CTcpSeq * )node->m_plugin_info;
+ assert(lpP);
+ rte_mbuf_t *mbuf;
+ int16_t s_size=0;
+
+ if ( likely (lpd->getFlowPktNum() != 3) ){
+ if (unlikely (CGlobalInfo::is_ipv6_enable()) ) {
+ // Request a larger initial segment for IPv6
+ mbuf = pkt_info->do_generate_new_mbuf_big(node);
+ }else{
+ mbuf = pkt_info->do_generate_new_mbuf(node);
+ }
+
+ }else{
+ CFlowInfo flow_info;
+ flow_info.vm_program=0;
+
+ flow_info.client_ip = node->m_src_ip;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = node->m_src_port;
+ flow_info.server_port = 0;
+ flow_info.replace_server_port =false;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 8 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 8;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 8;
+ }
+
+ // Set m_start_0/m_stop_1 at start/end of IP address to be replaced.
+ // For this packet we know the IP addr string length is 8 bytes.
+ replace_cmd.m_start_0 = 10+16;
+ replace_cmd.m_stop_1 = replace_cmd.m_start_0 + 8;
+
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+
+ mbuf = pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size);
+ }
+
+ // Fixup the TCP sequence numbers
+ uint8_t *p=rte_pktmbuf_mtod(mbuf, uint8_t*);
+
+ // Update TCP sequence numbers
+ lpP->update(p, pkt_info, s_size);
+
+ return(mbuf);
+}
+
+rte_mbuf_t * CPluginCallbackSimple::dyn_pyload_plugin(uint8_t plugin_id,
+ CGenNode * node,
+ CFlowPktInfo * pkt_info){
+
+ CMiniVMCmdBase * program[2];
+
+ CMiniVMDynPyload dyn_cmd;
+ CMiniVMCmdBase eop_cmd;
+
+ CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
+ CFlowYamlDynamicPyloadPlugin * lpt = node->m_template_info->m_dpPkt;
+ assert(lpt);
+ CFlowInfo flow_info;
+ flow_info.vm_program=0;
+ int16_t s_size=0;
+
+ // IPv6 packets are not supported
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ fprintf (stderr," IPv6 is not supported for dynamic pyload change\n");
+ exit(-1);
+ }
+
+ if ( lpd->getFlowId() == 0 ) {
+
+ flow_info.client_ip = node->m_src_ip;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = node->m_src_port;
+ flow_info.server_port = 0;
+ flow_info.replace_server_port =false;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+ uint32_t pkt_num = lpd->getFlowPktNum();
+ if (pkt_num < 253) {
+ int i;
+ /* fast filter */
+ for (i=0; i<lpt->m_num; i++) {
+ if (lpt->m_pkt_ids[i] == pkt_num ) {
+ //add a program here
+ dyn_cmd.m_cmd = VM_DYN_PYLOAD;
+ dyn_cmd.m_ptr= &lpt->m_program[i];
+ dyn_cmd.m_flags = 0;
+ dyn_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 8;
+ dyn_cmd.m_ip.v4=node->m_src_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+ program[0] = &dyn_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ }
+ }
+ // only for the first flow
+ }else{
+ fprintf (stderr," only one flow is allowed for dynamic pyload change \n");
+ exit(-1);
+ }/* only for the first flow */
+
+ if ( unlikely( flow_info.vm_program != 0 ) ) {
+
+ return ( pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size) );
+ }else{
+ return ( pkt_info->do_generate_new_mbuf_ex(node,&flow_info) );
+ }
+}
+
+rte_mbuf_t * CPluginCallbackSimple::sip_voice_plugin(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
+ CMiniVMCmdBase * program[2];
+
+ CMiniVMReplaceIP_PORT_IP_IP_Port via_replace_cmd;
+ CMiniVMCmdBase eop_cmd;
+
+ CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
+ CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
+ assert(lpP);
+ // printf(" %d %d \n",lpd->getFlowId(),lpd->getFlowPktNum());
+ CFlowInfo flow_info;
+ flow_info.vm_program=0;
+ int16_t s_size=0;
+
+ switch ( lpd->getFlowId() ) {
+ /* flow - SIP , packet #0,#1 control */
+ case 0:
+ flow_info.client_ip = node->m_src_ip;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = node->m_src_port;
+ flow_info.server_port = 0;
+ flow_info.replace_server_port =false;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+
+ /* program to replace ip server */
+ switch ( lpd->getFlowPktNum() ) {
+ case 0:
+ {
+ via_replace_cmd.m_cmd = VM_REPLACE_IPVIA_IP_IP_PORT;
+ via_replace_cmd.m_flags = 0;
+ via_replace_cmd.m_start_0 = 0;
+ via_replace_cmd.m_stop_1 = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There are 3 address
+ // strings (each 9 bytes) that needs to be replaced.
+ // We also need to accomodate IPv6 use of brackets
+ // (+2 bytes) in a URI.
+ // There are also 2 port strings that needs to be
+ // replaced (1 is 4 bytes the other is 5 bytes).
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ via_replace_cmd.m_add_pkt_len = (((INET6_ADDRSTRLEN + 2) - 9) * 3) +
+ ((INET_PORTSTRLEN * 2) - 9);
+
+ // Mark as IPv6 and set the upper 96-bits
+ via_replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ via_replace_cmd.m_ip.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
+ via_replace_cmd.m_ip_via.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
+ }
+ } else {
+ via_replace_cmd.m_add_pkt_len = ((INET_ADDRSTRLEN - 9) * 3) +
+ ((INET_PORTSTRLEN * 2) - 9);
+ }
+ via_replace_cmd.m_ip.v4 =node->m_src_ip;
+ via_replace_cmd.m_ip0_start = 377;
+ via_replace_cmd.m_ip0_stop = 377+9;
+
+ via_replace_cmd.m_ip1_start = 409;
+ via_replace_cmd.m_ip1_stop = 409+9;
+
+
+ via_replace_cmd.m_port =lpP->rtp_client_0;
+ via_replace_cmd.m_port_start = 435;
+ via_replace_cmd.m_port_stop = 435+5;
+
+ via_replace_cmd.m_ip_via.v4 = node->m_src_ip;
+ via_replace_cmd.m_port_via = node->m_src_port;
+
+ via_replace_cmd.m_ip_via_start = 208;
+ via_replace_cmd.m_ip_via_stop = 208+9+5;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &via_replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+ case 1:
+ {
+ via_replace_cmd.m_cmd = VM_REPLACE_IPVIA_IP_IP_PORT;
+ via_replace_cmd.m_flags = 0;
+ via_replace_cmd.m_start_0 = 0;
+ via_replace_cmd.m_stop_1 = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There are 3 address
+ // strings (each 9 bytes) that needs to be replaced.
+ // We also need to accomodate IPv6 use of brackets
+ // (+2 bytes) in a URI.
+ // There are also 2 port strings that needs to be
+ // replaced (1 is 4 bytes the other is 5 bytes).
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ via_replace_cmd.m_add_pkt_len = (((INET6_ADDRSTRLEN + 2) - 9) * 3) +
+ ((INET_PORTSTRLEN * 2) - 9);
+
+ // Mark as IPv6 and set the upper 96-bits
+ via_replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ via_replace_cmd.m_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ via_replace_cmd.m_ip_via.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
+ }
+ } else {
+ via_replace_cmd.m_add_pkt_len = ((INET_ADDRSTRLEN - 9) * 3) +
+ ((INET_PORTSTRLEN * 2) - 9);
+ }
+
+ via_replace_cmd.m_ip.v4 =node->m_dest_ip;
+ via_replace_cmd.m_ip0_start = 370;
+ via_replace_cmd.m_ip0_stop = 370+8;
+
+ via_replace_cmd.m_ip1_start = 401;
+ via_replace_cmd.m_ip1_stop = 401+8;
+
+
+ via_replace_cmd.m_port =lpP->rtp_client_0;
+ via_replace_cmd.m_port_start = 426;
+ via_replace_cmd.m_port_stop = 426+5;
+
+
+ via_replace_cmd.m_ip_via.v4 = node->m_src_ip;
+ via_replace_cmd.m_port_via = node->m_src_port;
+
+ via_replace_cmd.m_ip_via_start = 207;
+ via_replace_cmd.m_ip_via_stop = 207+9+5;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &via_replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+
+ }/* end of big switch on packet */
+ break;
+
+ case 1:
+ flow_info.client_ip = node->m_src_ip ;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = lpP->rtp_client_0;
+ /* this is tricky ..*/
+ flow_info.server_port = lpP->rtp_client_0;
+ flow_info.replace_server_port = true;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+ break;
+ default:
+ assert(0);
+ break;
+ };
+
+ //printf(" c_ip:%x s_ip:%x c_po:%x s_po:%x init:%x replace:%x \n",flow_info.client_ip,flow_info.server_ip,flow_info.client_port,flow_info.server_port,flow_info.is_init_dir,flow_info.replace_server_port);
+
+ //printf(" program %p \n",flow_info.vm_program);
+ if ( unlikely( flow_info.vm_program != 0 ) ) {
+
+ return ( pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size) );
+ }else{
+ return ( pkt_info->do_generate_new_mbuf_ex(node,&flow_info) );
+ }
+}
+
+rte_mbuf_t * CPluginCallbackSimple::rtsp_plugin(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
+
+ CMiniVMCmdBase * program[2];
+
+ CMiniVMReplaceIP replace_cmd;
+ CMiniVMCmdBase eop_cmd;
+ CMiniVMReplaceIPWithPort replace_port_cmd;
+ rte_mbuf_t *mbuf;
+
+ CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
+ CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
+
+ assert(lpP);
+ // printf(" %d %d \n",lpd->getFlowId(),lpd->getFlowPktNum());
+ CFlowInfo flow_info;
+ flow_info.vm_program=0;
+ int16_t s_size=0;
+
+ switch ( lpd->getFlowId() ) {
+ /* flow - control */
+ case 0:
+ flow_info.client_ip = node->m_src_ip;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = node->m_src_port;
+ flow_info.server_port = 0;
+ flow_info.replace_server_port =false;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+
+ /* program to replace ip server */
+ switch ( lpd->getFlowPktNum() ) {
+ case 3:
+ {
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 16;
+ replace_cmd.m_stop_1 = 16+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+ case 4:
+ {
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 46;
+ replace_cmd.m_stop_1 = 46+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ case 5:
+ {
+
+ replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_OFFSET;
+ replace_port_cmd.m_flags = 0;
+ replace_port_cmd.m_start_0 = 13;
+ replace_port_cmd.m_stop_1 = 13+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ // There are also 2 port strings (8 bytes) that needs to be
+ // replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_port_cmd.m_add_pkt_len = ((INET6_ADDRSTRLEN + 2) - 9) +
+ ((INET_PORTSTRLEN * 2) - 8);
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_port_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_port_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_port_cmd.m_add_pkt_len = (INET_ADDRSTRLEN - 9) +
+ ((INET_PORTSTRLEN * 2) - 8);
+ }
+ replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
+ replace_port_cmd.m_start_port = 164;
+ replace_port_cmd.m_stop_port = 164+(4*2)+1;
+ replace_port_cmd.m_client_port = lpP->rtp_client_0;
+ replace_port_cmd.m_server_port =0;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_port_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ case 6:
+ {
+
+ replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_RESPONSE_OFFSET;
+ replace_port_cmd.m_flags = 0;
+ replace_port_cmd.m_start_0 = 0;
+ replace_port_cmd.m_stop_1 = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest port addresses. There are 4 port address
+ // strings (16 bytes) that needs to be replaced.
+ replace_port_cmd.m_add_pkt_len = ((INET_PORTSTRLEN * 4) - 16);
+
+ replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
+ replace_port_cmd.m_start_port = 247;
+ replace_port_cmd.m_stop_port = 247+(4*4)+2+13;
+ replace_port_cmd.m_client_port = lpP->rtp_client_0;
+ replace_port_cmd.m_server_port = lpP->rtp_client_0;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_port_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+
+ case 7:
+ {
+
+ replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_OFFSET;
+ replace_port_cmd.m_flags = 0;
+ replace_port_cmd.m_start_0 = 13;
+ replace_port_cmd.m_stop_1 = 13+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ // There are also 2 port strings (8 bytes) that needs to be
+ // replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_port_cmd.m_add_pkt_len = ((INET6_ADDRSTRLEN + 2) - 9) +
+ ((INET_PORTSTRLEN * 2) - 8);
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_port_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_port_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_port_cmd.m_add_pkt_len = (INET_ADDRSTRLEN - 9) +
+ ((INET_PORTSTRLEN * 2) - 8);
+ }
+ replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
+ replace_port_cmd.m_start_port = 164;
+ replace_port_cmd.m_stop_port = 164+(4*2)+1;
+ replace_port_cmd.m_client_port = lpP->rtp_client_1;
+ replace_port_cmd.m_server_port =0;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_port_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ case 8:
+
+ {
+
+ replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_RESPONSE_OFFSET;
+ replace_port_cmd.m_flags = 0;
+ replace_port_cmd.m_start_0 = 0;
+ replace_port_cmd.m_stop_1 = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest port addresses. There are 4 port address
+ // strings (16 bytes) that needs to be replaced.
+ replace_port_cmd.m_add_pkt_len = ((INET_PORTSTRLEN * 4) - 16);
+
+ replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
+ replace_port_cmd.m_start_port = 247;
+ replace_port_cmd.m_stop_port = 247+(4*4)+2+13;
+ replace_port_cmd.m_client_port = lpP->rtp_client_1;
+ replace_port_cmd.m_server_port = lpP->rtp_client_1;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_port_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ /* PLAY */
+ case 9:
+ {
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 12;
+ replace_cmd.m_stop_1 = 12+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ /*OPTION 0*/
+ case 12:
+ {
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 15;
+ replace_cmd.m_stop_1 = 15+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ /* option #2*/
+ case 15:
+ {
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 15;
+ replace_cmd.m_stop_1 = 15+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ case 18:
+ {
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 16;
+ replace_cmd.m_stop_1 = 16+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+
+ }/* end of big switch on packet */
+ break;
+
+ case 1:
+ flow_info.client_ip = node->m_src_ip ;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = lpP->rtp_client_0;
+ /* this is tricky ..*/
+ flow_info.server_port = lpP->rtp_client_0;
+ flow_info.replace_server_port = true;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+ break;
+ case 2:
+ flow_info.client_ip = node->m_src_ip ;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = lpP->rtp_client_1;
+ /* this is tricky ..*/
+ flow_info.server_port = lpP->rtp_client_1;
+ flow_info.replace_server_port =true;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+ break;
+ default:
+ assert(0);
+ break;
+ };
+
+ //printf(" c_ip:%x s_ip:%x c_po:%x s_po:%x init:%x replace:%x \n",flow_info.client_ip,flow_info.server_ip,flow_info.client_port,flow_info.server_port,flow_info.is_init_dir,flow_info.replace_server_port);
+
+ //printf(" program %p \n",flow_info.vm_program);
+ if ( unlikely( flow_info.vm_program != 0 ) ) {
+
+ mbuf = pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size);
+ }else{
+ if (unlikely (CGlobalInfo::is_ipv6_enable()) ) {
+ // Request a larger initial segment for IPv6
+ mbuf = pkt_info->do_generate_new_mbuf_ex_big(node,&flow_info);
+ }else{
+ mbuf = pkt_info->do_generate_new_mbuf_ex(node,&flow_info);
+ }
+ }
+
+ // Fixup the TCP sequence numbers for the TCP flow
+ if ( lpd->getFlowId() == 0 ) {
+ uint8_t *p=rte_pktmbuf_mtod(mbuf, uint8_t*);
+
+ // Update TCP sequence numbers
+ lpP->update(p, pkt_info, s_size);
+ }
+
+ return(mbuf);
+}
+
+
+/* replace the tuples */
+rte_mbuf_t * CPluginCallbackSimple::on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
+ switch (plugin_id) {
+ case mpRTSP:
+ rtsp_plugin(plugin_id,node,pkt_info);
+ break;
+ case mpSIP_VOICE:
+ sip_voice_plugin(plugin_id,node,pkt_info);
+ break;
+ case mpDYN_PYLOAD:
+ dyn_pyload_plugin(plugin_id,node,pkt_info);
+ break;
+ case mpAVL_HTTP_BROWSIN:
+ http_plugin(plugin_id,node,pkt_info);
+ break;
+ default:
+ assert(0);
+ }
+}
+
+
+int CMiniVM::mini_vm_run(CMiniVMCmdBase * cmds[]){
+
+ m_new_pkt_size=0;
+ bool need_to_stop=false;
+ int cnt=0;
+ CMiniVMCmdBase * cmd=cmds[cnt];
+ while (! need_to_stop) {
+ switch (cmd->m_cmd) {
+ case VM_REPLACE_IP_OFFSET:
+ mini_vm_replace_ip((CMiniVMReplaceIP *)cmd);
+ break;
+ case VM_REPLACE_IP_PORT_OFFSET:
+ mini_vm_replace_port_ip((CMiniVMReplaceIPWithPort *)cmd);
+ break;
+ case VM_REPLACE_IP_PORT_RESPONSE_OFFSET:
+ mini_vm_replace_ports((CMiniVMReplaceIPWithPort *)cmd);
+ break;
+
+ case VM_REPLACE_IP_IP_PORT:
+ mini_vm_replace_ip_ip_ports((CMiniVMReplaceIP_IP_Port * )cmd);
+ break;
+
+ case VM_REPLACE_IPVIA_IP_IP_PORT:
+ mini_vm_replace_ip_via_ip_ip_ports((CMiniVMReplaceIP_PORT_IP_IP_Port *)cmd);
+ break;
+
+ case VM_DYN_PYLOAD:
+ mini_vm_dyn_payload((CMiniVMDynPyload *)cmd);
+ break;
+
+ case VM_EOP:
+ need_to_stop=true;
+ break;
+ default:
+ printf(" vm cmd %d does not exist \n",cmd->m_cmd);
+ assert(0);
+ }
+ cnt++;
+ cmd=cmds[cnt];
+ }
+ return (0);
+}
+
+inline int cp_pkt_len(char *to,char *from,uint16_t from_offset,uint16_t len){
+ memcpy(to, from+from_offset , len);
+ return (len);
+}
+
+/* not including the to_offset
+
+ 0 1
+ x
+
+*/
+inline int cp_pkt_to_from(char *to,char *from,uint16_t from_offset,uint16_t to_offset){
+ memcpy(to, from+from_offset , to_offset-from_offset) ;
+ return (to_offset-from_offset);
+}
+
+
+int CMiniVM::mini_vm_dyn_payload( CMiniVMDynPyload * cmd){
+ /* copy all the packet */
+ CFlowYamlDpPkt * dyn=(CFlowYamlDpPkt *)cmd->m_ptr;
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset ;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+ char * p=m_pyload_mbuf_ptr;
+ /* copy payload */
+ memcpy(p,original_l7_ptr,len);
+ if ( ( dyn->m_pyld_offset+ (dyn->m_len*4)) < ( len-4) ){
+ // we can change the packet
+ int i;
+ uint32_t *l=(uint32_t *)(p+dyn->m_pyld_offset);
+ for (i=0; i<dyn->m_len; i++) {
+ if ( dyn->m_type==0 ) {
+ *l=(rand() & dyn->m_pkt_mask);
+ }else if (dyn->m_type==1){
+ *l=(PKT_NTOHL(cmd->m_ip.v4) & dyn->m_pkt_mask);
+ }
+ l++;
+ }
+
+ }
+
+ // Return packet size which hasn't changed
+ m_new_pkt_size = m_pkt_info->m_packet->pkt_len;
+
+ return (0);
+}
+
+
+int CMiniVM::mini_vm_replace_ip_via_ip_ip_ports(CMiniVMReplaceIP_PORT_IP_IP_Port * cmd){
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+ char * p=m_pyload_mbuf_ptr;
+
+ p+=cp_pkt_to_from(p,original_l7_ptr,
+ 0,
+ cmd->m_ip_via_start);
+
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p+=ipv6_to_str(&cmd->m_ip_via,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip_via.v4,p);
+ }
+ p+=sprintf(p,":%u",cmd->m_port_via);
+
+ /* up to the IP */
+ p+=cp_pkt_to_from(p,original_l7_ptr,
+ cmd->m_ip_via_stop,
+ cmd->m_ip0_start);
+ /*IP */
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p[-2] = '6';
+ p+=ipv6_to_str(&cmd->m_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip.v4,p);
+ }
+ /* up to IP 2 */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_ip0_stop,
+ cmd->m_ip1_start);
+ /* IP2 */
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p[-2] = '6';
+ p+=ipv6_to_str(&cmd->m_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip.v4,p);
+ }
+
+ /* up to port */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_ip1_stop,
+ cmd->m_port_start);
+ /* port */
+ p+=sprintf(p,"%u",cmd->m_port);
+
+ /* up to end */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_port_stop,
+ len);
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+
+int CMiniVM::mini_vm_replace_ip_ip_ports(CMiniVMReplaceIP_IP_Port * cmd){
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+ char * p=m_pyload_mbuf_ptr;
+
+ /* up to the IP */
+ p+=cp_pkt_to_from(p,original_l7_ptr,
+ 0,
+ cmd->m_ip0_start);
+ /*IP */
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p+=ipv6_to_str(&cmd->m_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip.v4,p);
+ }
+ /* up to IP 2 */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_ip0_stop,
+ cmd->m_ip1_start);
+ /* IP2 */
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p+=ipv6_to_str(&cmd->m_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip.v4,p);
+ }
+
+ /* up to port */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_ip1_stop,
+ cmd->m_port_start);
+ /* port */
+ p+=sprintf(p,"%u",cmd->m_port);
+
+ /* up to end */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_port_stop,
+ len);
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+int CMiniVM::mini_vm_replace_ports(CMiniVMReplaceIPWithPort * cmd){
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+
+ memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_port);
+ char * p=m_pyload_mbuf_ptr+cmd->m_start_port;
+ p+=sprintf(p,"%u-%u;server_port=%u-%u",cmd->m_client_port,cmd->m_client_port+1,cmd->m_server_port,cmd->m_server_port+1);
+ memcpy(p, original_l7_ptr+cmd->m_stop_port,len-cmd->m_stop_port);
+ p+=(len-cmd->m_stop_port);
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+
+int CMiniVM::mini_vm_replace_port_ip(CMiniVMReplaceIPWithPort * cmd){
+ uint16_t l7_offset=m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset - 0;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+
+ memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_0);
+ char *p=m_pyload_mbuf_ptr+cmd->m_start_0;
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p+=ipv6_to_str(&cmd->m_server_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_server_ip.v4,p);
+ }
+ /* copy until the port start offset */
+ int len1=cmd->m_start_port-cmd->m_stop_1 ;
+ memcpy(p, original_l7_ptr+cmd->m_stop_1,len1);
+ p+=len1;
+ p+=sprintf(p,"%u-%u",cmd->m_client_port,cmd->m_client_port+1);
+ memcpy(p, original_l7_ptr+cmd->m_stop_port,len-cmd->m_stop_port);
+ p+=len-cmd->m_stop_port;
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+int CMiniVM::mini_vm_replace_ip(CMiniVMReplaceIP * cmd){
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
+ char * original_l7_ptr = m_pkt_info->m_packet->raw+l7_offset;
+
+ memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_0);
+ char *p=m_pyload_mbuf_ptr+cmd->m_start_0;
+
+ int n_size=0;
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ n_size=ipv6_to_str(&cmd->m_server_ip,p);
+ } else {
+ n_size=ip_to_str(cmd->m_server_ip.v4,p);
+ }
+ p+=n_size;
+ memcpy(p, original_l7_ptr+cmd->m_stop_1,len-cmd->m_stop_1);
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset+(len-cmd->m_stop_1)) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+
+void CFlowYamlDpPkt::Dump(FILE *fd){
+ fprintf(fd," pkt_id : %d \n",(int)m_pkt_id);
+ fprintf(fd," offset : %d \n",(int)m_pyld_offset);
+ fprintf(fd," offset : %d \n",(int)m_type);
+ fprintf(fd," len : %d \n",(int)m_len);
+ fprintf(fd," mask : 0x%x \n",(int)m_pkt_mask);
+}
+
+
+void CFlowYamlDynamicPyloadPlugin::Add(CFlowYamlDpPkt & fd){
+ if (m_num ==MAX_PYLOAD_PKT_CHANGE) {
+ fprintf (stderr,"ERROR can set only %d rules \n",MAX_PYLOAD_PKT_CHANGE);
+ exit(-1);
+ }
+ m_pkt_ids[m_num]=fd.m_pkt_id;
+ m_program[m_num]=fd;
+ m_num+=1;
+}
+
+void CFlowYamlDynamicPyloadPlugin::Dump(FILE *fd){
+ int i;
+ fprintf(fd," pkts :");
+ for (i=0; i<m_num; i++) {
+ fprintf(fd," %d ",m_pkt_ids[i]);
+ }
+ fprintf(fd,"\n");
+ for (i=0; i<m_num; i++) {
+ fprintf(fd," program : %d \n",i);
+ fprintf(fd,"---------------- \n");
+ m_program[i].Dump(fd);
+ }
+}
+
+bool is_mac_info_conf(CFlowGenList *fl_list) {
+ if (fl_list) {
+ return fl_list->is_mac_info_configured;
+ }
+ return false;
+}
+
+mac_addr_align_t * get_mac_addr_by_ip(CFlowGenList *fl_list,
+ uint32_t ip) {
+ if (fl_list &&
+ fl_list->is_mac_info_configured &&
+ fl_list->m_mac_info.count(ip)>0) {
+ return &fl_list->m_mac_info[ip];
+ }
+ return NULL;
+}
+
+
+
+uint16_t CSimplePacketParser::getPktSize(){
+ uint16_t ip_len=0;
+ if (m_ipv4) {
+ ip_len=m_ipv4->getTotalLength();
+ }
+ if (m_ipv6) {
+ ip_len=m_ipv6->getSize()+m_ipv6->getPayloadLen();
+ }
+ return ( ip_len +m_vlan_offset+14);
+}
+
+
+
+
+uint8_t CSimplePacketParser::getTTl(){
+ if (m_ipv4) {
+ return ( m_ipv4->getTimeToLive() );
+ }
+ if (m_ipv6) {
+ return ( m_ipv6->getHopLimit() );
+ }
+ return (0);
+}
+
+bool CSimplePacketParser::Parse(){
+
+ rte_mbuf_t * m=m_m;
+ uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
+ EthernetHeader *m_ether = (EthernetHeader *)p;
+ IPHeader * ipv4=0;
+ IPv6Header * ipv6=0;
+ uint16_t pkt_size=rte_pktmbuf_pkt_len(m);
+ m_vlan_offset=0;
+ m_option_offset=0;
+
+ uint8_t protocol = 0;
+
+ // Retrieve the protocol type from the packet
+ switch( m_ether->getNextProtocol() ) {
+ case EthernetHeader::Protocol::IP :
+ // IPv4 packet
+ ipv4=(IPHeader *)(p+14);
+ protocol = ipv4->getProtocol();
+ m_option_offset = 14 + IPV4_HDR_LEN;
+ break;
+ case EthernetHeader::Protocol::IPv6 :
+ // IPv6 packet
+ ipv6=(IPv6Header *)(p+14);
+ protocol = ipv6->getNextHdr();
+ m_option_offset = 14 +IPV6_HDR_LEN;
+ break;
+ case EthernetHeader::Protocol::VLAN :
+ m_vlan_offset = 4;
+ switch ( m_ether->getVlanProtocol() ){
+ case EthernetHeader::Protocol::IP:
+ // IPv4 packet
+ ipv4=(IPHeader *)(p+18);
+ protocol = ipv4->getProtocol();
+ m_option_offset = 18+ IPV4_HDR_LEN;
+ break;
+ case EthernetHeader::Protocol::IPv6 :
+ // IPv6 packet
+ ipv6=(IPv6Header *)(p+18);
+ protocol = ipv6->getNextHdr();
+ m_option_offset = 18 + IPV6_HDR_LEN;
+ break;
+ default:
+ break;
+ }
+ default:
+ break;
+ }
+ m_protocol =protocol;
+ m_ipv4=ipv4;
+ m_ipv6=ipv6;
+
+ if ( protocol == 0 ){
+ return (false);
+ }
+ return (true);
+}
+
+
+
+
+
+