summaryrefslogtreecommitdiffstats
path: root/src/bp_sim.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/bp_sim.cpp')
-rwxr-xr-xsrc/bp_sim.cpp6522
1 files changed, 6522 insertions, 0 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
new file mode 100755
index 00000000..62e8d822
--- /dev/null
+++ b/src/bp_sim.cpp
@@ -0,0 +1,6522 @@
+/*
+ Hanoh Haim
+ Cisco Systems, Inc.
+*/
+
+/*
+Copyright (c) 2015-2015 Cisco Systems, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+#include "bp_sim.h"
+#include "stateful_rx_core.h"
+#include "utl_json.h"
+#include "utl_yaml.h"
+#include "msg_manager.h"
+#include "trex_watchdog.h"
+
+#include <common/basic_utils.h>
+
+#include <trex_stream_node.h>
+#include <trex_stateless_messaging.h>
+
+#undef VALG
+
+#ifdef VALG
+#include <valgrind/callgrind.h>
+#endif
+
+
+CPluginCallback * CPluginCallback::callback;
+
+
+uint32_t getDualPortId(uint32_t thread_id){
+ return ( thread_id % (CGlobalInfo::m_options.get_expected_dual_ports()) );
+}
+
+
+
+CRteMemPool CGlobalInfo::m_mem_pool[MAX_SOCKETS_SUPPORTED];
+
+uint32_t CGlobalInfo::m_nodes_pool_size = 10*1024;
+CParserOption CGlobalInfo::m_options;
+CGlobalMemory CGlobalInfo::m_memory_cfg;
+CPlatformSocketInfo CGlobalInfo::m_socket;
+
+
+
+
+
+void CGlobalMemory::Dump(FILE *fd){
+ fprintf(fd," Total Memory : \n");
+
+ const std::string * names =get_mbuf_names();
+
+ uint32_t c_size=64;
+ uint32_t c_total=0;
+
+ int i=0;
+ for (i=0; i<MBUF_ELM_SIZE; i++) {
+ if ( (i>MBUF_9k) && (i<MBUF_DP_FLOWS)){
+ continue;
+ }
+ if ( i<TRAFFIC_MBUF_64 ){
+ c_total= m_mbuf[i] *c_size;
+ c_size=c_size*2;
+ }
+
+ fprintf(fd," %-40s : %lu \n",names[i].c_str(),(ulong)m_mbuf[i]);
+ }
+ c_total += (m_mbuf[MBUF_DP_FLOWS] * sizeof(CGenNode));
+
+ fprintf(fd," %-40s : %lu \n","get_each_core_dp_flows",(ulong)get_each_core_dp_flows());
+ fprintf(fd," %-40s : %s \n","Total memory",double_to_human_str(c_total,"bytes",KBYE_1024).c_str() );
+}
+
+
+void CGlobalMemory::set(const CPlatformMemoryYamlInfo &info,float mul){
+ int i;
+ for (i=0; i<MBUF_ELM_SIZE; i++) {
+ m_mbuf[i]=(uint32_t)((float)info.m_mbuf[i]*mul);
+ }
+ /* no need to multiply */
+ m_mbuf[MBUF_64] += info.m_mbuf[TRAFFIC_MBUF_64];
+ m_mbuf[MBUF_128] += info.m_mbuf[TRAFFIC_MBUF_128];
+ m_mbuf[MBUF_256] += info.m_mbuf[TRAFFIC_MBUF_256];
+ m_mbuf[MBUF_512] += info.m_mbuf[TRAFFIC_MBUF_512];
+ m_mbuf[MBUF_1024] += info.m_mbuf[TRAFFIC_MBUF_1024];
+ m_mbuf[MBUF_2048] += info.m_mbuf[TRAFFIC_MBUF_2048];
+ m_mbuf[MBUF_4096] += info.m_mbuf[TRAFFIC_MBUF_4096];
+ m_mbuf[MBUF_9k] += info.m_mbuf[MBUF_9k];
+
+ for (i=0; i<MBUF_1024; i++) {
+ float per_queue_factor= (float)m_mbuf[i]/((float)m_pool_cache_size*(float)m_num_cores);
+ if (per_queue_factor<2.0) {
+ printf("WARNING not enough mbuf memory for this configuration trying to auto update\n");
+ printf(" %d : %f \n",(int)i,per_queue_factor);
+ m_mbuf[i]=(uint32_t)(m_mbuf[i]*2.0/per_queue_factor);
+ }
+ }
+}
+
+
+////////////////////////////////////////
+
+
+bool CPlatformSocketInfoNoConfig::is_sockets_enable(socket_id_t socket){
+ if ( socket==0 ) {
+ return(true);
+ }
+ return (false);
+}
+
+socket_id_t CPlatformSocketInfoNoConfig::max_num_active_sockets(){
+ return (1);
+}
+
+
+socket_id_t CPlatformSocketInfoNoConfig::port_to_socket(port_id_t port){
+ return (0);
+}
+
+
+void CPlatformSocketInfoNoConfig::set_rx_thread_is_enabled(bool enable) {
+ m_rx_is_enabled = enable;
+}
+
+void CPlatformSocketInfoNoConfig::set_number_of_dual_ports(uint8_t num_dual_ports){
+ m_dual_if = num_dual_ports;
+}
+
+
+void CPlatformSocketInfoNoConfig::set_number_of_threads_per_ports(uint8_t num_threads){
+ m_threads_per_dual_if = num_threads;
+}
+
+bool CPlatformSocketInfoNoConfig::sanity_check(){
+ return (true);
+}
+
+/* return the core mask */
+uint64_t CPlatformSocketInfoNoConfig::get_cores_mask(){
+
+ uint32_t cores_number = m_threads_per_dual_if*m_dual_if;
+ if ( m_rx_is_enabled ) {
+ cores_number += 2;
+ }else{
+ cores_number += 1; /* only MASTER*/
+ }
+ int i;
+ int offset=0;
+ /* master */
+ uint32_t res=1;
+ uint32_t mask=(1<<(offset+1));
+ for (i=0; i<(cores_number-1); i++) {
+ res |= mask ;
+ mask = mask <<1;
+ }
+ return (res);
+}
+
+virtual_thread_id_t CPlatformSocketInfoNoConfig::thread_phy_to_virt(physical_thread_id_t phy_id){
+ return (phy_id);
+}
+
+physical_thread_id_t CPlatformSocketInfoNoConfig::thread_virt_to_phy(virtual_thread_id_t virt_id){
+ return (virt_id);
+}
+
+physical_thread_id_t CPlatformSocketInfoNoConfig::get_master_phy_id() {
+ return (0);
+}
+
+bool CPlatformSocketInfoNoConfig::thread_phy_is_rx(physical_thread_id_t phy_id){
+ return (phy_id==(m_threads_per_dual_if*m_dual_if+1));
+}
+
+
+void CPlatformSocketInfoNoConfig::dump(FILE *fd){
+ fprintf(fd," there is no configuration file given \n");
+}
+
+////////////////////////////////////////
+
+bool CPlatformSocketInfoConfig::Create(CPlatformCoresYamlInfo * platform){
+ m_platform=platform;
+ assert(m_platform);
+ assert(m_platform->m_is_exists);
+ reset();
+ return (true);
+}
+
+bool CPlatformSocketInfoConfig::init(){
+
+ /* iterate the sockets */
+ uint32_t num_threads=0;
+ uint32_t num_dual_if = m_platform->m_dual_if.size();
+
+ if ( m_num_dual_if > num_dual_if ){
+ printf("ERROR number of dual if %d is higher than defined in configuration file %d\n",
+ (int)m_num_dual_if,
+ (int)num_dual_if);
+ }
+
+ int i;
+ for (i=0; i<m_num_dual_if; i++) {
+ CPlatformDualIfYamlInfo * lp=&m_platform->m_dual_if[i];
+ if ( lp->m_socket>=MAX_SOCKETS_SUPPORTED ){
+ printf("ERROR socket %d is bigger than max %d \n",lp->m_socket,MAX_SOCKETS_SUPPORTED);
+ exit(1);
+ }
+
+ if (!m_sockets_enable[lp->m_socket] ) {
+ m_sockets_enable[lp->m_socket]=true;
+ m_sockets_enabled++;
+ }
+
+ m_socket_per_dual_if[i]=lp->m_socket;
+
+ /* learn how many threads per dual-if */
+ if (i==0) {
+ num_threads = lp->m_threads.size();
+ m_max_threads_per_dual_if = num_threads;
+ }else{
+ if (lp->m_threads.size() != num_threads) {
+ printf("ERROR, the number of threads per dual ports should be the same for all dual ports\n");
+ exit(1);
+ }
+ }
+
+ int j;
+
+ for (j=0; j<m_threads_per_dual_if; j++) {
+ uint8_t virt_thread = 1+ i + j*m_num_dual_if; /* virtual thread */
+ uint8_t phy_thread = lp->m_threads[j];
+
+ if (phy_thread>MAX_THREADS_SUPPORTED) {
+ printf("ERROR, physical thread id is %d higher than max %d \n",phy_thread,MAX_THREADS_SUPPORTED);
+ exit(1);
+ }
+
+ if (virt_thread>MAX_THREADS_SUPPORTED) {
+ printf("ERROR virtual thread id is %d higher than max %d \n",virt_thread,MAX_THREADS_SUPPORTED);
+ exit(1);
+ }
+
+ if ( m_thread_phy_to_virtual[phy_thread] ){
+ printf("ERROR physical thread %d defined twice\n",phy_thread);
+ exit(1);
+ }
+ m_thread_phy_to_virtual[phy_thread]=virt_thread;
+ m_thread_virt_to_phy[virt_thread] =phy_thread;
+ }
+ }
+
+ if ( m_thread_phy_to_virtual[m_platform->m_master_thread] ){
+ printf("ERROR physical master thread %d already defined \n",m_platform->m_master_thread);
+ exit(1);
+ }
+
+ if ( m_thread_phy_to_virtual[m_platform->m_rx_thread] ){
+ printf("ERROR physical latency thread %d already defined \n",m_platform->m_rx_thread);
+ exit(1);
+ }
+
+ if (m_max_threads_per_dual_if < m_threads_per_dual_if ) {
+ printf("ERROR number of threads asked per dual if is %d lower than max %d \n",
+ (int)m_threads_per_dual_if,
+ (int)m_max_threads_per_dual_if);
+ exit(1);
+ }
+ return (true);
+}
+
+
+void CPlatformSocketInfoConfig::dump(FILE *fd){
+ fprintf(fd," core_mask %llx \n",(unsigned long long)get_cores_mask());
+ fprintf(fd," sockets :");
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ if ( is_sockets_enable(i) ){
+ fprintf(fd," %d ",i);
+ }
+ }
+ fprintf(fd," \n");
+ fprintf(fd," active sockets : %d \n",max_num_active_sockets());
+
+ fprintf(fd," ports_sockets : %d \n",max_num_active_sockets());
+
+ for (i = 0; i < TREX_MAX_PORTS; i++) {
+ fprintf(fd,"%d,",port_to_socket(i));
+ }
+ fprintf(fd,"\n");
+
+ fprintf(fd," phy | virt \n");
+ for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
+ virtual_thread_id_t virt=thread_phy_to_virt(i);
+ if ( virt ){
+ fprintf(fd," %d %d \n",i,virt);
+ }
+ }
+}
+
+
+void CPlatformSocketInfoConfig::reset(){
+ m_sockets_enabled=0;
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ m_sockets_enable[i]=false;
+ }
+
+ for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
+ m_thread_virt_to_phy[i]=0;
+ }
+ for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
+ m_thread_phy_to_virtual[i]=0;
+ }
+ for (i = 0; i < TREX_MAX_PORTS >> 1; i++) {
+ m_socket_per_dual_if[i]=0;
+ }
+
+ m_num_dual_if=0;
+
+ m_threads_per_dual_if=0;
+ m_rx_is_enabled=false;
+ m_max_threads_per_dual_if=0;
+}
+
+
+void CPlatformSocketInfoConfig::Delete(){
+
+}
+
+bool CPlatformSocketInfoConfig::is_sockets_enable(socket_id_t socket){
+ assert(socket<MAX_SOCKETS_SUPPORTED);
+ return ( m_sockets_enable[socket] );
+}
+
+socket_id_t CPlatformSocketInfoConfig::max_num_active_sockets(){
+ return ((socket_id_t)m_sockets_enabled);
+}
+
+socket_id_t CPlatformSocketInfoConfig::port_to_socket(port_id_t port){
+ return ( m_socket_per_dual_if[(port>>1)]);
+}
+
+void CPlatformSocketInfoConfig::set_rx_thread_is_enabled(bool enable){
+ m_rx_is_enabled =enable;
+}
+
+void CPlatformSocketInfoConfig::set_number_of_dual_ports(uint8_t num_dual_ports){
+ m_num_dual_if = num_dual_ports;
+}
+
+void CPlatformSocketInfoConfig::set_number_of_threads_per_ports(uint8_t num_threads){
+ m_threads_per_dual_if =num_threads;
+}
+
+bool CPlatformSocketInfoConfig::sanity_check(){
+ return (init());
+}
+
+/* return the core mask */
+uint64_t CPlatformSocketInfoConfig::get_cores_mask(){
+ int i;
+ uint64_t mask=0;
+ for (i=0; i<MAX_THREADS_SUPPORTED; i++) {
+ if ( m_thread_phy_to_virtual[i] ) {
+
+ if (i>=64) {
+ printf(" ERROR phy threads can't be higher than 64 \n");
+ exit(1);
+ }
+ mask |=(1<<i);
+ }
+ }
+
+ mask |=(1<<m_platform->m_master_thread);
+ assert(m_platform->m_master_thread<64);
+ if (m_rx_is_enabled) {
+ mask |=(1<<m_platform->m_rx_thread);
+ assert(m_platform->m_rx_thread<64);
+ }
+ return (mask);
+}
+
+virtual_thread_id_t CPlatformSocketInfoConfig::thread_phy_to_virt(physical_thread_id_t phy_id){
+ return (m_thread_phy_to_virtual[phy_id]);
+}
+
+physical_thread_id_t CPlatformSocketInfoConfig::thread_virt_to_phy(virtual_thread_id_t virt_id){
+ return ( m_thread_virt_to_phy[virt_id]);
+}
+
+physical_thread_id_t CPlatformSocketInfoConfig::get_master_phy_id() {
+ return m_platform->m_master_thread;
+}
+
+bool CPlatformSocketInfoConfig::thread_phy_is_rx(physical_thread_id_t phy_id){
+ return (m_platform->m_rx_thread == phy_id?true:false);
+}
+
+
+
+////////////////////////////////////////
+
+
+bool CPlatformSocketInfo::Create(CPlatformCoresYamlInfo * platform){
+ if ( (platform) && (platform->m_is_exists) ) {
+ CPlatformSocketInfoConfig * lp=new CPlatformSocketInfoConfig();
+ assert(lp);
+ lp->Create(platform);
+ m_obj= lp;
+ }else{
+ m_obj= new CPlatformSocketInfoNoConfig();
+ }
+ return(true);
+}
+
+void CPlatformSocketInfo::Delete(){
+ if ( m_obj ){
+ delete m_obj;
+ m_obj=NULL;
+ }
+}
+
+bool CPlatformSocketInfo::is_sockets_enable(socket_id_t socket){
+ return ( m_obj->is_sockets_enable(socket) );
+}
+
+socket_id_t CPlatformSocketInfo::max_num_active_sockets(){
+ return ( m_obj->max_num_active_sockets() );
+}
+
+
+socket_id_t CPlatformSocketInfo::port_to_socket(port_id_t port){
+ return ( m_obj->port_to_socket(port) );
+}
+
+
+void CPlatformSocketInfo::set_rx_thread_is_enabled(bool enable){
+ m_obj->set_rx_thread_is_enabled(enable);
+}
+
+void CPlatformSocketInfo::set_number_of_dual_ports(uint8_t num_dual_ports){
+ m_obj->set_number_of_dual_ports(num_dual_ports);
+}
+
+void CPlatformSocketInfo::set_number_of_threads_per_ports(uint8_t num_threads){
+ m_obj->set_number_of_threads_per_ports(num_threads);
+}
+
+bool CPlatformSocketInfo::sanity_check(){
+ return ( m_obj->sanity_check());
+}
+
+/* return the core mask */
+uint64_t CPlatformSocketInfo::get_cores_mask(){
+ return ( m_obj->get_cores_mask());
+}
+
+virtual_thread_id_t CPlatformSocketInfo::thread_phy_to_virt(physical_thread_id_t phy_id){
+ return ( m_obj->thread_phy_to_virt(phy_id));
+}
+
+physical_thread_id_t CPlatformSocketInfo::thread_virt_to_phy(virtual_thread_id_t virt_id){
+ return ( m_obj->thread_virt_to_phy(virt_id));
+}
+
+bool CPlatformSocketInfo::thread_phy_is_master(physical_thread_id_t phy_id){
+ return ( m_obj->thread_phy_is_master(phy_id));
+}
+
+physical_thread_id_t CPlatformSocketInfo::get_master_phy_id() {
+ return ( m_obj->get_master_phy_id());
+}
+
+bool CPlatformSocketInfo::thread_phy_is_rx(physical_thread_id_t phy_id) {
+ return ( m_obj->thread_phy_is_rx(phy_id));
+}
+
+void CPlatformSocketInfo::dump(FILE *fd){
+ m_obj->dump(fd);
+}
+
+////////////////////////////////////////
+
+
+void CRteMemPool::dump_in_case_of_error(FILE *fd){
+ fprintf(fd," ERROR,there is no enough memory in socket %d \n",m_pool_id);
+ fprintf(fd," Try to enlarge the memory values in the configuration file -/etc/trex_cfg.yaml ,see manual for more detail \n");
+ dump(fd);
+}
+
+void CRteMemPool::add_to_json(Json::Value &json, std::string name, rte_mempool_t * pool){
+ uint32_t p_free = rte_mempool_count(pool);
+ uint32_t p_size = pool->size;
+ json[name].append((unsigned long long)p_free);
+ json[name].append((unsigned long long)p_size);
+}
+
+
+void CRteMemPool::dump_as_json(Json::Value &json){
+ add_to_json(json, "64b", m_small_mbuf_pool);
+ add_to_json(json, "128b", m_mbuf_pool_128);
+ add_to_json(json, "256b", m_mbuf_pool_256);
+ add_to_json(json, "512b", m_mbuf_pool_512);
+ add_to_json(json, "1024b", m_mbuf_pool_1024);
+ add_to_json(json, "2048b", m_mbuf_pool_2048);
+ add_to_json(json, "4096b", m_mbuf_pool_4096);
+ add_to_json(json, "9kb", m_mbuf_pool_9k);
+}
+
+
+void CRteMemPool::dump(FILE *fd){
+ #define DUMP_MBUF(a,b) { float p=(100.0*(float)rte_mempool_count(b)/(float)b->size); fprintf(fd," %-30s : %.2f %% %s \n",a,p,(p<5.0?"<-":"OK") ); }
+
+ DUMP_MBUF("mbuf_64",m_small_mbuf_pool);
+ DUMP_MBUF("mbuf_128",m_mbuf_pool_128);
+ DUMP_MBUF("mbuf_256",m_mbuf_pool_256);
+ DUMP_MBUF("mbuf_512",m_mbuf_pool_512);
+ DUMP_MBUF("mbuf_1024",m_mbuf_pool_1024);
+ DUMP_MBUF("mbuf_2048",m_mbuf_pool_2048);
+ DUMP_MBUF("mbuf_4096",m_mbuf_pool_4096);
+ DUMP_MBUF("mbuf_9k",m_mbuf_pool_9k);
+
+}
+
+////////////////////////////////////////
+
+void CGlobalInfo::dump_pool_as_json(Json::Value &json){
+ CPlatformSocketInfo * lpSocket =&m_socket;
+
+ for (int i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
+ if (lpSocket->is_sockets_enable((socket_id_t)i)) {
+ std::string socket_id = "cpu-socket-" + std::to_string(i);
+ m_mem_pool[i].dump_as_json(json["mbuf_stats"][socket_id]);
+ }
+ }
+}
+
+std::string CGlobalInfo::dump_pool_as_json_str(void){
+ Json::Value json;
+ dump_pool_as_json(json);
+ return (json.toStyledString());
+}
+
+void CGlobalInfo::free_pools(){
+ CPlatformSocketInfo * lpSocket =&m_socket;
+ CRteMemPool * lpmem;
+ int i;
+ for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
+ if (lpSocket->is_sockets_enable((socket_id_t)i)) {
+ lpmem= &m_mem_pool[i];
+ utl_rte_mempool_delete(lpmem->m_small_mbuf_pool);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_128);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_256);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_512);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_1024);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_2048);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_4096);
+ utl_rte_mempool_delete(lpmem->m_mbuf_pool_9k);
+ }
+ utl_rte_mempool_delete(m_mem_pool[0].m_mbuf_global_nodes);
+ }
+}
+
+
+void CGlobalInfo::init_pools(uint32_t rx_buffers){
+ /* this include the pkt from 64- */
+ CGlobalMemory * lp=&CGlobalInfo::m_memory_cfg;
+ CPlatformSocketInfo * lpSocket =&m_socket;
+
+ CRteMemPool * lpmem;
+
+ int i;
+ for (i=0; i<(int)MAX_SOCKETS_SUPPORTED; i++) {
+ if (lpSocket->is_sockets_enable((socket_id_t)i)) {
+ lpmem= &m_mem_pool[i];
+ lpmem->m_pool_id=i;
+
+
+ /* this include the packet from 0-64 this is for small packets */
+ lpmem->m_small_mbuf_pool =utl_rte_mempool_create("small-pkt-const",
+ lp->m_mbuf[MBUF_64],
+ CONST_SMALL_MBUF_SIZE,
+ 32,(i<<5)+ 2,i);
+ assert(lpmem->m_small_mbuf_pool);
+
+
+
+
+ lpmem->m_mbuf_pool_128=utl_rte_mempool_create("_128-pkt-const",
+ lp->m_mbuf[MBUF_128],
+ CONST_128_MBUF_SIZE,
+ 32,(i<<5)+ 6,i);
+
+
+ assert(lpmem->m_mbuf_pool_128);
+
+
+ lpmem->m_mbuf_pool_256=utl_rte_mempool_create("_256-pkt-const",
+ lp->m_mbuf[MBUF_256],
+ CONST_256_MBUF_SIZE,
+ 32,(i<<5)+ 3,i);
+
+ assert(lpmem->m_mbuf_pool_256);
+
+ lpmem->m_mbuf_pool_512=utl_rte_mempool_create("_512_-pkt-const",
+ lp->m_mbuf[MBUF_512],
+ CONST_512_MBUF_SIZE,
+ 32,(i<<5)+ 4,i);
+ assert(lpmem->m_mbuf_pool_512);
+
+ lpmem->m_mbuf_pool_1024=utl_rte_mempool_create("_1024-pkt-const",
+ lp->m_mbuf[MBUF_1024],
+ CONST_1024_MBUF_SIZE,
+ 32,(i<<5)+ 5,i);
+
+ assert(lpmem->m_mbuf_pool_1024);
+
+ lpmem->m_mbuf_pool_2048=utl_rte_mempool_create("_2048-pkt-const",
+ lp->m_mbuf[MBUF_2048],
+ CONST_2048_MBUF_SIZE,
+ 32,(i<<5)+ 5,i);
+
+ assert(lpmem->m_mbuf_pool_2048);
+
+ lpmem->m_mbuf_pool_4096=utl_rte_mempool_create("_4096-pkt-const",
+ lp->m_mbuf[MBUF_4096],
+ CONST_4096_MBUF_SIZE,
+ 32,(i<<5)+ 5,i);
+
+ assert(lpmem->m_mbuf_pool_4096);
+
+ lpmem->m_mbuf_pool_9k=utl_rte_mempool_create("_9k-pkt-const",
+ lp->m_mbuf[MBUF_9k]+rx_buffers,
+ CONST_9k_MBUF_SIZE,
+ 32,(i<<5)+ 5,i);
+
+ assert(lpmem->m_mbuf_pool_9k);
+
+ }
+ }
+
+ /* global always from socket 0 */
+ m_mem_pool[0].m_mbuf_global_nodes = utl_rte_mempool_create_non_pkt("global-nodes",
+ lp->m_mbuf[MBUF_GLOBAL_FLOWS],
+ sizeof(CGenNode),
+ 128,
+ 0 ,
+ SOCKET_ID_ANY);
+
+ assert(m_mem_pool[0].m_mbuf_global_nodes);
+
+
+}
+
+
+
+void CFlowYamlInfo::Dump(FILE *fd){
+ fprintf(fd,"name : %s \n",m_name.c_str());
+ fprintf(fd,"cps : %f \n",m_k_cps);
+ fprintf(fd,"ipg : %f \n",m_ipg_sec);
+ fprintf(fd,"rtt : %f \n",m_rtt_sec);
+ fprintf(fd,"w : %d \n",m_w);
+ fprintf(fd,"wlength : %d \n",m_wlength);
+ fprintf(fd,"limit : %d \n",m_limit);
+ fprintf(fd,"limit_was_set : %d \n",m_limit_was_set?1:0);
+ fprintf(fd,"cap_mode : %d \n",m_cap_mode?1:0);
+ fprintf(fd,"plugin_id : %d \n",m_plugin_id);
+ fprintf(fd,"one_server : %d \n",m_one_app_server?1:0);
+ fprintf(fd,"one_server_was_set : %d \n",m_one_app_server_was_set?1:0);
+ if (m_dpPkt) {
+ m_dpPkt->Dump(fd);
+ }
+}
+
+
+
+
+void dump_mac_addr(FILE* fd,uint8_t *p){
+ int i;
+ for (i=0; i<6; i++) {
+ uint8_t a=p[i];
+ if (i==5) {
+ fprintf(fd,"%02x",a);
+ }else{
+ fprintf(fd,"%02x:",a);
+ }
+ }
+
+}
+
+
+
+static uint8_t human_tbl[]={
+ ' ',
+ 'K',
+ 'M',
+ 'G',
+ 'T'
+};
+
+std::string double_to_human_str(double num,
+ std::string units,
+ human_kbyte_t etype){
+ double abs_num=num;
+ if (num<0.0) {
+ abs_num=-num;
+ }
+ int i=0;
+ int max_cnt=sizeof(human_tbl)/sizeof(human_tbl[0]);
+ double div =1.0;
+ double f=1000.0;
+ if (etype ==KBYE_1024){
+ f=1024.0;
+ }
+ while ((abs_num > f ) && (i < max_cnt - 1)){
+ abs_num/=f;
+ div*=f;
+ i++;
+ }
+
+ char buf [100];
+ sprintf(buf,"%10.2f %c%s",num/div,human_tbl[i],units.c_str());
+ std::string res(buf);
+ return (res);
+}
+
+
+void CPreviewMode::Dump(FILE *fd){
+ fprintf(fd," flags : %x\n", m_flags);
+ fprintf(fd," write_file : %d\n", getFileWrite()?1:0);
+ fprintf(fd," verbose : %d\n", (int)getVMode() );
+ fprintf(fd," realtime : %d\n", (int)getRealTime() );
+ fprintf(fd," flip : %d\n", (int)getClientServerFlip() );
+ fprintf(fd," cores : %d\n", (int)getCores() );
+ fprintf(fd," single core : %d\n", (int)getSingleCore() );
+ fprintf(fd," flow-flip : %d\n", (int)getClientServerFlowFlip() );
+ fprintf(fd," no clean close : %d\n", (int)getNoCleanFlowClose() );
+ fprintf(fd," zmq_publish : %d\n", (int)get_zmq_publish_enable() );
+ fprintf(fd," vlan_enable : %d\n", (int)get_vlan_mode_enable() );
+ fprintf(fd," client_cfg : %d\n", (int)get_is_client_cfg_enable() );
+ fprintf(fd," mbuf_cache_disable : %d\n", (int)isMbufCacheDisabled() );
+ fprintf(fd," vm mode : %d\n", (int)get_vm_one_queue_enable()?1:0 );
+}
+
+void CFlowGenStats::clear(){
+ m_nat_lookup_no_flow_id=0;
+ m_total_bytes=0;
+ m_total_pkt=0;
+ m_total_open_flows =0;
+ m_total_close_flows =0;
+ m_nat_lookup_no_flow_id=0;
+ m_nat_lookup_remove_flow_id=0;
+ m_nat_lookup_wait_ack_state = 0;
+ m_nat_lookup_add_flow_id=0;
+ m_nat_flow_timeout=0;
+ m_nat_flow_timeout_wait_ack = 0;
+ m_nat_flow_learn_error=0;
+}
+
+void CFlowGenStats::dump(FILE *fd){
+ std::string s_bytes=double_to_human_str((double )(m_total_bytes),
+ "bytes",
+ KBYE_1024);
+
+ std::string s_pkt=double_to_human_str((double )(m_total_pkt),
+ "pkt",
+ KBYE_1000);
+
+ std::string s_flows=double_to_human_str((double )(m_total_open_flows),
+ "flows",
+ KBYE_1000);
+
+ DP_S(m_total_bytes,s_bytes);
+ DP_S(m_total_pkt,s_pkt);
+ DP_S(m_total_open_flows,s_flows);
+ DP(m_total_pkt);
+ DP(m_total_open_flows);
+ DP(m_total_close_flows);
+ DP_name("active",(m_total_open_flows-m_total_close_flows));
+ DP(m_total_bytes);
+ DP(m_nat_lookup_no_flow_id);
+
+ DP(m_nat_lookup_no_flow_id);
+ DP(m_nat_lookup_remove_flow_id);
+ DP(m_nat_lookup_wait_ack_state);
+ DP(m_nat_lookup_add_flow_id);
+ DP(m_nat_flow_timeout);
+ DP(m_nat_flow_timeout_wait_ack);
+ DP_name("active_nat",(m_nat_lookup_add_flow_id-m_nat_lookup_remove_flow_id));
+ DP_name("active_nat_wait_syn", (m_nat_lookup_add_flow_id - m_nat_lookup_wait_ack_state));
+ DP(m_nat_flow_learn_error);
+}
+
+
+
+int CErfIF::open_file(std::string file_name){
+ BP_ASSERT(m_writer==0);
+
+ if ( m_preview_mode->getFileWrite() ){
+ capture_type_e file_type=ERF;
+ if ( m_preview_mode->get_pcap_mode_enable() ){
+ file_type=LIBPCAP;
+ }
+ m_writer = CCapWriterFactory::CreateWriter(file_type,(char *)file_name.c_str());
+ if (m_writer == NULL) {
+ fprintf(stderr,"ERROR can't create cap file %s ",(char *)file_name.c_str());
+ return (-1);
+ }
+ }
+ m_raw = new CCapPktRaw();
+ return (0);
+}
+
+
+int CErfIF::write_pkt(CCapPktRaw *pkt_raw){
+
+ BP_ASSERT(m_writer);
+
+ if ( m_preview_mode->getFileWrite() ){
+ BP_ASSERT(m_writer);
+ bool res=m_writer->write_packet(pkt_raw);
+ if (res != true) {
+ fprintf(stderr,"ERROR can't write to cap file");
+ return (-1);
+ }
+ }
+ return (0);
+}
+
+
+int CErfIF::close_file(void){
+
+ if (m_raw) {
+ delete m_raw;
+ m_raw = NULL;
+ }
+
+ if ( m_preview_mode->getFileWrite() ){
+ if (m_writer) {
+ delete m_writer;
+ m_writer = NULL;
+ }
+ }
+
+ return (0);
+}
+
+
+
+void CFlowKey::Clean(){
+ m_ipaddr1=0;
+ m_ipaddr2=0;
+ m_port1=0;
+ m_port2=0;
+ m_ip_proto=0;
+ m_l2_proto=0;
+ m_vrfid=0;
+}
+
+void CFlowKey::Dump(FILE *fd){
+ fprintf(fd," %x:%x:%x:%x:%x:%x:%x\n",m_ipaddr1,m_ipaddr2,m_port1,m_port2,m_ip_proto,m_l2_proto,m_vrfid);
+}
+
+
+
+void CPacketDescriptor::Dump(FILE *fd){
+ fprintf(fd," IsSwapTuple : %d \n",IsSwapTuple()?1:0);
+ fprintf(fd," IsSInitDir : %d \n",IsInitSide()?1:0);
+ fprintf(fd," Isvalid : %d ",IsValidPkt()?1:0);
+ fprintf(fd," IsRtt : %d ",IsRtt()?1:0);
+ fprintf(fd," IsLearn : %d ",IsLearn()?1:0);
+
+ if (IsTcp() ) {
+ fprintf(fd," TCP ");
+ }else{
+ fprintf(fd," UDP ");
+ }
+ fprintf(fd," IsLast Pkt : %d ", IsLastPkt() ?1:0);
+ fprintf(fd," id : %d \n",getId() );
+
+ fprintf(fd," flow_ID : %d , max_pkts : %u, max_aging: %d sec , pkt_id : %u, init: %d ( dir:%d dir_max :%d ) bid:%d \n",getFlowId(),
+ GetMaxPktsPerFlow(),
+ GetMaxFlowTimeout() ,
+ getFlowPktNum(),
+ IsInitSide(),
+ GetDirInfo()->GetPktNum(),
+ GetDirInfo()->GetMaxPkts(),
+ IsBiDirectionalFlow()?1:0
+
+ );
+ fprintf(fd,"\n");
+}
+
+
+void CPacketIndication::UpdateOffsets(){
+ m_ether_offset = getEtherOffset();
+ m_ip_offset = getIpOffset();
+ m_udp_tcp_offset = getTcpOffset();
+ m_payload_offset = getPayloadOffset();
+}
+
+void CPacketIndication::UpdatePacketPadding(){
+ m_packet_padding = m_packet->getTotalLen() - (l3.m_ipv4->getTotalLength()+ getIpOffset());
+}
+
+
+void CPacketIndication::RefreshPointers(){
+
+ char *pobase=getBasePtr();
+
+ m_ether = (EthernetHeader *) (pobase + m_ether_offset);
+ l3.m_ipv4 = (IPHeader *) (pobase + m_ip_offset);
+ l4.m_tcp= (TCPHeader *)(pobase + m_udp_tcp_offset);
+ if ( m_payload_offset ){
+ m_payload =(uint8_t *)(pobase + m_payload_offset);
+ }else{
+ m_payload =(uint8_t *)(0);
+ }
+}
+
+// copy ref assume pkt point to a fresh
+void CPacketIndication::Clone(CPacketIndication * obj,CCapPktRaw * pkt){
+ Clean();
+ m_cap_ipg = obj->m_cap_ipg;
+ m_packet = pkt;
+ char *pobase=getBasePtr();
+ m_flow = obj->m_flow;
+
+ m_ether = (EthernetHeader *) (pobase + obj->getEtherOffset());
+ l3.m_ipv4 = (IPHeader *) (pobase + obj->getIpOffset());
+ m_is_ipv6 = obj->m_is_ipv6;
+ l4.m_tcp= (TCPHeader *)(pobase + obj->getTcpOffset());
+ if ( obj->getPayloadOffset() ){
+ m_payload =(uint8_t *)(pobase + obj->getPayloadOffset());
+ }else{
+ m_payload =(uint8_t *)(0);
+ }
+ m_payload_len = obj->m_payload_len;
+ m_flow_key = obj->m_flow_key;
+ m_desc = obj->m_desc;
+
+ m_packet_padding = obj->m_packet_padding;
+ /* copy offsets*/
+ m_ether_offset = obj->m_ether_offset;
+ m_ip_offset = obj->m_ip_offset;
+ m_udp_tcp_offset = obj->m_udp_tcp_offset;;
+ m_payload_offset = obj->m_payload_offset;
+}
+
+
+
+void CPacketIndication::Dump(FILE *fd,int verbose){
+ fprintf(fd," ipg : %f \n",m_cap_ipg);
+ fprintf(fd," key \n");
+ fprintf(fd," ------\n");
+ m_flow_key.Dump(fd);
+
+ fprintf(fd," L2 info \n");
+ fprintf(fd," ------\n");
+ m_packet->Dump(fd,verbose);
+
+ fprintf(fd," Descriptor \n");
+ fprintf(fd," ------\n");
+ m_desc.Dump(fd);
+
+ if ( m_desc.IsValidPkt() ) {
+ fprintf(fd," ipv4 \n");
+ l3.m_ipv4->dump(fd);
+ if ( m_desc.IsUdp() ) {
+ l4.m_udp->dump(fd);
+ }else{
+ l4.m_tcp->dump(fd);
+ }
+ fprintf(fd," payload len : %d \n",m_payload_len);
+ }else{
+ fprintf(fd," not valid packet \n");
+ }
+}
+
+void CPacketIndication::Clean(){
+ m_desc.Clear();
+ m_ether=0;
+ l3.m_ipv4=0;
+ l4.m_tcp=0;
+ m_payload=0;
+ m_payload_len=0;
+}
+
+
+
+uint64_t CCPacketParserCounters::getTotalErrors(){
+ uint64_t res=
+ m_non_ip+
+ m_arp+
+ m_mpls+
+ m_non_valid_ipv4_ver+
+ m_ip_checksum_error+
+ m_ip_length_error+
+ m_ip_not_first_fragment_error+
+ m_ip_ttl_is_zero_error+
+ m_ip_multicast_error+
+
+ m_non_tcp_udp_ah+
+ m_non_tcp_udp_esp+
+ m_non_tcp_udp_icmp+
+ m_non_tcp_udp_gre+
+ m_non_tcp_udp_ip+
+ m_tcp_udp_pkt_length_error;
+ return (res);
+}
+
+void CCPacketParserCounters::Clear(){
+ m_pkt=0;
+ m_non_ip=0;
+ m_vlan=0;
+ m_arp=0;
+ m_mpls=0;
+
+ m_non_valid_ipv4_ver=0;
+ m_ip_checksum_error=0;
+ m_ip_length_error=0;
+ m_ip_not_first_fragment_error=0;
+ m_ip_ttl_is_zero_error=0;
+ m_ip_multicast_error=0;
+ m_ip_header_options=0;
+
+ m_non_tcp_udp=0;
+ m_non_tcp_udp_ah=0;
+ m_non_tcp_udp_esp=0;
+ m_non_tcp_udp_icmp=0;
+ m_non_tcp_udp_gre=0;
+ m_non_tcp_udp_ip=0;
+ m_tcp_header_options=0;
+ m_tcp_udp_pkt_length_error=0;
+ m_tcp=0;
+ m_udp=0;
+ m_valid_udp_tcp=0;
+}
+
+
+void CCPacketParserCounters::Dump(FILE *fd){
+
+ DP (m_pkt);
+ DP (m_non_ip);
+ DP (m_vlan);
+ DP (m_arp);
+ DP (m_mpls);
+
+ DP (m_non_valid_ipv4_ver);
+ DP (m_ip_checksum_error);
+ DP (m_ip_length_error);
+ DP (m_ip_not_first_fragment_error);
+ DP (m_ip_ttl_is_zero_error);
+ DP (m_ip_multicast_error);
+ DP (m_ip_header_options);
+
+ DP (m_non_tcp_udp);
+ DP (m_non_tcp_udp_ah);
+ DP (m_non_tcp_udp_esp);
+ DP (m_non_tcp_udp_icmp);
+ DP (m_non_tcp_udp_gre);
+ DP (m_non_tcp_udp_ip);
+ DP (m_tcp_header_options);
+ DP (m_tcp_udp_pkt_length_error);
+ DP (m_tcp);
+ DP (m_udp);
+ DP (m_valid_udp_tcp);
+}
+
+
+bool CPacketParser::Create(){
+ m_counter.Clear();
+ return (true);
+}
+
+void CPacketParser::Delete(){
+}
+
+
+bool CPacketParser::ProcessPacket(CPacketIndication * pkt_indication,
+ CCapPktRaw * raw_packet){
+ BP_ASSERT(pkt_indication);
+ pkt_indication->ProcessPacket(this,raw_packet);
+ if (pkt_indication->m_desc.IsValidPkt()) {
+ return (true);
+ }
+ return (false);
+}
+
+void CPacketParser::Dump(FILE *fd){
+ fprintf(fd," parser statistic \n");
+ fprintf(fd," ===================== \n");
+ m_counter.Dump(fd);
+}
+
+
+void CPacketIndication::SetKey(void){
+ uint32_t ip_src, ip_dst;
+
+ m_desc.SetIsValidPkt(true);
+ if (is_ipv6()){
+ uint16_t ipv6_src[8];
+ uint16_t ipv6_dst[8];
+
+ l3.m_ipv6->getSourceIpv6(&ipv6_src[0]);
+ l3.m_ipv6->getDestIpv6(&ipv6_dst[0]);
+ ip_src=(ipv6_src[6] << 16) | ipv6_src[7];
+ ip_dst=(ipv6_dst[6] << 16) | ipv6_dst[7];
+ m_flow_key.m_ip_proto = l3.m_ipv6->getNextHdr();
+ }else{
+ ip_src=l3.m_ipv4->getSourceIp();
+ ip_dst=l3.m_ipv4->getDestIp();
+ m_flow_key.m_ip_proto = l3.m_ipv4->getProtocol();
+ }
+
+ /* UDP/TCP has same place */
+ uint16_t src_port = l4.m_udp->getSourcePort();
+ uint16_t dst_port = l4.m_udp->getDestPort();
+ if (ip_src > ip_dst ) {
+ m_flow_key.m_ipaddr1 =ip_dst;
+ m_flow_key.m_ipaddr2 =ip_src;
+ m_flow_key.m_port1 = dst_port;
+ m_flow_key.m_port2 = src_port;
+ }else{
+ m_desc.SetSwapTuple(true);
+ m_flow_key.m_ipaddr1 = ip_src;
+ m_flow_key.m_ipaddr2 = ip_dst;
+ m_flow_key.m_port1 = src_port;
+ m_flow_key.m_port2 = dst_port;
+ }
+ m_flow_key.m_l2_proto = 0;
+ m_flow_key.m_vrfid = 0;
+}
+
+uint8_t CPacketIndication::ProcessIpPacketProtocol(CCPacketParserCounters *m_cnt,
+ uint8_t protocol, int *offset){
+
+ char * packetBase = m_packet->raw;
+ TCPHeader * tcp=0;
+ UDPHeader * udp=0;
+ uint16_t tcp_header_len=0;
+
+ switch (protocol) {
+ case IPHeader::Protocol::TCP :
+ m_desc.SetIsTcp(true);
+ tcp =(TCPHeader *)(packetBase +*offset);
+ l4.m_tcp = tcp;
+
+ tcp_header_len = tcp->getHeaderLength();
+ if ( tcp_header_len > (5*4) ){
+ // we have ip option
+ m_cnt->m_tcp_header_options++;
+ }
+ *offset += tcp_header_len;
+ m_cnt->m_tcp++;
+ break;
+ case IPHeader::Protocol::UDP :
+ m_desc.SetIsUdp(true);
+ udp =(UDPHeader *)(packetBase +*offset);
+ l4.m_udp = udp;
+ *offset += 8;
+ m_cnt->m_udp++;
+ break;
+ case IPHeader::Protocol::AH:
+ m_cnt->m_non_tcp_udp_ah++;
+ return (1);
+ break;
+ case IPHeader::Protocol::ESP:
+ m_cnt->m_non_tcp_udp_esp++;
+ return (1);
+ break;
+ case IPHeader::Protocol::ICMP:
+ case IPHeader::Protocol::IPV6_ICMP:
+ m_cnt->m_non_tcp_udp_icmp++;
+ return (1);
+ break;
+ case IPHeader::Protocol::GRE:
+ m_cnt->m_non_tcp_udp_gre++;
+ return (1);
+ break;
+ case IPHeader::Protocol::IP:
+ m_cnt->m_non_ip++;
+ return (1);
+ break;
+
+ default:
+ m_cnt->m_non_tcp_udp++;
+ return (1);
+ break;
+ }
+
+ /* out of packet */
+ if ( *offset > m_packet->getTotalLen() ) {
+ m_cnt->m_tcp_udp_pkt_length_error++;
+ return (1);
+ }
+ return (0);
+}
+
+
+void CPacketIndication::ProcessIpPacket(CPacketParser *parser,
+ int offset){
+
+ char * packetBase;
+ CCPacketParserCounters * m_cnt=&parser->m_counter;
+ packetBase = m_packet->raw;
+ uint8_t protocol;
+ BP_ASSERT(l3.m_ipv4);
+
+ parser->m_counter.m_pkt++;
+
+ if ( l3.m_ipv4->getVersion() == 4 ){
+ m_cnt->m_ipv4++;
+ }else{
+ m_cnt->m_non_valid_ipv4_ver++;
+ return;
+ }
+ // check the IP Length
+ if( (uint32_t)(l3.m_ipv4->getTotalLength()+offset) > (uint32_t)( m_packet->getTotalLen()) ){
+ m_cnt->m_ip_length_error++;
+ return;
+ }
+
+ uint16_t ip_offset=offset;
+ uint16_t ip_header_length = l3.m_ipv4->getHeaderLength();
+
+ if ( ip_header_length >(5*4) ){
+ m_cnt->m_ip_header_options++;
+ }
+
+ if ( (uint32_t)(ip_header_length + offset) > (uint32_t)m_packet->getTotalLen() ) {
+ m_cnt->m_ip_length_error++;
+ return;
+ }
+ offset += ip_header_length;
+
+
+ if( l3.m_ipv4->getTimeToLive() ==0 ){
+ m_cnt->m_ip_ttl_is_zero_error++;
+ return;
+ }
+
+ if( l3.m_ipv4->isNotFirstFragment() ) {
+ m_cnt->m_ip_not_first_fragment_error++;
+ return;
+ }
+
+ protocol = l3.m_ipv4->getProtocol();
+ if (ProcessIpPacketProtocol(m_cnt,protocol,&offset) != 0) {
+ return;
+ };
+
+ uint16_t payload_offset_from_ip = offset-ip_offset;
+ if ( payload_offset_from_ip > l3.m_ipv4->getTotalLength() ) {
+ m_cnt->m_tcp_udp_pkt_length_error++;
+ return;
+ }
+
+ if ( m_packet->pkt_len > MAX_PKT_SIZE ){
+ m_cnt->m_tcp_udp_pkt_length_error++;
+ printf("ERROR packet is too big, not supported jumbo packets that larger than %d \n",MAX_PKT_SIZE);
+ return;
+ }
+
+ // Set packet length and include padding if needed
+ m_packet->pkt_len = l3.m_ipv4->getTotalLength() + getIpOffset();
+ if (m_packet->pkt_len < 60) { m_packet->pkt_len = 60; }
+
+ m_cnt->m_valid_udp_tcp++;
+ m_payload_len = l3.m_ipv4->getTotalLength() - (payload_offset_from_ip);
+ m_payload = (uint8_t *)(packetBase +offset);
+ UpdatePacketPadding();
+ SetKey();
+}
+
+
+
+void CPacketIndication::ProcessIpv6Packet(CPacketParser *parser,
+ int offset){
+
+ char * packetBase = m_packet->raw;
+ CCPacketParserCounters * m_cnt=&parser->m_counter;
+ uint16_t src_ipv6[6];
+ uint16_t dst_ipv6[6];
+ uint16_t idx;
+ uint8_t protocol;
+ BP_ASSERT(l3.m_ipv6);
+
+ parser->m_counter.m_pkt++;
+
+ if ( l3.m_ipv6->getVersion() == 6 ){
+ m_cnt->m_ipv6++;
+ }else{
+ m_cnt->m_non_valid_ipv6_ver++;
+ return;
+ }
+
+ // Check length
+ if ((uint32_t)(l3.m_ipv6->getPayloadLen()+offset+l3.m_ipv6->getHeaderLength()) >
+ (uint32_t)( m_packet->getTotalLen()) ){
+ m_cnt->m_ipv6_length_error++;
+ return;
+ }
+
+ for (idx=0; idx<6; idx++){
+ src_ipv6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
+ dst_ipv6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ l3.m_ipv6->updateMSBIpv6Src(&src_ipv6[0]);
+ l3.m_ipv6->updateMSBIpv6Dst(&dst_ipv6[0]);
+
+ offset += l3.m_ipv6->getHeaderLength();
+ protocol = l3.m_ipv6->getNextHdr();
+ if (ProcessIpPacketProtocol(m_cnt,protocol,&offset) != 0) {
+ return;
+ };
+
+ // Set packet length and include padding if needed
+ uint16_t real_pkt_size = l3.m_ipv6->getPayloadLen()+ getIpOffset() + l3.m_ipv6->getHeaderLength();
+ m_packet->pkt_len = real_pkt_size;
+ if (m_packet->pkt_len < 60) { m_packet->pkt_len = 60; }
+
+ m_cnt->m_valid_udp_tcp++;
+ m_payload_len = l3.m_ipv6->getPayloadLen();
+ m_payload = (uint8_t *)(packetBase +offset);
+
+ m_packet_padding = m_packet->getTotalLen() - real_pkt_size;
+ assert( m_packet->getTotalLen()>= real_pkt_size );
+ SetKey();
+}
+
+
+static uint8_t cbuff[MAX_PKT_SIZE];
+
+bool CPacketIndication::ConvertPacketToIpv6InPlace(CCapPktRaw * pkt,
+ int offset){
+
+ // Copy l2 data and set l2 type to ipv6
+ memcpy(cbuff, pkt->raw, offset);
+ *(uint16_t*)(cbuff+12) = PKT_HTONS(EthernetHeader::Protocol::IPv6);
+
+ // Create the ipv6 header
+ IPHeader *ipv4 = (IPHeader *) (pkt->raw+offset);
+ IPv6Header *ipv6 = (IPv6Header *) (cbuff+offset);
+ uint8_t ipv6_hdrlen = ipv6->getHeaderLength();
+ memset(ipv6,0,ipv6_hdrlen);
+ ipv6->setVersion(6);
+ if (ipv4->getTotalLength() < ipv4->getHeaderLength()) {
+ return(false);
+ }
+ // Calculate the payload length
+ uint16_t p_len = ipv4->getTotalLength() - ipv4->getHeaderLength();
+ ipv6->setPayloadLen(p_len);
+ uint8_t l4_proto = ipv4->getProtocol();
+ ipv6->setNextHdr(l4_proto);
+ ipv6->setHopLimit(64);
+
+ // Update the least signficant 32-bits of ipv6 address
+ // using the ipv4 address
+ ipv6->updateLSBIpv6Src(ipv4->getSourceIp());
+ ipv6->updateLSBIpv6Dst(ipv4->getDestIp());
+
+ // Copy rest of packet
+ uint16_t ipv4_offset = offset + ipv4->getHeaderLength();
+ uint16_t ipv6_offset = offset + ipv6_hdrlen;
+ memcpy(cbuff+ipv6_offset,pkt->raw+ipv4_offset,p_len);
+
+ ipv6_offset+=p_len;
+ memcpy(pkt->raw,cbuff,ipv6_offset);
+
+ // Set packet length
+ pkt->pkt_len = ipv6_offset;
+ m_is_ipv6 = true;
+
+ return (true);
+}
+
+void CPacketIndication::ProcessPacket(CPacketParser *parser,
+ CCapPktRaw * pkt){
+ _ProcessPacket(parser,pkt);
+ if ( m_desc.IsValidPkt() ){
+ UpdateOffsets(); /* update fast offsets */
+ }
+}
+
+/* process packet */
+void CPacketIndication::_ProcessPacket(CPacketParser *parser,
+ CCapPktRaw * pkt){
+
+ BP_ASSERT(pkt);
+ m_packet =pkt;
+ Clean();
+ CCPacketParserCounters * m_cnt=&parser->m_counter;
+
+ int offset = 0;
+ char * packetBase;
+ packetBase = m_packet->raw;
+ BP_ASSERT(packetBase);
+ m_ether = (EthernetHeader *)packetBase;
+ m_is_ipv6 = false;
+
+ // IP
+ switch( m_ether->getNextProtocol() ) {
+ case EthernetHeader::Protocol::IP :
+ offset = 14;
+ l3.m_ipv4 =(IPHeader *)(packetBase+offset);
+ break;
+ case EthernetHeader::Protocol::IPv6 :
+ offset = 14;
+ l3.m_ipv6 =(IPv6Header *)(packetBase+offset);
+ m_is_ipv6 = true;
+ break;
+ case EthernetHeader::Protocol::VLAN :
+ m_cnt->m_vlan++;
+ switch ( m_ether->getVlanProtocol() ){
+ case EthernetHeader::Protocol::IP:
+ offset = 18;
+ l3.m_ipv4 =(IPHeader *)(packetBase+offset);
+ break;
+ case EthernetHeader::Protocol::IPv6 :
+ offset = 18;
+ l3.m_ipv6 =(IPv6Header *)(packetBase+offset);
+ m_is_ipv6 = true;
+ break;
+ case EthernetHeader::Protocol::MPLS_Multicast :
+ case EthernetHeader::Protocol::MPLS_Unicast :
+ m_cnt->m_mpls++;
+ return;
+
+ case EthernetHeader::Protocol::ARP :
+ m_cnt->m_arp++;
+ return;
+
+ default:
+ m_cnt->m_non_ip++;
+ return ; /* Non IP */
+ }
+ break;
+ case EthernetHeader::Protocol::ARP :
+ m_cnt->m_arp++;
+ return; /* Non IP */
+ break;
+
+ case EthernetHeader::Protocol::MPLS_Multicast :
+ case EthernetHeader::Protocol::MPLS_Unicast :
+ m_cnt->m_mpls++;
+ return; /* Non IP */
+ break;
+
+ default:
+ m_cnt->m_non_ip++;
+ return; /* Non IP */
+ }
+
+ if (is_ipv6() == false) {
+ if( (14+20) > (uint32_t)( m_packet->getTotalLen()) ){
+ m_cnt->m_ip_length_error++;
+ return;
+ }
+ }
+
+ // For now, we can not mix ipv4 and ipv4 packets
+ // so we require --ipv6 option be set for ipv6 packets
+ if ((m_is_ipv6) && (CGlobalInfo::is_ipv6_enable() == false)){
+ fprintf(stderr,"ERROR --ipv6 must be set to process ipv6 packets\n");
+ exit(-1);
+ }
+
+ // Convert to Ipv6 if requested and not already Ipv6
+ if ((CGlobalInfo::is_ipv6_enable()) && (is_ipv6() == false )) {
+ if (ConvertPacketToIpv6InPlace(pkt, offset) == false){
+ /* Move to next packet as this was erroneous */
+ printf(" unable to convert packet to IPv6, skipping...\n");
+ return;
+ }
+ }
+
+ if (is_ipv6()){
+ ProcessIpv6Packet(parser,offset);
+ }else{
+ ProcessIpPacket(parser,offset);
+ }
+}
+
+
+
+void CFlowTableStats::Clear(){
+ m_lookup=0;
+ m_found=0;
+ m_fif=0;
+ m_add=0;
+ m_remove=0;
+ m_fif_err=0;
+ m_active=0;
+}
+
+void CFlowTableStats::Dump(FILE *fd){
+ DP (m_lookup);
+ DP (m_found);
+ DP (m_fif);
+ DP (m_add);
+ DP (m_remove);
+ DP (m_fif_err);
+ DP (m_active);
+}
+
+
+void CFlow::Dump(FILE *fd){
+ fprintf(fd," fif is swap : %d \n",is_fif_swap);
+}
+
+
+void CFlowTableManagerBase::Dump(FILE *fd){
+ m_stats.Dump(fd);
+}
+
+// Return flow that has given key. If flow does not exist, create one, and add to CFlow data structure.
+// key - key to lookup by.
+// is_fif - return: true if flow did not exist (This is the first packet we see in this flow).
+// false if flow already existed
+CFlow * CFlowTableManagerBase::process(const CFlowKey & key, bool & is_fif) {
+ m_stats.m_lookup++;
+ is_fif=false;
+ CFlow * lp=lookup(key);
+ if ( lp ) {
+ m_stats.m_found++;
+ return (lp);
+ }else{
+ m_stats.m_fif++;
+ m_stats.m_active++;
+ m_stats.m_add++;
+ is_fif=true;
+ lp= add(key );
+ if (lp) {
+ }else{
+ m_stats.m_fif_err++;
+ }
+ }
+ return (lp);
+}
+
+bool CFlowTableMap::Create(int max_size){
+ m_stats.Clear();
+ return (true);
+}
+
+void CFlowTableMap::Delete(){
+ remove_all();
+}
+
+void CFlowTableMap::remove(const CFlowKey & key ) {
+ CFlow *lp=lookup(key);
+ if ( lp ) {
+ delete lp;
+ m_stats.m_remove++;
+ m_stats.m_active--;
+ m_map.erase(key);
+ }else{
+ BP_ASSERT(0);
+ }
+}
+
+
+CFlow * CFlowTableMap::lookup(const CFlowKey & key ) {
+ flow_map_t::iterator iter;
+ iter = m_map.find(key);
+ if (iter != m_map.end() ) {
+ return ( (*iter).second );
+ }else{
+ return (( CFlow*)0);
+ }
+}
+
+CFlow * CFlowTableMap::add(const CFlowKey & key ) {
+ CFlow * flow = new CFlow();
+ m_map.insert(flow_map_t::value_type(key,flow));
+ return (flow);
+}
+
+void CFlowTableMap::remove_all(){
+ if ( m_map.empty() )
+ return;
+ flow_map_iter_t it;
+ for (it= m_map.begin(); it != m_map.end(); ++it) {
+ CFlow *lp = it->second;
+ delete lp;
+ }
+ m_map.clear();
+}
+
+uint64_t CFlowTableMap::count(){
+ return ( m_map.size());
+}
+
+
+/*
+ * This function will insert an IP option header containing metadata for the
+ * rx-check feature.
+ *
+ * An mbuf is created to hold the new option header plus the portion of the
+ * packet after the base IP header (includes any IP options header that might
+ * exist). This mbuf is then linked into the existing mbufs (becoming the
+ * second mbuf).
+ *
+ * Note that the rxcheck option header is inserted as the first option header,
+ * and any existing IP option headers are placed after it.
+ */
+void CFlowPktInfo::do_generate_new_mbuf_rxcheck(rte_mbuf_t * m,
+ CGenNode * node,
+ bool single_port){
+
+ /* retrieve size of rx-check header, must be multiple of 8 */
+ uint16_t opt_len = RX_CHECK_LEN;
+ uint16_t current_opt_len = 0;
+ assert( (opt_len % 8) == 0 );
+
+ /* determine starting move location */
+ char *mp1 = rte_pktmbuf_mtod(m, char*);
+ uint16_t mp1_offset = m_pkt_indication.getFastIpOffsetFast();
+ if (unlikely (m_pkt_indication.is_ipv6()) ) {
+ mp1_offset += IPv6Header::DefaultSize;
+ }else{
+ mp1_offset += IPHeader::DefaultSize;
+ }
+ char *move_from = mp1 + mp1_offset;
+
+ /* determine size of new mbuf required */
+ uint16_t move_len = m->data_len - mp1_offset;
+ uint16_t new_mbuf_size = move_len + opt_len;
+ uint16_t mp2_offset = opt_len;
+
+ /* obtain a new mbuf */
+ rte_mbuf_t * new_mbuf = CGlobalInfo::pktmbuf_alloc(node->get_socket_id(), new_mbuf_size);
+ assert(new_mbuf);
+ char * mp2 = rte_pktmbuf_append(new_mbuf, new_mbuf_size);
+ char * move_to = mp2 + mp2_offset;
+
+ /* move part of packet from first mbuf to new mbuf */
+ memmove(move_to, move_from, move_len);
+
+ /* trim first mbuf and set pointer to option header*/
+ CRx_check_header *rxhdr;
+ uint16_t buf_adjust = move_len;
+ rxhdr = (CRx_check_header *)mp2;
+ m->data_len -= buf_adjust;
+
+ /* insert rx-check data as an IPv4 option header or IPv6 extension header*/
+ CFlowPktInfo * lp=node->m_pkt_info;
+ CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
+
+ /* set option type and update ip header length */
+ IPHeader * ipv4=(IPHeader *)(mp1 + 14);
+ if (unlikely (m_pkt_indication.is_ipv6()) ) {
+ IPv6Header * ipv6=(IPv6Header *)(mp1 + 14);
+ uint8_t save_header= ipv6->getNextHdr();
+ ipv6->setNextHdr(RX_CHECK_V6_OPT_TYPE);
+ ipv6->setHopLimit(TTL_RESERVE_DUPLICATE);
+ ipv6->setPayloadLen( ipv6->getPayloadLen() +
+ sizeof(CRx_check_header));
+ rxhdr->m_option_type = save_header;
+ rxhdr->m_option_len = RX_CHECK_V6_OPT_LEN;
+ }else{
+ current_opt_len = ipv4->getHeaderLength();
+ ipv4->setHeaderLength(current_opt_len+opt_len);
+ ipv4->setTotalLength(ipv4->getTotalLength()+opt_len);
+ ipv4->setTimeToLive(TTL_RESERVE_DUPLICATE);
+ rxhdr->m_option_type = RX_CHECK_V4_OPT_TYPE;
+ rxhdr->m_option_len = RX_CHECK_V4_OPT_LEN;
+ }
+
+ /* fill in the rx-check metadata in the options header */
+ if ( CGlobalInfo::m_options.is_rxcheck_const_ts() ){
+ /* Runtime flag to use a constant value for the timestamp field. */
+ /* This is used by simulation to provide consistency across runs. */
+ rxhdr->m_time_stamp = 0xB3B2B1B0;
+ }else{
+ rxhdr->m_time_stamp = os_get_hr_tick_32();
+ }
+ rxhdr->m_magic = RX_CHECK_MAGIC;
+ rxhdr->m_flow_id = node->m_flow_id | ( ( (uint64_t)(desc->getFlowId() & 0xf))<<52 ) ; // include thread_id, node->flow_id, sub_flow in case of multi-flow template
+ rxhdr->m_flags = 0;
+ rxhdr->m_aging_sec = desc->GetMaxFlowTimeout();
+ rxhdr->m_template_id = (uint8_t)desc->getId();
+
+ /* add the flow packets goes to the same port */
+ if (single_port) {
+ rxhdr->m_pkt_id = desc->getFlowPktNum();
+ rxhdr->m_flow_size = desc->GetMaxPktsPerFlow();
+
+ }else{
+ rxhdr->m_pkt_id = desc->GetDirInfo()->GetPktNum();
+ rxhdr->m_flow_size = desc->GetDirInfo()->GetMaxPkts();
+ /* set dir */
+ rxhdr->set_dir(desc->IsInitSide()?1:0);
+ rxhdr->set_both_dir(desc->IsBiDirectionalFlow()?1:0);
+ }
+
+ /* update checksum for IPv4, split across 2 mbufs */
+ if (likely ( ! m_pkt_indication.is_ipv6()) ) {
+ ipv4->updateCheckSum2((uint8_t *)ipv4, current_opt_len, (uint8_t *)rxhdr, opt_len);
+ }
+
+ /* link new mbuf */
+ new_mbuf->next = m->next;
+ new_mbuf->nb_segs++;
+ m->next = new_mbuf;
+ m->nb_segs++;
+ m->pkt_len += opt_len;
+}
+
+
+char * CFlowPktInfo::push_ipv4_option_offline(uint8_t bytes){
+ /* must be align by 4*/
+ assert( (bytes % 4)== 0 );
+ assert(m_pkt_indication.is_ipv6()==false);
+ if ( m_pkt_indication.l3.m_ipv4->getHeaderLength()+bytes>60 ){
+ printf(" ERROR ipv4 options size is too big, should be able to add %d bytes for internal info \n",bytes);
+ return((char *)0);
+ }
+ /* now we can do that !*/
+
+ /* add more bytes to the packet */
+ m_packet->append(bytes);
+ uint8_t ip_offset_to_move= m_pkt_indication.getFastIpOffsetFast()+IPHeader::DefaultSize;
+ char *p=m_packet->raw+ip_offset_to_move;
+ uint16_t bytes_to_move= m_packet->pkt_len - ip_offset_to_move -bytes;
+
+ /* move the start of ipv4 options */
+ memmove(p+bytes ,p, bytes_to_move);
+
+ /* fix all other stuff */
+ if ( m_pkt_indication.m_udp_tcp_offset ){
+ m_pkt_indication.m_udp_tcp_offset+=bytes;
+ }
+ if ( m_pkt_indication.m_payload_offset ) {
+ m_pkt_indication.m_payload_offset+=bytes;
+ }
+
+ m_pkt_indication.RefreshPointers();
+ /* now pointer are updated we can manipulate ipv4 header */
+ IPHeader * ipv4=m_pkt_indication.l3.m_ipv4;
+
+ ipv4->setTotalLength(ipv4->getTotalLength()+bytes);
+ ipv4->setHeaderLength(ipv4->getHeaderLength()+(bytes));
+
+ m_pkt_indication.UpdatePacketPadding();
+
+ /* refresh the global mbuf */
+ free_const_mbuf();
+ alloc_const_mbuf();
+ return (p);
+}
+
+void CFlowPktInfo::mask_as_learn(){
+ CNatOption *lpNat;
+ if ( m_pkt_indication.is_ipv6() ) {
+ lpNat=(CNatOption *)push_ipv6_option_offline(CNatOption::noOPTION_LEN);
+ lpNat->set_init_ipv6_header();
+ lpNat->set_fid(0);
+ lpNat->set_thread_id(0);
+ } else {
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_IP_OPTION)) {
+ // Make space in IPv4 header for NAT option
+ lpNat=(CNatOption *)push_ipv4_option_offline(CNatOption::noOPTION_LEN);
+ lpNat->set_init_ipv4_header();
+ lpNat->set_fid(0);
+ lpNat->set_thread_id(0);
+ m_pkt_indication.l3.m_ipv4->updateCheckSum();
+ }
+ /* learn is true */
+ m_pkt_indication.m_desc.SetLearn(true);
+ }
+}
+
+char * CFlowPktInfo::push_ipv6_option_offline(uint8_t bytes){
+
+ /* must be align by 8*/
+ assert( (bytes % 8)== 0 );
+ assert(m_pkt_indication.is_ipv6()==true);
+
+ /* add more bytes to the packet */
+ m_packet->append(bytes);
+ uint8_t ip_offset_to_move= m_pkt_indication.getFastIpOffsetFast()+IPv6Header::DefaultSize;
+ char *p=m_packet->raw+ip_offset_to_move;
+ uint16_t bytes_to_move= m_packet->pkt_len - ip_offset_to_move -bytes;
+
+ /* move the start of ipv4 options */
+ memmove(p+bytes ,p, bytes_to_move);
+
+ /* fix all other stuff */
+ if ( m_pkt_indication.m_udp_tcp_offset ){
+ m_pkt_indication.m_udp_tcp_offset+=bytes;
+ }
+ if ( m_pkt_indication.m_payload_offset ) {
+ m_pkt_indication.m_payload_offset+=bytes;
+ }
+
+ m_pkt_indication.RefreshPointers();
+ /* now pointer are updated we can manipulate ipv6 header */
+ IPv6Header * ipv6=m_pkt_indication.l3.m_ipv6;
+
+ ipv6->setPayloadLen(ipv6->getPayloadLen()+bytes);
+ uint8_t save_header= ipv6->getNextHdr();
+ *p=save_header; /* copy next header */
+ ipv6->setNextHdr(CNatOption::noIPV6_OPTION);
+
+ m_pkt_indication.UpdatePacketPadding();
+
+ /* refresh the global mbuf */
+ free_const_mbuf();
+ alloc_const_mbuf();
+ return (p);
+}
+
+
+void CFlowPktInfo::alloc_const_mbuf(){
+
+ if ( m_packet->pkt_len > FIRST_PKT_SIZE ) {
+ /* pkt size is bigger than FIRST_PKT_SIZE let's create an offline buffer */
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ if ( CGlobalInfo::m_socket.is_sockets_enable(i) ){
+
+ rte_mbuf_t * m;
+ uint16_t pkt_s=(m_packet->pkt_len - FIRST_PKT_SIZE);
+
+ m = CGlobalInfo::pktmbuf_alloc(i,pkt_s);
+ BP_ASSERT(m);
+ char *p=rte_pktmbuf_append(m, pkt_s);
+ rte_memcpy(p,(m_packet->raw+FIRST_PKT_SIZE),pkt_s);
+
+ assert(m_big_mbuf[i]==NULL);
+ m_big_mbuf[i]=m;
+ }
+ }
+ }
+}
+
+void CFlowPktInfo::free_const_mbuf(){
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ rte_mbuf_t * m=m_big_mbuf[i];
+ if (m) {
+ rte_pktmbuf_free(m );
+ m_big_mbuf[i]=NULL;
+ }
+ }
+}
+
+
+bool CFlowPktInfo::Create(CPacketIndication * pkt_ind){
+ /* clone the packet*/
+ m_packet = new CCapPktRaw(pkt_ind->m_packet);
+ /* clone of the offsets */
+ m_pkt_indication.Clone(pkt_ind,m_packet);
+
+ int i;
+ for (i=0; i<MAX_SOCKETS_SUPPORTED; i++) {
+ m_big_mbuf[i] = NULL;
+ }
+ alloc_const_mbuf();
+ return (true);
+}
+
+void CFlowPktInfo::Delete(){
+ free_const_mbuf();
+ delete m_packet;
+}
+
+void CFlowPktInfo::Dump(FILE *fd){
+ m_pkt_indication.Dump(fd,0);
+}
+
+
+
+
+void CCapFileFlowInfo::save_to_erf(std::string cap_file_name,int pcap){
+ if (Size() ==0) {
+ fprintf(stderr,"ERROR no info for this flow ");
+ return ;
+ }
+ capture_type_e file_type=ERF;
+ if ( pcap ){
+ file_type=LIBPCAP;
+ }
+
+
+ CFileWriterBase * lpWriter=CCapWriterFactory::CreateWriter(file_type,(char *)cap_file_name.c_str());
+ if (lpWriter == NULL) {
+ fprintf(stderr,"ERROR can't create cap file %s ",(char *)cap_file_name.c_str());
+ return ;
+ }
+ int i;
+
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ bool res=lpWriter->write_packet(lp->m_packet);
+ BP_ASSERT(res);
+ }
+ delete lpWriter;
+}
+
+
+
+struct CTmpFlowPerDirInfo {
+ CTmpFlowPerDirInfo(){
+ m_pkt_id=0;
+ }
+
+ uint16_t m_pkt_id;
+};
+
+class CTmpFlowInfo {
+public:
+ CTmpFlowInfo(){
+ m_max_pkts=0;
+ m_max_aging_sec=0.0;
+ m_last_pkt=0.0;
+
+ }
+ ~CTmpFlowInfo(){
+ }
+public:
+ uint32_t m_max_pkts;
+ dsec_t m_max_aging_sec;
+ dsec_t m_last_pkt;
+
+ CTmpFlowPerDirInfo m_per_dir[CS_NUM];
+};
+
+typedef CTmpFlowInfo * flow_tmp_t;
+typedef std::map<uint16_t, flow_tmp_t> flow_tmp_map_t;
+typedef flow_tmp_map_t::iterator flow_tmp_map_iter_t;
+
+enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::is_valid_template_load_time(){
+ int i;
+ for (i=0; i<Size(); i++) {
+ CFlowPktInfo * lp= GetPacket((uint32_t)i);
+ CPacketIndication * lpd=&lp->m_pkt_indication;
+ if ( lpd->getEtherOffset() !=0 ){
+ fprintf(stderr, "Error: Bad CAP file. Ether offset start is not 0 in packet %d \n", i+1);
+ return kPktNotSupp;
+ }
+
+ if ( CGlobalInfo::is_learn_mode() ) {
+ // We change TCP ports. Need them to be in first 64 byte mbuf.
+ // Since we also add IP option, and rx-check feature might add another IP option, better not allow
+ // OP options in this mode. If needed this limitation can be refined a bit.
+ if ( lpd->getTcpOffset() - lpd->getIpOffset() != 20 ) {
+ fprintf(stderr, "Error: Bad CAP file. In learn (NAT) mode, no IP options allowed \n");
+ return kIPOptionNotAllowed;
+ }
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
+ if (lpd->getIpProto() != IPPROTO_TCP && !lpd->m_desc.IsInitSide()) {
+ fprintf(stderr, "Error: In the chosen learn mode, all packets from server to client in CAP file should be TCP.\n");
+ fprintf(stderr, " Please give different CAP file, or try different --learn-mode\n");
+ return kNoTCPFromServer;
+ }
+ }
+ }
+ }
+
+ if ( CGlobalInfo::is_learn_mode() ) {
+ CPacketIndication &pkt_0_indication = GetPacket(0)->m_pkt_indication;
+
+ if ( pkt_0_indication.m_desc.IsPluginEnable() ) {
+ fprintf(stderr, "Error: plugins are not supported with --learn mode \n");
+ return kPlugInWithLearn;
+ }
+
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
+ if (Size() < 3) {
+ fprintf(stderr
+ , "Error: In the chosen learn mode, need at least the 3 TCP handshake packets.\n");
+ fprintf(stderr
+ , " Please give different CAP file, or try different --learn-mode\n");
+ return kTCPLearnModeBadFlow;
+ }
+ }
+ CPacketIndication &pkt_1_indication = GetPacket(1)->m_pkt_indication;
+
+
+ // verify first packet is TCP SYN from client
+ TCPHeader *tcp = (TCPHeader *)(pkt_0_indication.getBasePtr() + pkt_0_indication.getTcpOffset());
+ if ( (! pkt_0_indication.m_desc.IsInitSide()) || (! tcp->getSynFlag()) ) {
+ fprintf(stderr, "Error: In the chosen learn mode, first TCP packet should be SYN from client side.\n");
+ fprintf(stderr, " In cap file, first packet side direction is %s. TCP header is:\n"
+ , pkt_0_indication.m_desc.IsInitSide() ? "outside":"inside");
+ tcp->dump(stderr);
+ fprintf(stderr, " Please give different CAP file, or try different --learn-mode\n");
+ return kNoSyn;
+ }
+
+ // We want at least the TCP flags to be inside first mbuf
+ if (pkt_0_indication.getTcpOffset() + 14 > FIRST_PKT_SIZE) {
+ fprintf(stderr
+ , "Error: In the chosen learn mode, TCP flags offset should be less than %d, but it is %d.\n"
+ , FIRST_PKT_SIZE, pkt_0_indication.getTcpOffset() + 14);
+ fprintf(stderr, " Please give different CAP file, or try different --learn-mode\n");
+ return kTCPOffsetTooBig;
+ }
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
+ // To support TCP seq randomization from server to client, we need second packet in flow to be the server SYN+ACK
+ bool error = false;
+ if (pkt_1_indication.getIpProto() != IPPROTO_TCP) {
+ error = true;
+ } else {
+ TCPHeader *tcp = (TCPHeader *)(pkt_1_indication.getBasePtr() + pkt_1_indication.getTcpOffset());
+ if ( (! tcp->getSynFlag()) || (! tcp->getAckFlag()) || ( pkt_1_indication.m_desc.IsInitSide())) {
+ error = true;
+ }
+ }
+ if (error) {
+ fprintf(stderr, "Error: In the chosen learn mode, second packet should be SYN+ACK from server.\n");
+ fprintf(stderr, " Please give different CAP file, or try different --learn-mode\n");
+ return kNoTCPSynAck;
+ }
+
+ CPacketIndication &pkt_2_indication = GetPacket(2)->m_pkt_indication;
+ if ( (! pkt_2_indication.m_desc.IsInitSide()) ) {
+ fprintf(stderr
+ , "Error: Wrong third packet. In the chosen learn mode, need at least the 3 TCP handshake packets.\n");
+ fprintf(stderr
+ , " Please give different CAP file, or try different --learn-mode\n");
+ return kTCPLearnModeBadFlow;
+ }
+ if ((pkt_0_indication.m_cap_ipg < (double)LEARN_MODE_MIN_IPG / 1000)
+ || (pkt_1_indication.m_cap_ipg < (double)LEARN_MODE_MIN_IPG / 1000)) {
+ fprintf(stderr
+ , "Error: Bad cap file timings. In the chosen learn mode");
+ fprintf(stderr, "IPG between TCP handshake packets should be at least %d msec.\n", LEARN_MODE_MIN_IPG);
+ fprintf(stderr, " Current delay is %f between second and first, %f between third and second"
+ , pkt_0_indication.m_cap_ipg, pkt_1_indication.m_cap_ipg);
+ fprintf(stderr
+ , " Please give different CAP file, try different --learn-mode, or edit ipg parameters in template file\n");
+ return kTCPIpgTooLow;
+ }
+ }
+ }
+ }
+
+ return(kOK);
+}
+
+
+/**
+ * update global info
+ * 1. maximum aging
+ * 2. per sub-flow pkt_num/max-pkt per dir and per global
+ */
+void CCapFileFlowInfo::update_info(){
+ flow_tmp_map_iter_t iter;
+ flow_tmp_map_t ft;
+ CTmpFlowInfo * lpFlow;
+ int i;
+ dsec_t ctime=0.0;
+
+ // first iteration, lern all the info into a temp flow table
+ for (i=0; i<Size(); i++) {
+ CFlowPktInfo * lp= GetPacket((uint32_t)i);
+ // extract flow_id
+ CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
+ uint16_t flow_id = desc->getFlowId();
+ CPacketDescriptorPerDir * lpCurPacket = desc->GetDirInfo();
+ pkt_dir_t dir=desc->IsInitSide()?CLIENT_SIDE:SERVER_SIDE; // with respect to the first sub-flow in the template
+
+ //update lpFlow
+ iter = ft.find(flow_id);
+ if (iter != ft.end() ) {
+ lpFlow=(*iter).second;
+ }else{
+ lpFlow = new CTmpFlowInfo();
+ assert(lpFlow);
+ ft.insert(flow_tmp_map_t::value_type(flow_id,lpFlow));
+ //add it
+
+ }
+
+ // main info
+ lpCurPacket->SetPktNum(lpFlow->m_per_dir[dir].m_pkt_id);
+ lpFlow->m_max_pkts++;
+ lpFlow->m_per_dir[dir].m_pkt_id++;
+
+ dsec_t delta = ctime - lpFlow->m_last_pkt ;
+ lpFlow->m_last_pkt = ctime;
+ if (delta > lpFlow->m_max_aging_sec) {
+ lpFlow->m_max_aging_sec = delta;
+ }
+ // per direction info
+
+ if (i<Size()) {
+ ctime += lp->m_pkt_indication.m_cap_ipg;
+ }
+ }
+
+
+ for (i=0; i<Size(); i++) {
+ CFlowPktInfo * lp= GetPacket((uint32_t)i);
+
+ CPacketDescriptor * desc=&lp->m_pkt_indication.m_desc;
+ uint16_t flow_id = desc->getFlowId();
+ CPacketDescriptorPerDir * lpCurPacket = desc->GetDirInfo();
+ pkt_dir_t dir=desc->IsInitSide()?CLIENT_SIDE:SERVER_SIDE; // with respect to the first sub-flow in the template
+
+ iter = ft.find(flow_id);
+ assert( iter != ft.end() );
+ lpFlow=(*iter).second;
+
+ if ( (lpFlow->m_per_dir[0].m_pkt_id >0) &&
+ (lpFlow->m_per_dir[1].m_pkt_id >0) ) {
+ /* we have both dir */
+ lp->m_pkt_indication.m_desc.SetBiPluginEnable(true);
+ }
+
+
+ lpCurPacket->SetMaxPkts(lpFlow->m_per_dir[dir].m_pkt_id);
+ lp->m_pkt_indication.m_desc.SetMaxPktsPerFlow(lpFlow->m_max_pkts);
+ lp->m_pkt_indication.m_desc.SetMaxFlowTimeout(lpFlow->m_max_aging_sec);
+ }
+
+
+ /* in case of learn mode , we need to mark the first packet */
+ if ( CGlobalInfo::is_learn_mode() ) {
+ CFlowPktInfo * lp= GetPacket(0);
+ assert(lp);
+ /* only for bi directionl traffic mask the learn flag , only for the first packet */
+ if ( lp->m_pkt_indication.m_desc.IsBiDirectionalFlow() ){
+ lp->mask_as_learn();
+ }
+
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
+ // In this mode, we need to see the SYN+ACK as well.
+ lp = GetPacket(1);
+ assert(lp);
+ lp->m_pkt_indication.setTTL(TTL_RESERVE_DUPLICATE);
+ }
+ }
+
+ if ( ft.empty() )
+ return;
+
+ flow_tmp_map_iter_t it;
+ for (it= ft.begin(); it != ft.end(); ++it) {
+ CTmpFlowInfo *lp = it->second;
+ assert(lp);
+ delete lp;
+ }
+ ft.clear();
+}
+
+
+enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::load_cap_file(std::string cap_file, uint16_t _id, uint8_t plugin_id) {
+ RemoveAll();
+
+ fprintf(stdout," -- loading cap file %s \n",cap_file.c_str());
+ CPacketParser parser;
+ CPacketIndication pkt_indication;
+ CCapReaderBase * lp=CCapReaderFactory::CreateReader((char *)cap_file.c_str(),0);
+
+ if (lp == 0) {
+ printf(" ERROR file %s does not exist or not supported \n",(char *)cap_file.c_str());
+ return kFileNotExist;
+ }
+ bool multi_flow_enable =( (plugin_id!=0)?true:false);
+
+
+ CFlowTableMap flow;
+
+ parser.Create();
+ flow.Create(0);
+ m_total_bytes=0;
+ m_total_flows=0;
+ m_total_errors=0;
+ CFlow * first_flow=0;
+ bool first_flow_fif_is_swap=false;
+ bool time_was_set=false;
+ double last_time=0.0;
+ CCapPktRaw raw_packet;
+ int cnt=0;
+ while ( true ) {
+ /* read packet */
+ if ( lp->ReadPacket(&raw_packet) ==false ){
+ break;
+ }
+ cnt++;
+
+ if ( !time_was_set ){
+ last_time=raw_packet.get_time();
+ time_was_set=true;
+ }else{
+ if (raw_packet.get_time()<last_time) {
+ fprintf(stderr, "Error: Non valid pcap file. Timestamp is negative at packet %d\n", cnt);
+ return kNegTimestamp;
+ }
+ last_time=raw_packet.get_time();
+ }
+
+ if ( parser.ProcessPacket(&pkt_indication, &raw_packet) ){
+
+ if ( pkt_indication.m_desc.IsValidPkt() ) {
+ pkt_indication.m_desc.SetPluginEnable(multi_flow_enable);
+ pkt_indication.m_desc.SetPluginId(plugin_id);
+
+ pkt_indication.m_desc.SetId(_id);
+ bool is_fif;
+ CFlow * lpflow=flow.process(pkt_indication.m_flow_key,is_fif);
+ m_total_bytes += pkt_indication.m_packet->pkt_len;
+ pkt_indication.m_cap_ipg = raw_packet.get_time();
+
+ pkt_indication.m_flow =lpflow;
+ pkt_indication.m_desc.SetFlowPktNum(lpflow->pkt_id);
+ /* inc pkt_id inside the flow */
+ lpflow->pkt_id++;
+
+ /* check that we don't have reserve TTL for duplication */
+ uint8_t ttl = pkt_indication.getTTL();
+ if ( (ttl == TTL_RESERVE_DUPLICATE) ||
+ (ttl == (TTL_RESERVE_DUPLICATE-1)) ) {
+ pkt_indication.setTTL(TTL_RESERVE_DUPLICATE-4);
+ }
+
+ // Validation for first packet in flow
+ if (is_fif) {
+ lpflow->flow_id = m_total_flows;
+ pkt_indication.m_desc.SetFlowId(lpflow->flow_id);
+
+ if (m_total_flows == 0) {
+ /* first flow */
+ first_flow =lpflow;/* save it for single flow support , to signal error */
+ lpflow->is_fif_swap =pkt_indication.m_desc.IsSwapTuple();
+ first_flow_fif_is_swap = pkt_indication.m_desc.IsSwapTuple();
+ pkt_indication.m_desc.SetInitSide(true);
+ Append(&pkt_indication);
+ m_total_flows++;
+ } else {
+ if ( multi_flow_enable ) {
+ lpflow->is_fif_swap = pkt_indication.m_desc.IsSwapTuple();
+ /* in respect to the first flow */
+ bool init_side_in_repect_to_first_flow =
+ ((first_flow_fif_is_swap?true:false) == lpflow->is_fif_swap)?true:false;
+ pkt_indication.m_desc.SetInitSide(init_side_in_repect_to_first_flow);
+ Append(&pkt_indication);
+ m_total_flows++;
+ } else {
+ printf("More than one flow in this cap. Ignoring it !! \n");
+ pkt_indication.m_flow_key.Dump(stderr);
+ m_total_errors++;
+ }
+ }
+ }else{ /* no FIF */
+ pkt_indication.m_desc.SetFlowId(lpflow->flow_id);
+
+ if ( multi_flow_enable ==false ){
+ if (lpflow == first_flow) {
+ // add to
+ bool init_side=
+ ((lpflow->is_fif_swap?true:false) == pkt_indication.m_desc.IsSwapTuple())?true:false;
+ pkt_indication.m_desc.SetInitSide( init_side );
+ Append(&pkt_indication);
+ }else{
+ //printf(" more than one flow in this cap ignot it !! \n");
+ m_total_errors++;
+ }
+ }else{
+ /* support multi-flow, */
+
+ /* work in respect to first flow */
+ bool init_side=
+ ((first_flow_fif_is_swap?true:false) == pkt_indication.m_desc.IsSwapTuple())?true:false;
+ pkt_indication.m_desc.SetInitSide( init_side );
+ Append(&pkt_indication);
+
+ }
+ }
+ }else{
+ fprintf(stderr, "ERROR packet %d is not supported, should be Ethernet/IP(0x0800)/(TCP|UDP) format try to convert it using Wireshark !\n",cnt);
+ return kPktNotSupp;
+ }
+ }else{
+ fprintf(stderr, "ERROR packet %d is not supported, should be Ethernet/IP(0x0800)/(TCP|UDP) format try to convert it using Wireshark !\n",cnt);
+ return kPktProcessFail;
+ }
+ }
+
+ /* set the last */
+ CFlowPktInfo * last_pkt =GetPacket((uint32_t)(Size()-1));
+ last_pkt->m_pkt_indication.m_desc.SetIsLastPkt(true);
+
+ int i;
+
+ for (i=1; i<Size(); i++) {
+ CFlowPktInfo * lp_prev= GetPacket((uint32_t)i-1);
+ CFlowPktInfo * lp= GetPacket((uint32_t)i);
+
+ lp_prev->m_pkt_indication.m_cap_ipg = lp->m_pkt_indication.m_cap_ipg-
+ lp_prev->m_pkt_indication.m_cap_ipg;
+ if ( lp->m_pkt_indication.m_desc.IsInitSide() !=
+ lp_prev->m_pkt_indication.m_desc.IsInitSide()) {
+ lp_prev->m_pkt_indication.m_desc.SetRtt(true);
+ }
+ }
+
+ GetPacket((uint32_t)Size()-1)->m_pkt_indication.m_cap_ipg=0.0;
+ m_total_errors += parser.m_counter.getTotalErrors();
+
+
+ /* dump the flow */
+ //Dump(stdout);
+
+ //flow.Dump(stdout);
+ flow.Delete();
+ //parser.Dump(stdout);
+ parser.Delete();
+ //fprintf(stdout," -- finish loading cap file \n");
+ //fprintf(stdout,"\n");
+ delete lp;
+ if ( m_total_errors > 0 ) {
+ parser.m_counter.Dump(stdout);
+ fprintf(stderr, " ERORR in one of the cap file, you should have one flow per cap file or valid plugin \n");
+ return kCapFileErr;
+ }
+ return kOK;
+}
+
+void CCapFileFlowInfo::update_pcap_mode(){
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ lp->m_pkt_indication.m_desc.SetPcapTiming(true);
+ }
+}
+
+void CCapFileFlowInfo::get_total_memory(CCCapFileMemoryUsage & memory){
+ memory.clear();
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ if ( lp->m_packet->pkt_len > FIRST_PKT_SIZE ) {
+ memory.add_size(lp->m_packet->pkt_len - FIRST_PKT_SIZE);
+ }
+ }
+}
+
+
+double CCapFileFlowInfo::get_cap_file_length_sec(){
+ dsec_t sum=0.0;
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ sum+=lp->m_pkt_indication.m_cap_ipg;
+ }
+ return (sum);
+}
+
+
+void CCapFileFlowInfo::update_min_ipg(dsec_t min_ipg,
+ dsec_t override_ipg){
+
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ if ( lp->m_pkt_indication.m_cap_ipg < min_ipg ){
+ lp->m_pkt_indication.m_cap_ipg=override_ipg;
+ }
+ if ( lp->m_pkt_indication.m_cap_ipg < override_ipg ){
+ lp->m_pkt_indication.m_cap_ipg=override_ipg;
+ }
+ }
+}
+
+
+void CCapFileFlowInfo::Dump(FILE *fd){
+
+
+ int i;
+ //CCapPacket::DumpHeader(fd);
+ for (i=0; i<(int)Size(); i++) {
+ fprintf(fd,"pkt_id : %d \n",i+1);
+ fprintf(fd,"-----------\n");
+ CFlowPktInfo * lp=GetPacket((uint32_t)i);
+ lp->Dump(fd);
+ }
+}
+
+// add pkt indication
+void CCapFileFlowInfo::Append(CPacketIndication * pkt_indication){
+
+ CFlowPktInfo * lp;
+ lp = new CFlowPktInfo();
+ lp->Create( pkt_indication );
+ m_flow_pkts.push_back(lp);
+}
+
+
+
+void CCCapFileMemoryUsage::Add(const CCCapFileMemoryUsage & obj){
+ int i;
+ for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
+ m_buf[i] += obj.m_buf[i];
+ }
+ m_total_bytes +=obj.m_total_bytes;
+
+}
+
+
+void CCCapFileMemoryUsage::dump(FILE *fd){
+ fprintf(fd, " Memory usage \n");
+ int i;
+ int c_size=CCCapFileMemoryUsage::SIZE_MIN;
+ int c_total=0;
+
+ for (i=0; i<CCCapFileMemoryUsage::MASK_SIZE; i++) {
+ fprintf(fd," size_%-7d : %lu \n",c_size, (ulong)m_buf[i]);
+ c_total +=m_buf[i]*c_size;
+ c_size = c_size*2;
+ }
+ fprintf(fd," Total : %s %.0f%% util due to buckets \n",double_to_human_str(c_total,"bytes",KBYE_1024).c_str(),100.0*float(c_total)/float(m_total_bytes) );
+}
+
+
+bool CCapFileFlowInfo::Create(){
+ m_total_bytes=0;
+ m_total_errors = 0;
+ m_total_flows = 0;
+ return (true);
+}
+
+
+void CCapFileFlowInfo::dump_pkt_sizes(void){
+ int i;
+ for (i=0; i<(int)Size(); i++) {
+ flow_pkt_info_t lp=GetPacket((uint32_t)i);
+ CGenNode node;
+ node.m_dest_ip = 0x10000110;
+ node.m_src_ip = 0x20000110;
+ node.m_src_port = 12;
+ rte_mbuf_t * buf=lp->generate_new_mbuf(&node);
+ //rte_pktmbuf_dump(buf, buf->pkt_len);
+ rte_pktmbuf_free(buf);
+ }
+}
+
+void CCapFileFlowInfo::RemoveAll(){
+ int i;
+ m_total_bytes=0;
+ m_total_errors = 0;
+ m_total_flows = 0;
+ for (i=0; i<(int)Size(); i++) {
+ flow_pkt_info_t lp=GetPacket((uint32_t)i);
+ lp->Delete();
+ delete lp;
+ }
+ // free all the pointers
+ m_flow_pkts.clear();
+}
+
+void CCapFileFlowInfo::Delete(){
+ RemoveAll();
+}
+
+void operator >> (const YAML::Node& node, CFlowYamlDpPkt & fi) {
+ uint32_t val;
+ node["pkt_id"] >> val;
+ fi.m_pkt_id =(uint8_t)val;
+ node["pyld_offset"] >> val;
+ fi.m_pyld_offset =(uint8_t)val;
+ node["type"] >> val;
+ fi.m_type =(uint8_t)val;
+ node["len"] >> val;
+ fi.m_len =(uint8_t)val;
+ node["mask"] >> val;
+ fi.m_pkt_mask =val;
+}
+
+void operator >> (const YAML::Node& node, CVlanYamlInfo & fi) {
+
+ uint32_t tmp;
+ if ( node.FindValue("enable") ){
+ node["enable"] >> tmp ;
+ fi.m_enable=tmp;
+ node["vlan0"] >> tmp;
+ fi.m_vlan_per_port[0] = tmp;
+ node["vlan1"] >> tmp;
+ fi.m_vlan_per_port[1] = tmp;
+ }
+}
+
+
+
+void operator >> (const YAML::Node& node, CFlowYamlInfo & fi) {
+ node["name"] >> fi.m_name;
+
+ if ( node.FindValue("client_pool") ){
+ node["client_pool"] >> fi.m_client_pool_name;
+ }else{
+ fi.m_client_pool_name = "default";
+ }
+ if ( node.FindValue("server_pool") ){
+ node["server_pool"] >> fi.m_server_pool_name;
+ }else{
+ fi.m_server_pool_name = "default";
+ }
+
+ node["cps"] >> fi.m_k_cps;
+ fi.m_k_cps = fi.m_k_cps/1000.0;
+ double t;
+ node["ipg"] >> t;
+ fi.m_ipg_sec =t/1000000.0;
+ node["rtt"] >> t;
+ fi.m_rtt_sec = t/1000000.0;
+ node["w"] >> fi.m_w;
+
+ if ( node.FindValue("cap_ipg") ){
+ node["cap_ipg"] >> fi.m_cap_mode;
+ fi.m_cap_mode_was_set =true;
+ }else{
+ fi.m_cap_mode_was_set =false;
+ }
+
+ if ( node.FindValue("wlength") ){
+ node["wlength"] >> fi.m_wlength;
+ fi.m_wlength_set=true;
+ }else{
+ fi.m_wlength_set=false;
+ fi.m_wlength =500;
+ }
+
+ if ( node.FindValue("limit") ){
+ node["limit"] >> fi.m_limit;
+ fi.m_limit_was_set = true;
+ }else{
+ fi.m_limit_was_set = false;
+ fi.m_limit = 0;
+ }
+
+ if ( node.FindValue("plugin_id") ){
+ uint32_t plugin_val;
+ node["plugin_id"] >> plugin_val;
+ fi.m_plugin_id=plugin_val;
+ }else{
+ fi.m_plugin_id=0;
+ }
+
+
+ fi.m_one_app_server_was_set = false;
+ fi.m_one_app_server = false;
+ if ( utl_yaml_read_ip_addr(node,
+ "server_addr",
+ fi.m_server_addr) ){
+ try {
+ node["one_app_server"] >> fi.m_one_app_server;
+ fi.m_one_app_server_was_set=true;
+ } catch ( const std::exception& e ) {
+ fi.m_one_app_server_was_set = false;
+ fi.m_one_app_server = false;
+ }
+ }
+
+
+
+ if ( ( fi.m_limit_was_set ) && (fi.m_plugin_id !=0) ){
+ fprintf(stderr," limit can't be non zero when plugin is set, you must have only one of the options set");
+ exit(-1);
+ }
+
+
+ if ( node.FindValue("dyn_pyload") ){
+ const YAML::Node& dyn_pyload = node["dyn_pyload"];
+ for(unsigned i=0;i<dyn_pyload.size();i++) {
+ CFlowYamlDpPkt fd;
+ dyn_pyload[i] >> fd;
+ if ( fi.m_dpPkt == 0 ){
+ fi.m_dpPkt = new CFlowYamlDynamicPyloadPlugin();
+ if (fi.m_plugin_id == 0) {
+ fi.m_plugin_id = mpDYN_PYLOAD;
+ }else{
+ fprintf(stderr," plugin should be zero with dynamic pyload program");
+ exit(-1);
+ }
+ }
+ fi.m_dpPkt->Add(fd);
+ }
+ }else{
+ fi.m_dpPkt=0;
+ }
+}
+
+
+
+void operator >> (const YAML::Node& node, CFlowsYamlInfo & flows_info) {
+
+ node["duration"] >> flows_info.m_duration_sec;
+
+ if ( node.FindValue("generator") ) {
+ node["generator"] >> flows_info.m_tuple_gen;
+ flows_info.m_tuple_gen_was_set =true;
+ }else{
+ flows_info.m_tuple_gen_was_set =false;
+ }
+
+ // m_ipv6_set will be true if and only if both src_ipv6
+ // and dst_ipv6 are provided. These are used to set
+ // the most significant 96-bits of the IPv6 address; the
+ // least significant 32-bits come from the ipv4 address
+ // (what is set above).
+ //
+ // If the IPv6 src/dst is not provided in the yaml file,
+ // then the most significant 96-bits will be set to 0
+ // which represents an IPv4-compatible IPv6 address.
+ //
+ // If desired, an IPv4-mapped IPv6 address can be
+ // formed by providing src_ipv6,dst_ipv6 and specifying
+ // {0,0,0,0,0,0xffff}
+ flows_info.m_ipv6_set=true;
+
+ if ( node.FindValue("src_ipv6") ) {
+ const YAML::Node& src_ipv6_info = node["src_ipv6"];
+ if (src_ipv6_info.size() == 6 ){
+ for(unsigned i=0;i<src_ipv6_info.size();i++) {
+ uint32_t fi;
+ const YAML::Node & node =src_ipv6_info;
+ node[i] >> fi;
+ flows_info.m_src_ipv6.push_back(fi);
+ }
+ }
+ }else{
+ flows_info.m_ipv6_set=false;
+ }
+
+
+ if ( node.FindValue("dst_ipv6") ) {
+ const YAML::Node& dst_ipv6_info = node["dst_ipv6"];
+ if (dst_ipv6_info.size() == 6 ){
+ for(unsigned i=0;i<dst_ipv6_info.size();i++) {
+ uint32_t fi;
+ const YAML::Node & node =dst_ipv6_info;
+ node[i] >> fi;
+ flows_info.m_dst_ipv6.push_back(fi);
+ }
+ }
+ }else{
+ flows_info.m_ipv6_set=false;
+ }
+
+ if ( node.FindValue("cap_ipg") ) {
+ node["cap_ipg"] >> flows_info.m_cap_mode;
+ flows_info.m_cap_mode_set=true;
+ }else{
+ flows_info.m_cap_mode=false;
+ flows_info.m_cap_mode_set=false;
+ }
+
+ double t=0.0;
+
+ if ( node.FindValue("cap_ipg_min") ) {
+ node["cap_ipg_min"] >> t ;
+ flows_info.m_cap_ipg_min = t/1000000.0;
+ flows_info.m_cap_ipg_min_set=true;
+ }else{
+ flows_info.m_cap_ipg_min_set=false;
+ flows_info.m_cap_ipg_min = 20;
+ }
+
+ if ( node.FindValue("cap_override_ipg") ) {
+ node["cap_override_ipg"] >> t;
+ flows_info.m_cap_overide_ipg = t/1000000.0;
+ flows_info.m_cap_overide_ipg_set = true;
+ }else{
+ flows_info.m_cap_overide_ipg_set = false;
+ flows_info.m_cap_overide_ipg = 0;
+ }
+
+ if (node.FindValue("wlength")) {
+ node["wlength"] >> flows_info.m_wlength;
+ flows_info.m_wlength_set=true;
+ }else{
+ flows_info.m_wlength_set=false;
+ flows_info.m_wlength =100;
+ }
+
+ if (node.FindValue("one_app_server")) {
+ printf("one_app_server should be configured per template. \n"
+ "Will ignore this configuration\n");
+ }
+ flows_info.m_one_app_server =false;
+ flows_info.m_one_app_server_was_set=false;
+
+ if (node.FindValue("vlan")) {
+ node["vlan"] >> flows_info.m_vlan_info;
+ }
+
+ if (node.FindValue("mac_override_by_ip")) {
+ node["mac_override_by_ip"] >> flows_info.m_mac_replace_by_ip;
+ }else{
+ flows_info.m_mac_replace_by_ip =false;
+ }
+
+ const YAML::Node& mac_info = node["mac"];
+ for(unsigned i=0;i<mac_info.size();i++) {
+ uint32_t fi;
+ const YAML::Node & node =mac_info;
+ node[i] >> fi;
+ flows_info.m_mac_base.push_back(fi);
+ }
+
+ const YAML::Node& cap_info = node["cap_info"];
+ for(unsigned i=0;i<cap_info.size();i++) {
+ CFlowYamlInfo fi;
+ cap_info[i] >> fi;
+ fi.m_client_pool_idx =
+ flows_info.m_tuple_gen.get_client_pool_id(fi.m_client_pool_name);
+ fi.m_server_pool_idx =
+ flows_info.m_tuple_gen.get_server_pool_id(fi.m_server_pool_name);
+ flows_info.m_vec.push_back(fi);
+ }
+}
+
+void CVlanYamlInfo::Dump(FILE *fd){
+ fprintf(fd," vlan enable : %d \n",m_enable);
+ fprintf(fd," vlan val : %d ,%d \n",m_vlan_per_port[0],m_vlan_per_port[1]);
+}
+
+
+void CFlowsYamlInfo::Dump(FILE *fd){
+ fprintf(fd," duration : %f sec \n",m_duration_sec);
+
+ fprintf(fd,"\n");
+ if (CGlobalInfo::is_ipv6_enable()) {
+ int idx;
+ fprintf(fd," src_ipv6 : ");
+ for (idx=0; idx<5; idx++){
+ fprintf(fd,"%04x:", CGlobalInfo::m_options.m_src_ipv6[idx]);
+ }
+ fprintf(fd,"%04x\n", CGlobalInfo::m_options.m_src_ipv6[5]);
+ fprintf(fd," dst_ipv6 : ");
+ for (idx=0; idx<5; idx++){
+ fprintf(fd,"%04x:", CGlobalInfo::m_options.m_dst_ipv6[idx]);
+ }
+ fprintf(fd,"%04x\n", CGlobalInfo::m_options.m_dst_ipv6[5]);
+ }
+ if ( !m_cap_mode_set ) {
+ fprintf(fd," cap_ipg : wasn't set \n");
+ }else{
+ fprintf(fd," cap_ipg : %d \n",m_cap_mode?1:0);
+ }
+
+ if ( !m_cap_ipg_min_set ){
+ fprintf(fd," cap_ipg_min : wasn't set \n");
+ }else{
+ fprintf(fd," cap_ipg_min : %f \n",m_cap_ipg_min);
+ }
+
+ if ( !m_cap_overide_ipg_set ){
+ fprintf(fd," cap_override_ipg : wasn't set \n");
+ }else{
+ fprintf(fd," cap_override_ipg : %f \n",m_cap_overide_ipg);
+ }
+
+ if ( !m_wlength_set ){
+ fprintf(fd," wlength : wasn't set \n");
+ }else{
+ fprintf(fd," m_wlength : %d \n",m_wlength);
+ }
+ fprintf(fd," one_server_for_application : %d \n",m_one_app_server?1:0);
+ fprintf(fd," one_server_for_application_was_set : %d \n",m_one_app_server_was_set?1:0);
+
+ m_vlan_info.Dump(fd);
+
+ fprintf(fd," mac base : ");
+ int i;
+ for (i=0; i<(int)m_mac_base.size(); i++) {
+ if (i< (int)(m_mac_base.size()-1) ) {
+ fprintf(fd,"0x%02x,",m_mac_base[i]);
+ }else{
+ fprintf(fd,"0x%02x",m_mac_base[i]);
+ }
+ }
+ fprintf(fd,"\n");
+
+ fprintf(fd," cap file info \n");
+ fprintf(fd," ------------- \n");
+ for (i=0; i<(int)m_vec.size(); i++) {
+ m_vec[i].Dump(fd);
+ }
+}
+
+
+/*
+
+example for YAML file
+
+- duration : 10.0
+ cap_info :
+ - name: hey1.pcap
+ cps : 12.0
+ ipg : 0.0001
+ - name: hey2.pcap
+ cps : 11.0
+ ipg : 0.0001
+
+
+*/
+
+bool CFlowsYamlInfo::verify_correctness(uint32_t num_threads) {
+ if ( m_tuple_gen_was_set ==false ){
+ printf(" ERROR there must be a generator field in YAML , the old format is deprecated \n");
+ printf(" This is not supported : \n");
+ printf(" min_src_ip : 0x10000001 \n");
+ printf(" max_src_ip : 0x50000001 \n");
+ printf(" min_dst_ip : 0x60000001 \n");
+ printf(" max_dst_ip : 0x60000010 \n");
+ printf(" This is supported : \n");
+ printf("generator : \n");
+ printf(" distribution : \"seq\" \n");
+ printf(" clients_start : \"16.0.0.1\" \n");
+ printf(" clients_end : \"16.0.1.255\" \n");
+ printf(" servers_start : \"48.0.0.1\" \n");
+ printf(" servers_end : \"48.0.0.255\" \n");
+ printf(" clients_per_gb : 201 \n");
+ printf(" min_clients : 101 \n");
+ printf(" dual_port_mask : \"1.0.0.0\" \n");
+ printf(" tcp_aging : 1 \n");
+ printf(" udp_aging : 1 \n");
+ return(false);
+ }
+ if ( !m_tuple_gen.is_valid(num_threads,is_any_plugin_configured()) ){
+ return (false);
+ }
+ /* patch defect trex-54 */
+ if ( is_any_plugin_configured() ){
+ /*Plugin is configured. in that case due to a limitation ( defect trex-54 )
+ the number of servers should be bigger than number of clients */
+
+ int i;
+ for (i=0; i<(int)m_vec.size(); i++) {
+ CFlowYamlInfo * lp=&m_vec[i];
+ if ( lp->m_plugin_id ){
+ uint8_t c_idx = lp->m_client_pool_idx;
+ uint8_t s_idx = lp->m_server_pool_idx;
+ uint32_t total_clients = m_tuple_gen.m_client_pool[c_idx].getTotalIps();
+ uint32_t total_servers = m_tuple_gen.m_server_pool[s_idx].getTotalIps();
+ if ( total_servers < total_clients ){
+ printf(" Plugin is configured. in that case due to a limitation ( defect trex-54 ) \n");
+ printf(" the number of servers should be bigger than number of clients \n");
+ printf(" client_pool_name : %s \n", lp->m_client_pool_name.c_str());
+ printf(" server_pool_name : %s \n", lp->m_server_pool_name.c_str());
+ return (false);
+ }
+ uint32_t mul = total_servers / total_clients;
+ uint32_t new_server_num = mul * total_clients;
+ if ( new_server_num != total_servers ) {
+ printf(" Plugin is configured. in that case due to a limitation ( defect trex-54 ) \n");
+ printf(" the number of servers should be exact multiplication of the number of clients \n");
+ printf(" client_pool_name : %s clients %d \n", lp->m_client_pool_name.c_str(),total_clients);
+ printf(" server_pool_name : %s servers %d should be %d \n", lp->m_server_pool_name.c_str(),total_servers,new_server_num);
+ return (false);
+ }
+ }
+ }
+ }
+
+ return(true);
+}
+
+
+
+int CFlowsYamlInfo::load_from_yaml_file(std::string file_name){
+ m_vec.clear();
+
+ if ( !utl_is_file_exists (file_name) ){
+ printf(" ERROR file %s does not exist \n",file_name.c_str());
+ exit(-1);
+ }
+
+ try {
+ std::ifstream fin((char *)file_name.c_str());
+ YAML::Parser parser(fin);
+ YAML::Node doc;
+
+ parser.GetNextDocument(doc);
+ for(unsigned i=0;i<doc.size();i++) {
+ doc[i] >> *this;
+ break;
+ }
+ } catch ( const std::exception& e ) {
+ std::cout << e.what() << "\n";
+ exit(-1);
+ }
+
+ /* update from user input */
+ if (CGlobalInfo::m_options.m_duration > 0.1) {
+ m_duration_sec = CGlobalInfo::m_options.m_duration;
+ }
+ int i;
+ m_is_plugin_configured=false;
+ for (i=0; i<(int)m_vec.size(); i++) {
+ m_vec[i].m_k_cps =m_vec[i].m_k_cps*CGlobalInfo::m_options.m_factor;
+ if (( ! m_vec[i].m_cap_mode_was_set ) && (m_cap_mode_set ) ){
+ m_vec[i].m_cap_mode = m_cap_mode;
+ }
+ if (( ! m_vec[i].m_wlength_set ) && (m_wlength_set ) ){
+ m_vec[i].m_wlength = m_wlength;
+ }
+
+ if (( ! m_vec[i].m_one_app_server_was_set ) && (m_one_app_server_was_set ) ){
+ m_vec[i].m_one_app_server = m_one_app_server;
+ }
+
+ if ( m_cap_overide_ipg_set ){
+ m_vec[i].m_ipg_sec = m_cap_overide_ipg;
+ m_vec[i].m_rtt_sec = m_cap_overide_ipg;
+ }
+
+ if ( m_vec[i].m_plugin_id ){
+ m_is_plugin_configured=true;
+ }
+ }
+ return 0;
+}
+
+
+
+void CFlowStats::Clear(){
+
+ m_id=0;
+ m_name="";
+ m_pkt=0.0;
+ m_bytes=0.0;
+ m_cps=0.0;
+ m_mb_sec=0.0;
+ m_mB_sec=0.0;
+ m_c_flows=0.0;
+ m_pps =0.0;
+ m_total_Mbytes=00 ;
+ m_errors =0;
+ m_flows =0 ;
+ m_memory.clear();
+}
+
+void CFlowStats::Add(const CFlowStats & obj){
+
+ m_pkt += obj.m_pkt ;
+ m_bytes += obj.m_bytes ;
+ m_cps += obj.m_cps ;
+ m_mb_sec += obj.m_mb_sec ;
+ m_mB_sec += obj.m_mB_sec ;
+ m_c_flows += obj.m_c_flows ;
+ m_pps += obj.m_pps ;
+ m_total_Mbytes +=obj.m_total_Mbytes ;
+ m_errors +=obj.m_errors;
+ m_flows +=obj.m_flows ;
+
+ m_memory.Add(obj.m_memory);
+}
+
+
+void CFlowStats::DumpHeader(FILE *fd){
+ fprintf(fd," %2s,%-40s,%4s,%4s,%5s,%7s,%9s,%9s,%9s,%10s,%5s,%7s,%4s,%4s \n",
+ "id","name","tps","cps","f-pkts","f-bytes","duration","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows");
+}
+void CFlowStats::Dump(FILE *fd){
+ //"name","cps","f-pkts","f-bytes","Mb/sec","MB/sec","c-flows","PPS","total-Mbytes-duration","errors","flows"
+ fprintf(fd," %02d, %-40s ,%4.2f,%4.2f, %5.0f , %7.0f ,%7.2f ,%7.2f , %7.2f , %10.0f , %5.0f , %7.0f , %llu , %llu \n",
+ m_id,
+ m_name.c_str(),
+ m_cps,
+ get_normal_cps(),
+ m_pkt,
+ m_bytes,
+ duration_sec,
+ m_mb_sec,
+ m_mB_sec,
+ m_c_flows,
+ m_pps,
+ m_total_Mbytes,
+ (unsigned long long)m_errors,
+ (unsigned long long)m_flows);
+}
+
+bool CFlowGeneratorRecPerThread::Create(CTupleGeneratorSmart * global_gen,
+ CFlowYamlInfo * info,
+ CFlowsYamlInfo * yaml_flow_info,
+ CCapFileFlowInfo * flow_info,
+ uint16_t _id,
+ uint32_t thread_id){
+
+ BP_ASSERT(info);
+ m_thread_id =thread_id ;
+
+ tuple_gen.Create(global_gen, info->m_client_pool_idx,
+ info->m_server_pool_idx);
+ CTupleGenYamlInfo * lpt;
+ lpt = &yaml_flow_info->m_tuple_gen;
+
+ tuple_gen.SetSingleServer(info->m_one_app_server,
+ info->m_server_addr,
+ getDualPortId(thread_id),
+ lpt->m_client_pool[info->m_client_pool_idx].getDualMask()
+ );
+
+ tuple_gen.SetW(info->m_w);
+
+
+
+ m_id =_id;
+ m_info =info;
+ m_flows_info = yaml_flow_info;
+ // set policer give bucket size for bursts
+ m_policer.set_cir(info->m_k_cps*1000.0);
+ m_policer.set_level(0.0);
+ m_policer.set_bucket_size(100.0);
+ /* pointer to global */
+ m_flow_info = flow_info;
+ return (true);
+}
+
+
+void CFlowGeneratorRecPerThread::Delete(){
+ tuple_gen.Delete();
+}
+
+
+
+
+void CFlowGeneratorRecPerThread::Dump(FILE *fd){
+ fprintf(fd," configuration info ");
+ fprintf(fd," -----------------");
+ m_info->Dump(fd);
+ fprintf(fd," -----------------");
+ m_flow_info->Dump(fd);
+}
+
+
+void CFlowGeneratorRecPerThread::getFlowStats(CFlowStats * stats){
+
+ double t_pkt=(double)m_flow_info->Size();
+ double t_bytes=(double)m_flow_info->get_total_bytes();
+ double cps=m_info->m_k_cps *1000.0;
+ double mb_sec = (cps*t_bytes*8.0)/(_1Mb_DOUBLE);
+ double mB_sec = (cps*t_bytes)/(_1Mb_DOUBLE);
+
+ double c_flow_windows_sec=0.0;
+
+ if (m_info->m_cap_mode) {
+ c_flow_windows_sec = m_flow_info->get_cap_file_length_sec();
+ }else{
+ c_flow_windows_sec = t_pkt * m_info->m_ipg_sec;
+ }
+
+
+ double c_flows = cps*c_flow_windows_sec*m_flow_info->get_total_flows();
+ double pps =cps*t_pkt;
+ double total_Mbytes = mB_sec * m_flows_info->m_duration_sec;
+ uint64_t errors = m_flow_info->get_total_errors();
+ uint64_t flows = m_flow_info->get_total_flows();
+
+
+ stats->m_id = m_id;
+ stats->m_pkt = t_pkt;
+ stats->m_bytes = t_bytes;
+ stats->duration_sec = c_flow_windows_sec;
+ stats->m_name = m_info->m_name.c_str();
+ stats->m_cps = cps;
+ stats->m_mb_sec = mb_sec;
+ stats->m_mB_sec = mB_sec;
+ stats->m_c_flows = c_flows;
+ stats->m_pps = pps;
+ stats->m_total_Mbytes = total_Mbytes;
+ stats->m_errors = errors;
+ stats->m_flows = flows;
+}
+
+
+
+void CFlowGeneratorRec::Dump(FILE *fd){
+ fprintf(fd," configuration info ");
+ fprintf(fd," -----------------");
+ m_info->Dump(fd);
+ fprintf(fd," -----------------");
+ m_flow_info.Dump(fd);
+}
+
+
+void CFlowGeneratorRec::getFlowStats(CFlowStats * stats){
+
+ double t_pkt=(double)m_flow_info.Size();
+ double t_bytes=(double)m_flow_info.get_total_bytes();
+ double cps=m_info->m_k_cps *1000.0;
+ double mb_sec = (cps*t_bytes*8.0)/(_1Mb_DOUBLE);
+ double mB_sec = (cps*t_bytes)/(_1Mb_DOUBLE);
+
+ double c_flow_windows_sec=0.0;
+
+ if (m_info->m_cap_mode) {
+ c_flow_windows_sec = m_flow_info.get_cap_file_length_sec();
+ }else{
+ c_flow_windows_sec = t_pkt * m_info->m_ipg_sec;
+ }
+
+ m_flow_info.get_total_memory(stats->m_memory);
+
+
+ double c_flows = cps*c_flow_windows_sec;
+ double pps =cps*t_pkt;
+ double total_Mbytes = mB_sec * m_flows_info->m_duration_sec;
+ uint64_t errors = m_flow_info.get_total_errors();
+ uint64_t flows = m_flow_info.get_total_flows();
+
+
+ stats->m_id = m_id;
+ stats->m_pkt = t_pkt;
+ stats->m_bytes = t_bytes;
+ stats->duration_sec = c_flow_windows_sec;
+ stats->m_name = m_info->m_name.c_str();
+ stats->m_cps = cps;
+ stats->m_mb_sec = mb_sec;
+ stats->m_mB_sec = mB_sec;
+ stats->m_c_flows = c_flows;
+ stats->m_pps = pps;
+ stats->m_total_Mbytes = total_Mbytes;
+ stats->m_errors = errors;
+ stats->m_flows = flows;
+}
+
+
+void CFlowGeneratorRec::fixup_ipg_if_needed(void){
+ if ( m_flows_info->m_cap_mode ) {
+ m_flow_info.update_pcap_mode();
+ }
+
+ if ( (m_flows_info->m_cap_mode) &&
+ (m_flows_info->m_cap_ipg_min_set) &&
+ (m_flows_info->m_cap_overide_ipg_set)
+ ){
+ m_flow_info.update_min_ipg(m_flows_info->m_cap_ipg_min,
+ m_flows_info->m_cap_overide_ipg);
+ }
+}
+
+
+bool CFlowGeneratorRec::Create(CFlowYamlInfo * info,
+ CFlowsYamlInfo * flows_info,
+ uint16_t _id){
+ BP_ASSERT(info);
+ m_id=_id;
+ m_info=info;
+ m_flows_info=flows_info;
+ m_flow_info.Create();
+
+ // set policer give bucket size for bursts
+ m_policer.set_cir(info->m_k_cps*1000.0);
+ m_policer.set_level(0.0);
+ m_policer.set_bucket_size(100.0);
+
+ int res=m_flow_info.load_cap_file(info->m_name.c_str(),_id,m_info->m_plugin_id);
+ if ( res==0 ) {
+ fixup_ipg_if_needed();
+
+ if (m_flow_info.is_valid_template_load_time() != 0) {
+ return (false);
+ }
+ m_flow_info.update_info();
+ return (true);
+ }else{
+ return (false);
+ }
+}
+
+void CFlowGeneratorRec::Delete(){
+ m_flow_info.Delete();
+}
+
+
+void CGenNode::DumpHeader(FILE *fd){
+ fprintf(fd," pkt_id,time,fid,pkt_info,pkt,len,type,is_init,is_last,type,thread_id,src_ip,dest_ip,src_port \n");
+}
+
+
+void CGenNode::free_gen_node(){
+ rte_mbuf_t * m=get_cache_mbuf();
+ if ( unlikely(m != NULL) ) {
+ rte_pktmbuf_free(m);
+ m_plugin_info=0;
+ }
+}
+
+
+void CGenNode::Dump(FILE *fd){
+ fprintf(fd,"%.6f,%llx,%p,%llu,%d,%d,%d,%d,%d,%d,%x,%x,%d\n",
+ m_time,
+ (unsigned long long)m_flow_id,
+ m_pkt_info,
+ (unsigned long long)m_pkt_info->m_pkt_indication.m_packet->pkt_cnt,
+ m_pkt_info->m_pkt_indication.m_packet->pkt_len,
+ m_pkt_info->m_pkt_indication.m_desc.getId(),
+ (m_pkt_info->m_pkt_indication.m_desc.IsInitSide()?1:0),
+ m_pkt_info->m_pkt_indication.m_desc.IsLastPkt(),
+ m_type,
+ m_thread_id,
+ m_src_ip,
+ m_dest_ip,
+ m_src_port);
+
+}
+
+void CNodeGenerator::set_vif(CVirtualIF * v_if){
+ m_v_if = v_if;
+}
+
+bool CNodeGenerator::Create(CFlowGenListPerThread * parent){
+ m_v_if =0;
+ m_parent=parent;
+ m_socket_id =0;
+ m_realtime_his.Create();
+ m_last_sync_time_sec = 0;
+
+ return(true);
+}
+
+void CNodeGenerator::Delete(){
+ m_realtime_his.Delete();
+}
+
+
+void CNodeGenerator::add_node(CGenNode * mynode){
+ m_p_queue.push(mynode);
+}
+
+
+
+void CNodeGenerator::remove_all(CFlowGenListPerThread * thread){
+ CGenNode *node;
+ while (!m_p_queue.empty()) {
+ node = m_p_queue.top();
+ m_p_queue.pop();
+ /* sanity check */
+ if (node->m_type == CGenNode::STATELESS_PKT) {
+ CGenNodeStateless * p=(CGenNodeStateless *)node;
+ /* need to be changed in Pause support */
+ assert(p->is_mask_for_free());
+ }
+
+ thread->free_node( node);
+ }
+}
+
+int CNodeGenerator::open_file(std::string file_name,
+ CPreviewMode * preview_mode){
+ BP_ASSERT(m_v_if);
+ m_preview_mode =*preview_mode;
+ /* ser preview mode */
+ m_v_if->set_review_mode(preview_mode);
+ m_v_if->open_file(file_name);
+ m_cnt = 0;
+ m_non_active = 0;
+ m_limit = 0;
+ return (0);
+}
+
+
+int CNodeGenerator::close_file(CFlowGenListPerThread * thread){
+ remove_all(thread);
+ BP_ASSERT(m_v_if);
+ m_v_if->close_file();
+ return (0);
+}
+
+int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){
+ m_cnt++;
+ if (!node_sl->is_node_active()) {
+ m_non_active++;
+ }
+ #ifdef _DEBUG
+ if ( m_preview_mode.getVMode() >2 ){
+ fprintf(stdout," %4lu ,", (ulong)m_cnt);
+ fprintf(stdout," %4lu ,", (ulong)m_non_active);
+ node_sl->Dump(stdout);
+ }
+ #endif
+
+ return (0);
+}
+
+
+int CNodeGenerator::update_stats(CGenNode * node){
+ if ( m_preview_mode.getVMode() >2 ){
+ fprintf(stdout," %llu ,", (unsigned long long)m_cnt);
+ node->Dump(stdout);
+ m_cnt++;
+ }
+ return (0);
+}
+
+bool CNodeGenerator::has_limit_reached() {
+ /* do we have a limit and has it passed ? */
+ return ( (m_limit > 0) && (m_cnt >= m_limit) );
+}
+
+bool CFlowGenListPerThread::Create(uint32_t thread_id,
+ uint32_t core_id,
+ CFlowGenList * flow_list,
+ uint32_t max_threads){
+
+
+ m_non_active_nodes = 0;
+ m_terminated_by_master=false;
+ m_flow_list =flow_list;
+ m_core_id= core_id;
+ m_tcp_dpc= 0;
+ m_udp_dpc=0;
+ m_max_threads=max_threads;
+ m_thread_id=thread_id;
+
+ m_cpu_cp_u.Create(&m_cpu_dp_u);
+
+ uint32_t socket_id=rte_lcore_to_socket_id(m_core_id);
+
+ char name[100];
+ sprintf(name,"nodes-%d",m_core_id);
+
+ //printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id);
+
+ m_node_pool = utl_rte_mempool_create_non_pkt(name,
+ CGlobalInfo::m_memory_cfg.get_each_core_dp_flows(),
+ sizeof(CGenNode),
+ 128,
+ 0 ,
+ socket_id);
+
+ //printf(" pool %p \n",m_node_pool);
+
+ m_node_gen.Create(this);
+ m_flow_id_to_node_lookup.Create();
+
+ /* split the clients to threads */
+ CTupleGenYamlInfo * tuple_gen = &m_flow_list->m_yaml_info.m_tuple_gen;
+
+ m_smart_gen.Create(0,m_thread_id);
+
+ /* split the clients to threads using the mask */
+ CIpPortion portion;
+ for (int i=0;i<tuple_gen->m_client_pool.size();i++) {
+ split_ips(m_thread_id, m_max_threads, getDualPortId(),
+ tuple_gen->m_client_pool[i],
+ portion);
+
+ m_smart_gen.add_client_pool(tuple_gen->m_client_pool[i].m_dist,
+ portion.m_ip_start,
+ portion.m_ip_end,
+ get_longest_flow(i,true),
+ get_total_kcps(i,true)*1000,
+ m_flow_list->m_client_config_info,
+ tuple_gen->m_client_pool[i].m_tcp_aging_sec,
+ tuple_gen->m_client_pool[i].m_udp_aging_sec
+ );
+ }
+ for (int i=0;i<tuple_gen->m_server_pool.size();i++) {
+ split_ips(m_thread_id, m_max_threads, getDualPortId(),
+ tuple_gen->m_server_pool[i],
+ portion);
+ m_smart_gen.add_server_pool(tuple_gen->m_server_pool[i].m_dist,
+ portion.m_ip_start,
+ portion.m_ip_end,
+ get_longest_flow(i,false),
+ get_total_kcps(i,false)*1000,
+ tuple_gen->m_server_pool[i].m_is_bundling);
+ }
+
+
+ init_from_global(portion);
+
+ CMessagingManager * rx_dp=CMsgIns::Ins()->getRxDp();
+
+ m_ring_from_rx = rx_dp->getRingCpToDp(thread_id);
+ m_ring_to_rx =rx_dp->getRingDpToCp(thread_id);
+
+ assert(m_ring_from_rx);
+ assert(m_ring_to_rx);
+
+ /* create the info required for stateless DP core */
+ m_stateless_dp_info.create(thread_id, this);
+
+ return (true);
+}
+
+/* return the client ip , port */
+FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job(CGenNode *p){
+ CGenNodeDeferPort * defer=(CGenNodeDeferPort *)p;
+ int i;
+ for (i=0; i<defer->m_cnt; i++) {
+ m_smart_gen.FreePort(defer->m_pool_idx[i],
+ defer->m_clients[i],defer->m_ports[i]);
+ }
+}
+
+FORCE_NO_INLINE void CFlowGenListPerThread::handler_defer_job_flush(void){
+ /* flush the pending job of free ports */
+ if (m_tcp_dpc) {
+ handler_defer_job((CGenNode *)m_tcp_dpc);
+ free_node((CGenNode *)m_tcp_dpc);
+ m_tcp_dpc=0;
+ }
+ if (m_udp_dpc) {
+ handler_defer_job((CGenNode *)m_udp_dpc);
+ free_node((CGenNode *)m_udp_dpc);
+ m_udp_dpc=0;
+ }
+}
+
+
+void CFlowGenListPerThread::defer_client_port_free(bool is_tcp,
+ uint32_t c_idx,
+ uint16_t port,
+ uint8_t c_pool_idx,
+ CTupleGeneratorSmart * gen){
+ /* free is not required in this case */
+ if (!gen->IsFreePortRequired(c_pool_idx) ){
+ return;
+ }
+ CGenNodeDeferPort * defer;
+ if (is_tcp) {
+ if (gen->get_tcp_aging(c_pool_idx)==0) {
+ gen->FreePort(c_pool_idx,c_idx,port);
+ return;
+ }
+ defer=get_tcp_defer();
+ }else{
+ if (gen->get_udp_aging(c_pool_idx)==0) {
+ gen->FreePort(c_pool_idx, c_idx,port);
+ return;
+ }
+ defer=get_udp_defer();
+ }
+ if ( defer->add_client(c_pool_idx, c_idx,port) ){
+ if (is_tcp) {
+ m_node_gen.schedule_node((CGenNode *)defer,gen->get_tcp_aging(c_pool_idx));
+ m_tcp_dpc=0;
+ }else{
+ m_node_gen.schedule_node((CGenNode *)defer,gen->get_udp_aging(c_pool_idx));
+ m_udp_dpc=0;
+ }
+ }
+}
+
+
+void CFlowGenListPerThread::defer_client_port_free(CGenNode *p){
+ defer_client_port_free(p->m_pkt_info->m_pkt_indication.m_desc.IsTcp(),
+ p->m_src_idx,p->m_src_port,p->m_template_info->m_client_pool_idx,
+ p->m_tuple_gen);
+}
+
+
+
+/* copy all info from global and div by num of threads */
+void CFlowGenListPerThread::init_from_global(CIpPortion& portion){
+ /* copy generator , it is the same */
+ m_yaml_info =m_flow_list->m_yaml_info;
+
+ /* copy first the flow info */
+ int i;
+ for (i=0; i<(int)m_flow_list->m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_flow_list->m_cap_gen[i];
+ CFlowGeneratorRecPerThread * lp_thread=new CFlowGeneratorRecPerThread();
+ /* TBD leak of memory */
+ CFlowYamlInfo * yaml_info =new CFlowYamlInfo();
+
+ yaml_info->m_name = lp->m_info->m_name;
+ yaml_info->m_k_cps = lp->m_info->m_k_cps/(double)m_max_threads;
+ yaml_info->m_ipg_sec = lp->m_info->m_ipg_sec;
+ yaml_info->m_rtt_sec = lp->m_info->m_rtt_sec;
+ yaml_info->m_w = lp->m_info->m_w;
+ yaml_info->m_cap_mode =lp->m_info->m_cap_mode;
+ yaml_info->m_wlength =lp->m_info->m_wlength;
+ yaml_info->m_plugin_id = lp->m_info->m_plugin_id;
+ yaml_info->m_one_app_server = lp->m_info->m_one_app_server;
+ yaml_info->m_server_addr = lp->m_info->m_server_addr;
+ yaml_info->m_dpPkt =lp->m_info->m_dpPkt;
+ yaml_info->m_server_pool_idx=lp->m_info->m_server_pool_idx;
+ yaml_info->m_client_pool_idx=lp->m_info->m_client_pool_idx;
+ yaml_info->m_server_pool_name=lp->m_info->m_server_pool_name;
+ yaml_info->m_client_pool_name=lp->m_info->m_client_pool_name;
+ /* fix this */
+ assert(m_max_threads>0);
+ if ( m_max_threads == 1 ) {
+ /* we have one thread the limit */
+ yaml_info->m_limit = lp->m_info->m_limit;
+ }else{
+ yaml_info->m_limit = lp->m_info->m_limit/m_max_threads;
+ /* thread is zero base */
+ if ( m_thread_id == 0){
+ yaml_info->m_limit += lp->m_info->m_limit % m_max_threads;
+ }
+ if (yaml_info->m_limit==0) {
+ yaml_info->m_limit=1;
+ }
+ }
+
+ yaml_info->m_limit_was_set = lp->m_info->m_limit_was_set;
+ yaml_info->m_flowcnt = 0;
+ yaml_info->m_restart_time = ( yaml_info->m_limit_was_set ) ?
+ (yaml_info->m_limit / (yaml_info->m_k_cps * 1000.0)) : 0;
+
+ lp_thread->Create(&m_smart_gen,
+ yaml_info,
+ lp->m_flows_info,
+ &lp->m_flow_info,
+ lp->m_id,
+ m_thread_id);
+
+ m_cap_gen.push_back(lp_thread);
+ }
+}
+
+static void free_map_flow_id_to_node(CGenNode *p){
+ CGlobalInfo::free_node(p);
+}
+
+
+void CFlowGenListPerThread::Delete(){
+
+ // free all current maps
+ m_flow_id_to_node_lookup.remove_all(free_map_flow_id_to_node);
+ // free object
+ m_flow_id_to_node_lookup.Delete();
+
+ m_smart_gen.Delete();
+ m_node_gen.Delete();
+ Clean();
+ m_cpu_cp_u.Delete();
+
+ utl_rte_mempool_delete(m_node_pool);
+}
+
+
+
+void CFlowGenListPerThread::Clean(){
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ if (lp->m_tuple_gen_was_set) {
+ CTupleGeneratorSmart *gen;
+ gen = lp->tuple_gen.get_gen();
+ gen->Delete();
+ delete gen;
+ }
+ lp->Delete();
+ delete lp;
+ }
+ m_cap_gen.clear();
+}
+
+//uint64_t _start_time;
+
+void CNodeGenerator::dump_json(std::string & json){
+
+ json="{\"name\":\"tx-gen\",\"type\":0,\"data\":{";
+ m_realtime_his.dump_json("realtime-hist",json);
+ json+="\"unknown\":0}}" ;
+}
+
+void CNodeGenerator::add_exit_node(CFlowGenListPerThread * thread,
+ dsec_t max_time){
+
+ if ( max_time > 0 ) {
+ CGenNode *exit_node = thread->create_node();
+ exit_node->m_type = CGenNode::EXIT_SCHED;
+ exit_node->m_time = max_time;
+ add_node(exit_node);
+ }
+}
+
+inline bool CNodeGenerator::handle_stl_node(CGenNode * node,
+ CFlowGenListPerThread * thread){
+ uint8_t type=node->m_type;
+
+ if ( likely( type == CGenNode::STATELESS_PKT ) ) {
+ m_p_queue.pop();
+ CGenNodeStateless *node_sl = (CGenNodeStateless *)node;
+ /* if the stream has been deactivated - end */
+ if ( unlikely( node_sl->is_mask_for_free() ) ) {
+ thread->free_node(node);
+ } else {
+ /* count before handle - node might be destroyed */
+ #ifdef TREX_SIM
+ update_stl_stats(node_sl);
+ #endif
+
+ node_sl->handle(thread);
+
+ #ifdef TREX_SIM
+ if (has_limit_reached()) {
+ thread->m_stateless_dp_info.stop_traffic(node_sl->get_port_id(), false, 0);
+ }
+ #endif
+ }
+ return (true);
+ }
+ return(false);
+}
+
+
+inline bool CNodeGenerator::do_work_stl(CGenNode * node,
+ CFlowGenListPerThread * thread,
+ bool always){
+
+ if ( handle_stl_node(node,thread)){
+ return (false);
+ }else{
+ return (handle_slow_messages(node->m_type,node,thread,always));
+ }
+}
+
+inline bool CNodeGenerator::do_work_both(CGenNode * node,
+ CFlowGenListPerThread * thread,
+ dsec_t d_time,
+ bool always
+ ){
+
+ bool exit_scheduler=false;
+ uint8_t type=node->m_type;
+ bool done;
+
+ if ( handle_stl_node (node,thread) ){
+ }else{
+ if ( likely( type == CGenNode::FLOW_PKT ) ) {
+ /* PKT */
+ if ( !(node->is_repeat_flow()) || (always==false)) {
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
+ }
+ m_p_queue.pop();
+ if ( node->is_last_in_flow() ) {
+ if ((node->is_repeat_flow()) && (always==false)) {
+ /* Flow is repeated, reschedule it */
+ thread->reschedule_flow( node);
+ }else{
+ /* Flow will not be repeated, so free node */
+ thread->free_last_flow_node( node);
+ }
+ }else{
+ node->update_next_pkt_in_flow();
+ m_p_queue.push(node);
+ }
+ }else{
+ if ((type == CGenNode::FLOW_FIF)) {
+ /* callback to our method */
+ m_p_queue.pop();
+ if ( always == false) {
+ thread->m_cur_time_sec = node->m_time ;
+
+ thread->generate_flows_roundrobin(&done);
+
+ if (!done) {
+ node->m_time +=d_time;
+ m_p_queue.push(node);
+ }else{
+ thread->free_node(node);
+ }
+ }else{
+ thread->free_node(node);
+ }
+
+ }else{
+ exit_scheduler = handle_slow_messages(type,node,thread,always);
+ }
+ }
+ }
+
+ return (exit_scheduler);
+}
+
+
+
+template<int SCH_MODE>
+inline bool CNodeGenerator::do_work(CGenNode * node,
+ CFlowGenListPerThread * thread,
+ dsec_t d_time,
+ bool always
+ ){
+ /* template filter in compile time */
+ if ( SCH_MODE == smSTATELESS ) {
+ return ( do_work_stl(node,thread,always) );
+ }else{
+ /* smSTATEFUL */
+ return ( do_work_both(node,thread,d_time,always) );
+ }
+}
+
+
+inline void CNodeGenerator::do_sleep(dsec_t & cur_time,
+ CFlowGenListPerThread * thread,
+ dsec_t n_time){
+ thread->m_cpu_dp_u.commit1();
+ dsec_t dt;
+
+ /* TBD make this better using calculation, minimum now_sec() */
+ while ( true ) {
+ cur_time = now_sec();
+ dt = cur_time - n_time ;
+
+ if (dt> WAIT_WINDOW_SIZE ) {
+ break;
+ }
+
+ rte_pause();
+ }
+
+ thread->m_cpu_dp_u.start_work1();
+}
+
+
+inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread,
+ bool always,
+ double &old_offset,
+ double offset){
+
+ thread->m_cpu_dp_u.commit1();
+
+ /* to do */
+ if ( thread->is_terminated_by_master() ) {
+ return (0);
+ }
+
+ if (!always) {
+ old_offset =offset;
+ }else{
+ // free the left other
+ thread->handler_defer_job_flush();
+ }
+ return (0);
+}
+
+
+
+template<int SCH_MODE>
+inline int CNodeGenerator::flush_file_realtime(dsec_t max_time,
+ dsec_t d_time,
+ bool always,
+ CFlowGenListPerThread * thread,
+ double &old_offset) {
+ CGenNode * node;
+ dsec_t offset=0.0;
+ dsec_t cur_time;
+ dsec_t n_time;
+ if (always) {
+ offset=old_offset;
+ }else{
+ add_exit_node(thread,max_time);
+ }
+
+ thread->m_cpu_dp_u.start_work1();
+
+ sch_state_t state = scINIT;
+ node = m_p_queue.top();
+ n_time = node->m_time + offset;
+ cur_time = now_sec();
+
+ while (state!=scTERMINATE) {
+
+ switch (state) {
+ case scINIT:
+ cur_time = now_sec();
+ {
+ dsec_t dt = cur_time - n_time ;
+
+ if (dt > BURST_OFFSET_DTIME) {
+ state = scSTRECH;
+ } else if (dt > 0) {
+ state = scWORK;
+ } else {
+ state = scWAIT;
+ }
+
+ }
+ break;
+
+ case scWORK:
+ {
+ int node_count = 0;
+ do {
+
+ bool s=do_work<SCH_MODE>(node,thread,d_time,always);
+ if (s) { // can we remove this IF ?
+ state=scTERMINATE;
+ break;
+ }
+ node = m_p_queue.top();
+ n_time = node->m_time + offset;
+ node_count++;
+
+ /* we either out of the time frame or every 1024 nodes we get out for time checking */
+ if ( ( (n_time - cur_time) > EAT_WINDOW_DTIME ) || (node_count > 1024) ) {
+ state = scINIT;
+ break;
+ }
+
+ } while (true);
+ break;
+ }
+
+ case scWAIT:
+ do_sleep(cur_time,thread,n_time); // estimate loop
+ state=scWORK;
+ break;
+
+
+ default:
+ handle_slow_operations(state, node, cur_time, n_time, offset, thread);
+ break;
+ } /* switch */
+
+ }/* while*/
+
+ return (teardown(thread,always,old_offset,offset));
+}
+
+
+FORCE_NO_INLINE void CNodeGenerator::handle_slow_operations(sch_state_t &state,
+ CGenNode * &node,
+ dsec_t &cur_time,
+ dsec_t &n_time,
+ dsec_t &offset,
+ CFlowGenListPerThread *thread) {
+ switch (state) {
+ case scSTRECH:
+ {
+ handle_time_strech(node, cur_time, n_time, offset, thread);
+
+ /* go back to work */
+ state = scWORK;
+
+ }
+ break;
+
+ default:
+ assert(0);
+ }
+
+}
+
+/**
+ * when time is streched - the flow_sync node
+ * might be postpond too much
+ * this can result a watchdog crash and lack
+ * of responsivness from the DP core
+ * (no handling of messages)
+ *
+ * @author imarom (7/31/2016)
+ *
+ */
+void CNodeGenerator::handle_time_strech(CGenNode * &node,
+ dsec_t &cur_time,
+ dsec_t &n_time,
+ dsec_t &offset,
+ CFlowGenListPerThread *thread) {
+
+
+ /* fix the time offset */
+ dsec_t dt = cur_time - n_time;
+ offset += dt;
+
+ /* check if flow sync message was delayed too much */
+ if ( (cur_time - m_last_sync_time_sec) > SYNC_TIME_OUT ) {
+ handle_maintenance(thread);
+
+ /* re-read the top of the queue - it might have changed with messaging */
+ node = m_p_queue.top();
+ n_time = node->m_time + offset;
+ }
+
+}
+
+int CNodeGenerator::flush_file_sim(dsec_t max_time,
+ dsec_t d_time,
+ bool always,
+ CFlowGenListPerThread * thread,
+ double &old_offset){
+ CGenNode * node;
+
+ if (!always) {
+ add_exit_node(thread,max_time);
+ }
+
+ while (true) {
+ node = m_p_queue.top();
+
+ bool do_exit;
+ if ( get_is_stateless() ) {
+ do_exit=do_work<smSTATELESS>(node,thread,d_time,always);
+ }else{
+ do_exit=do_work<smSTATEFUL>(node,thread,d_time,always);
+ }
+ if ( do_exit ){
+ break;
+ }
+ }
+ return (teardown(thread,always,old_offset,0));
+}
+
+int CNodeGenerator::flush_file(dsec_t max_time,
+ dsec_t d_time,
+ bool always,
+ CFlowGenListPerThread * thread,
+ double &old_offset){
+ #ifdef TREX_SIM
+ return ( flush_file_sim(max_time, d_time,always,thread,old_offset) );
+ #else
+ if ( get_is_stateless() ) {
+ return ( flush_file_realtime<smSTATELESS>(max_time, d_time,always,thread,old_offset) );
+ }else{
+ return ( flush_file_realtime<smSTATEFUL>(max_time, d_time,always,thread,old_offset) );
+ }
+
+ #endif
+}
+
+
+
+void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
+
+ /*repeat and NAT is not supported together */
+ if ( node->is_nat_first_state() ) {
+ node->set_nat_wait_state();
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
+ } else {
+ if ( node->is_nat_wait_state() ) {
+ if (node->is_responder_pkt()) {
+ m_p_queue.pop();
+ /* time out, need to free the flow and remove the association , we didn't get conversion yet*/
+ thread->terminate_nat_flows(node);
+ return;
+
+ } else {
+ flush_one_node_to_file(node);
+ #ifdef _DEBUG
+ update_stats(node);
+ #endif
+ }
+ } else {
+ if ( node->is_nat_wait_ack_state() ) {
+ if (node->is_initiator_pkt()) {
+ m_p_queue.pop();
+ /* time out, need to free the flow and remove the association , we didn't get conversion yet*/
+ thread->terminate_nat_flows(node);
+ return;
+
+ } else {
+ flush_one_node_to_file(node);
+#ifdef _DEBUG
+ update_stats(node);
+#endif
+ }
+ } else {
+ assert(0);
+ }
+ }
+ }
+ m_p_queue.pop();
+ if ( node->is_last_in_flow() ) {
+ thread->free_last_flow_node( node);
+ } else {
+ node->update_next_pkt_in_flow();
+ m_p_queue.push(node);
+ }
+}
+
+void CNodeGenerator::handle_flow_sync(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
+
+
+ /* flow sync message is a sync point for time */
+ thread->m_cur_time_sec = node->m_time;
+
+ /* first pop the node */
+ m_p_queue.pop();
+
+ /* call all the maintenance required */
+ handle_maintenance(thread);
+
+ /* exit in case this is the last node*/
+ if ( m_p_queue.size() == m_parent->m_non_active_nodes ) {
+ thread->free_node(node);
+ exit_scheduler = true;
+ } else {
+ /* schedule for next maintenace */
+ node->m_time += SYNC_TIME_OUT;
+ m_p_queue.push(node);
+ }
+
+}
+
+void
+CNodeGenerator::handle_maintenance(CFlowGenListPerThread *thread) {
+
+ thread->tickle(); /* tickle the watchdog */
+ thread->check_msgs(); /* check messages */
+ m_v_if->flush_tx_queue(); /* flush pkt each timeout */
+
+ /* save last sync time as realtime */
+ m_last_sync_time_sec = now_sec();
+}
+
+
+void CNodeGenerator::handle_command(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) {
+ m_p_queue.pop();
+ CGenNodeCommand *node_cmd = (CGenNodeCommand *)node;
+ TrexStatelessCpToDpMsgBase * cmd=node_cmd->m_cmd;
+ cmd->handle(&thread->m_stateless_dp_info);
+ exit_scheduler = cmd->is_quit();
+ thread->free_node((CGenNode *)node_cmd);/* free the node */
+}
+
+void CNodeGenerator::handle_pcap_pkt(CGenNode *node, CFlowGenListPerThread *thread) {
+ m_p_queue.pop();
+
+ CGenNodePCAP *node_pcap = (CGenNodePCAP *)node;
+
+ /* might have been marked for free */
+ if ( unlikely( node_pcap->is_marked_for_free() ) ) {
+ thread->free_node(node);
+ } else {
+ node_pcap->handle(thread);
+ }
+}
+
+bool
+CNodeGenerator::handle_slow_messages(uint8_t type,
+ CGenNode * node,
+ CFlowGenListPerThread * thread,
+ bool always){
+
+ /* should we continue after */
+ bool exit_scheduler = false;
+
+ switch (type) {
+ case CGenNode::PCAP_PKT:
+ handle_pcap_pkt(node, thread);
+ break;
+
+ case CGenNode::FLOW_DEFER_PORT_RELEASE:
+ m_p_queue.pop();
+ thread->handler_defer_job(node);
+ thread->free_node(node);
+ break;
+
+ case CGenNode::FLOW_PKT_NAT:
+ handle_flow_pkt(node, thread);
+ break;
+
+ case CGenNode::FLOW_SYNC:
+ handle_flow_sync(node, thread, exit_scheduler);
+ break;
+
+ case CGenNode::EXIT_SCHED:
+ m_p_queue.pop();
+ thread->free_node(node);
+ exit_scheduler = true;
+ break;
+
+
+ case CGenNode::COMMAND:
+ handle_command(node, thread, exit_scheduler);
+ break;
+
+ default:
+ assert(0);
+ }
+
+ return (exit_scheduler);
+
+}
+
+void CFlowGenListPerThread::Dump(FILE *fd){
+ fprintf(fd,"yaml info ");
+ m_yaml_info.Dump(fd);
+
+ fprintf(fd,"\n");
+ fprintf(fd,"cap file info");
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ lp->Dump(stdout);
+ }
+}
+
+
+
+
+void CFlowGenListPerThread::DumpStats(FILE *fd){
+ m_stats.dump(fd);
+}
+
+
+void CFlowGenListPerThread::DumpCsv(FILE *fd){
+ CFlowStats::DumpHeader(fd);
+
+ CFlowStats stats;
+ CFlowStats sum;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ stats.Dump(fd);
+ sum.Add(stats);
+ }
+ fprintf(fd,"\n");
+ sum.m_name= "sum";
+ sum.Dump(fd);
+}
+
+
+uint32_t CFlowGenListPerThread::getDualPortId(){
+ return ( ::getDualPortId(m_thread_id) );
+}
+
+double CFlowGenListPerThread::get_longest_flow(uint8_t pool_idx, bool is_client){
+ int i;
+ double longest_flow = 0.0;
+ for (i=0;i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ if (is_client &&
+ lp->m_info->m_client_pool_idx != pool_idx)
+ continue;
+ if (!is_client &&
+ lp->m_info->m_server_pool_idx != pool_idx)
+ continue;
+ double tmp_len;
+ tmp_len = lp->m_flow_info->get_cap_file_length_sec();
+ if (longest_flow < tmp_len ) {
+ longest_flow = tmp_len;
+ }
+ }
+ return longest_flow;
+}
+
+
+double CFlowGenListPerThread::get_longest_flow(){
+ int i;
+ double longest_flow = 0.0;
+ for (i=0;i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ double tmp_len;
+ tmp_len = lp->m_flow_info->get_cap_file_length_sec();
+ if (longest_flow < tmp_len ) {
+ longest_flow = tmp_len;
+ }
+ }
+ return longest_flow;
+}
+
+double CFlowGenListPerThread::get_total_kcps(uint8_t pool_idx, bool is_client){
+ int i;
+ double total=0.0;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ if (is_client &&
+ lp->m_info->m_client_pool_idx != pool_idx)
+ continue;
+ if (!is_client &&
+ lp->m_info->m_server_pool_idx != pool_idx)
+ continue;
+ total +=lp->m_info->m_k_cps;
+ }
+ return (total);
+}
+
+double CFlowGenListPerThread::get_total_kcps(){
+ int i;
+ double total=0.0;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRecPerThread * lp=m_cap_gen[i];
+ total +=lp->m_info->m_k_cps;
+ }
+ return (total);
+}
+
+double CFlowGenListPerThread::get_delta_flow_is_sec(){
+ return (1.0/(1000.0*get_total_kcps()));
+}
+
+
+void CFlowGenListPerThread::inc_current_template(void){
+ m_cur_template++;
+ if (m_cur_template == m_cap_gen.size()) {
+ m_cur_template=0;
+ }
+}
+
+
+int CFlowGenListPerThread::generate_flows_roundrobin(bool *done){
+ // round robin
+
+ CFlowGeneratorRecPerThread * cur;
+ bool found=false;
+ // try current
+ int i;
+ *done = true;
+ for (i=0;i<(int)m_cap_gen.size();i++ ) {
+ cur=m_cap_gen[m_cur_template];
+ if (!(cur->m_info->m_limit_was_set) ||
+ (cur->m_info->m_flowcnt < cur->m_info->m_limit)) {
+ *done = false;
+ if ( cur->m_policer.update(1.0,m_cur_time_sec) ){
+ cur->m_info->m_flowcnt++;
+ found=true;
+ break;
+ }
+ }
+ inc_current_template();
+ }
+
+ if (found) {
+ /* generate the flow into the generator*/
+ CGenNode * node= create_node() ;
+
+ cur->generate_flow(&m_node_gen,m_cur_time_sec,m_cur_flow_id,node);
+ m_cur_flow_id++;
+
+ /* this is estimation */
+ m_stats.m_total_open_flows += cur->m_flow_info->get_total_flows();
+ m_stats.m_total_bytes += cur->m_flow_info->get_total_bytes();
+ m_stats.m_total_pkt += cur->m_flow_info->Size();
+ inc_current_template();
+ }
+ return (0);
+}
+
+
+int CFlowGenListPerThread::reschedule_flow(CGenNode *node){
+
+ // Re-schedule the node
+ node->reset_pkt_in_flow();
+ node->m_time += node->m_template_info->m_restart_time;
+ m_node_gen.add_node(node);
+
+ m_stats.m_total_bytes += node->m_flow_info->get_total_bytes();
+ m_stats.m_total_pkt += node->m_flow_info->Size();
+
+ return (0);
+}
+
+void CFlowGenListPerThread::terminate_nat_flows(CGenNode *p){
+ m_stats.m_nat_flow_timeout++;
+ m_stats.m_nat_lookup_remove_flow_id++;
+ if (p->is_nat_wait_ack_state()) {
+ m_stats.m_nat_flow_timeout_wait_ack++;
+ } else {
+ m_stats.m_nat_lookup_wait_ack_state++;
+ }
+ m_flow_id_to_node_lookup.remove_no_lookup(p->get_short_fid());
+ free_last_flow_node( p);
+}
+
+
+void CFlowGenListPerThread::handle_latency_pkt_msg(CGenNodeLatencyPktInfo * msg){
+ /* send the packet */
+ #ifdef RX_QUEUE_TRACE_
+ printf(" latency msg dir %d\n",msg->m_dir);
+ struct rte_mbuf * m;
+ m=msg->m_pkt;
+ rte_pktmbuf_dump(stdout,m, rte_pktmbuf_pkt_len(m));
+ #endif
+
+ /* update timestamp */
+ struct rte_mbuf * m;
+ m=msg->m_pkt;
+ uint8_t *p=rte_pktmbuf_mtod(m, uint8_t*);
+ latency_header * h=(latency_header *)(p+msg->m_latency_offset);
+ h->time_stamp = os_get_hr_tick_64();
+
+ m_node_gen.m_v_if->send_one_pkt((pkt_dir_t)msg->m_dir,msg->m_pkt);
+}
+
+void CFlowGenListPerThread::handle_nat_msg(CGenNodeNatInfo * msg){
+ int i;
+ bool first = true, second = true;
+
+ for (i=0; i<msg->m_cnt; i++) {
+ first = true;
+ second = true;
+ CNatFlowInfo * nat_msg=&msg->m_data[i];
+ CGenNode * node=m_flow_id_to_node_lookup.lookup(nat_msg->m_fid);
+ if (!node) {
+ /* this should be moved to a notification module */
+#ifdef NAT_TRACE_
+ printf(" ERORR not valid flow_id %d probably flow was aged \n",nat_msg->m_fid);
+#endif
+ m_stats.m_nat_lookup_no_flow_id++;
+ continue;
+ }
+
+ // Calculate diff between tcp seq of SYN packet, and TCP ack of SYN+ACK packet
+ // For supporting firewalls who do TCP seq num randomization
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP)) {
+ if (node->is_nat_wait_state()) {
+ char *syn_pkt = node->m_flow_info->GetPacket(0)->m_packet->raw;
+ TCPHeader *tcp = (TCPHeader *)(syn_pkt + node->m_pkt_info->m_pkt_indication.getFastTcpOffset());
+ node->set_nat_tcp_seq_diff_client(nat_msg->m_tcp_seq - tcp->getSeqNumber());
+ if (CGlobalInfo::is_learn_mode(CParserOption::LEARN_MODE_TCP_ACK)) {
+ node->set_nat_wait_ack_state();
+ m_stats.m_nat_lookup_wait_ack_state++;
+ second = false;
+ } else {
+ node->set_nat_learn_state();
+ }
+ } else {
+ char *syn_ack_pkt = node->m_flow_info->GetPacket(1)->m_packet->raw;
+ TCPHeader *tcp = (TCPHeader *)(syn_ack_pkt + node->m_pkt_info->m_pkt_indication.getFastTcpOffset());
+ node->set_nat_tcp_seq_diff_server(nat_msg->m_tcp_seq - tcp->getSeqNumber());
+ assert(node->is_nat_wait_ack_state());
+ node->set_nat_learn_state();
+ first = false;
+ }
+ } else {
+ assert(node->is_nat_wait_state());
+ node->set_nat_learn_state();
+ }
+
+ if (first) {
+#ifdef NAT_TRACE_
+ printf(" %.03f RX :set node %p:%x %x:%x TCP diff %x\n"
+ , now_sec(), node,nat_msg->m_fid, nat_msg->m_external_ip, nat_msg->m_external_port
+ , node->get_nat_tcp_seq_diff_client());
+#endif
+
+ node->set_nat_ipv4_addr(nat_msg->m_external_ip);
+ node->set_nat_ipv4_port(nat_msg->m_external_port);
+
+ if ( CGlobalInfo::is_learn_verify_mode() ){
+ if (!node->is_external_is_eq_to_internal_ip() ||
+ node->get_nat_tcp_seq_diff_client() != 0) {
+ m_stats.m_nat_flow_learn_error++;
+ }
+ }
+ }
+
+ if (second) {
+ /* remove from the hash */
+ m_flow_id_to_node_lookup.remove_no_lookup(nat_msg->m_fid);
+ m_stats.m_nat_lookup_remove_flow_id++;
+ }
+ }
+}
+
+void CFlowGenListPerThread::check_msgs(void) {
+
+ /* inlined for performance */
+ m_stateless_dp_info.periodic_check_for_cp_messages();
+
+ if ( likely ( m_ring_from_rx->isEmpty() ) ) {
+ return;
+ }
+
+ #ifdef NAT_TRACE_
+ printf(" %.03f got message from RX \n",now_sec());
+ #endif
+ while ( true ) {
+ CGenNode * node;
+ if ( m_ring_from_rx->Dequeue(node)!=0 ){
+ break;
+ }
+ assert(node);
+ //printf ( " message: thread %d, node->m_flow_id : %d \n", m_thread_id,node->m_flow_id);
+ /* only one message is supported right now */
+
+ CGenNodeMsgBase * msg=(CGenNodeMsgBase *)node;
+
+ uint8_t msg_type = msg->m_msg_type;
+ switch (msg_type ) {
+ case CGenNodeMsgBase::NAT_FIRST:
+ handle_nat_msg((CGenNodeNatInfo * )msg);
+ break;
+
+ case CGenNodeMsgBase::LATENCY_PKT:
+ handle_latency_pkt_msg((CGenNodeLatencyPktInfo *) msg);
+ break;
+
+ default:
+ printf("ERROR pkt-thread message type is not valid %d \n",msg_type);
+ assert(0);
+ }
+
+ CGlobalInfo::free_node(node);
+ }
+}
+
+
+
+void CFlowGenListPerThread::start_stateless_simulation_file(std::string erf_file_name,
+ CPreviewMode &preview,
+ uint64_t limit){
+ m_preview_mode = preview;
+ m_node_gen.open_file(erf_file_name,&m_preview_mode);
+ m_node_gen.set_packet_limit(limit);
+}
+
+void CFlowGenListPerThread::stop_stateless_simulation_file(){
+ m_node_gen.m_v_if->close_file();
+}
+
+void CFlowGenListPerThread::start_stateless_daemon_simulation(){
+
+ m_cur_time_sec = 0;
+
+ /* if no pending CP messages - the core will simply be stuck forever */
+ if (m_stateless_dp_info.are_any_pending_cp_messages()) {
+ m_stateless_dp_info.run_once();
+ }
+}
+
+
+/* return true if we need to shedule next_stream, */
+
+bool CFlowGenListPerThread::set_stateless_next_node( CGenNodeStateless * cur_node,
+ CGenNodeStateless * next_node){
+ return ( m_stateless_dp_info.set_stateless_next_node(cur_node,next_node) );
+}
+
+
+void CFlowGenListPerThread::start_stateless_daemon(CPreviewMode &preview){
+ m_cur_time_sec = 0;
+ /* set per thread global info, for performance */
+ m_preview_mode = preview;
+ m_node_gen.open_file("",&m_preview_mode);
+
+ m_stateless_dp_info.start();
+}
+
+
+void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name,
+ CPreviewMode & preview){
+ /* now we are ready to generate*/
+ if ( m_cap_gen.size()==0 ){
+ fprintf(stderr," nothing to generate no template loaded \n");
+ return;
+ }
+
+ m_preview_mode = preview;
+ m_node_gen.open_file(erf_file_name,&m_preview_mode);
+ dsec_t d_time_flow=get_delta_flow_is_sec();
+
+ m_cur_time_sec = 0.01 + m_thread_id*m_flow_list->get_delta_flow_is_sec();
+
+
+ if ( CGlobalInfo::is_realtime() ){
+ if (m_cur_time_sec > 0.2 ) {
+ m_cur_time_sec = 0.01 + m_thread_id*0.01;
+ }
+ m_cur_time_sec += now_sec() + 0.1 ;
+ }
+ dsec_t c_stop_sec = m_cur_time_sec + m_yaml_info.m_duration_sec;
+ m_stop_time_sec =c_stop_sec;
+ m_cur_flow_id =1;
+ m_cur_template =(m_thread_id % m_cap_gen.size());
+ m_stats.clear();
+
+ fprintf(stdout," Generating erf file ... \n");
+ CGenNode * node= create_node() ;
+ /* add periodic */
+ node->m_type = CGenNode::FLOW_FIF;
+ node->m_time = m_cur_time_sec;
+ m_node_gen.add_node(node);
+
+ double old_offset=0.0;
+
+ node= create_node() ;
+ node->m_type = CGenNode::FLOW_SYNC;
+ node->m_time = m_cur_time_sec + SYNC_TIME_OUT ;
+
+ m_node_gen.add_node(node);
+
+ #ifdef _DEBUG
+ if ( m_preview_mode.getVMode() >2 ){
+
+ CGenNode::DumpHeader(stdout);
+ }
+ #endif
+
+ m_node_gen.flush_file(c_stop_sec,d_time_flow, false,this,old_offset);
+
+
+#ifdef VALG
+ CALLGRIND_STOP_INSTRUMENTATION;
+ printf (" %llu \n",os_get_hr_tick_64()-_start_time);
+#endif
+ if ( !CGlobalInfo::m_options.preview.getNoCleanFlowClose() && (is_terminated_by_master()==false) ){
+ /* clean close */
+ m_node_gen.flush_file(m_cur_time_sec, d_time_flow, true,this,old_offset);
+ }
+
+ if (m_preview_mode.getVMode() > 1 ) {
+ fprintf(stdout,"\n\n");
+ fprintf(stdout,"\n\n");
+ fprintf(stdout,"file stats \n");
+ fprintf(stdout,"=================\n");
+ m_stats.dump(stdout);
+ }
+ m_node_gen.close_file(this);
+}
+
+void CFlowGenList::Delete(){
+ clean_p_thread_info();
+ Clean();
+ if (CPluginCallback::callback) {
+ delete CPluginCallback::callback;
+ CPluginCallback::callback = NULL;
+ }
+}
+
+
+bool CFlowGenList::Create(){
+ check_objects_sizes();
+ CPluginCallback::callback= new CPluginCallbackSimple();
+ return (true);
+}
+
+
+void CFlowGenList::generate_p_thread_info(uint32_t num_threads){
+ clean_p_thread_info();
+ BP_ASSERT(num_threads < 64);
+ int i;
+ for (i=0; i<(int)num_threads; i++) {
+ CFlowGenListPerThread * lp= new CFlowGenListPerThread();
+ lp->Create(i,i,this,num_threads);
+ m_threads_info.push_back(lp);
+ }
+}
+
+
+void CFlowGenList::clean_p_thread_info(void){
+ int i;
+ for (i=0; i<(int)m_threads_info.size(); i++) {
+ CFlowGenListPerThread * lp=m_threads_info[i];
+ lp->Delete();
+ delete lp;
+ }
+ m_threads_info.clear();
+}
+
+int CFlowGenList::load_client_config_file(std::string file_name) {
+ m_client_config_info.load_yaml_file(file_name);
+ return (0);
+}
+
+int CFlowGenList::load_from_yaml(std::string file_name,
+ uint32_t num_threads){
+ uint8_t idx;
+ m_yaml_info.load_from_yaml_file(file_name);
+ if (m_yaml_info.verify_correctness(num_threads) ==false){
+ exit(0);
+ }
+
+ /* move it to global info, better CPU D-cache usage */
+ CGlobalInfo::m_options.preview.set_vlan_mode_enable(m_yaml_info.m_vlan_info.m_enable);
+ CGlobalInfo::m_options.m_vlan_port[0] = m_yaml_info.m_vlan_info.m_vlan_per_port[0];
+ CGlobalInfo::m_options.m_vlan_port[1] = m_yaml_info.m_vlan_info.m_vlan_per_port[1];
+ CGlobalInfo::m_options.preview.set_mac_ip_overide_enable(m_yaml_info.m_mac_replace_by_ip);
+
+ if ( m_yaml_info.m_mac_base.size() != 6 ){
+ printf(" mac addr is not valid \n");
+ exit(0);
+ }
+
+ if (m_yaml_info.m_ipv6_set == true) {
+ // Copy the most significant 96-bits from yaml data
+ for (idx=0; idx<6; idx++){
+ CGlobalInfo::m_options.m_src_ipv6[idx] = m_yaml_info.m_src_ipv6[idx];
+ CGlobalInfo::m_options.m_dst_ipv6[idx] = m_yaml_info.m_dst_ipv6[idx];
+ }
+ }else{
+ // Set the most signifcant 96-bits to zero which represents an
+ // IPv4-compatible IPv6 address
+ for (idx=0; idx<6; idx++){
+ CGlobalInfo::m_options.m_src_ipv6[idx] = 0;
+ CGlobalInfo::m_options.m_dst_ipv6[idx] = 0;
+ }
+ }
+
+ int i=0;
+ Clean();
+ bool all_template_has_one_direction=true;
+ for (i=0; i<(int)m_yaml_info.m_vec.size(); i++) {
+ CFlowGeneratorRec * lp=new CFlowGeneratorRec();
+ if ( lp->Create(&m_yaml_info.m_vec[i],&m_yaml_info,i) == false){
+ fprintf(stdout,"\n ERROR reading YAML template files, please verify that they are valid \n\n");
+ exit(-1);
+ return (-1);
+ }
+ m_cap_gen.push_back(lp);
+
+ if (lp->m_flow_info.GetPacket(0)->m_pkt_indication.m_desc.IsBiDirectionalFlow() ) {
+ all_template_has_one_direction=false;
+ }
+ }
+
+ if ( CGlobalInfo::is_learn_mode() && all_template_has_one_direction ) {
+ fprintf(stdout,"\n Warning --learn mode has nothing to do when all templates are one directional, please remove it \n");
+ }
+ return (0);
+}
+
+
+
+void CFlowGenList::Clean(){
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->Delete();
+ delete lp;
+ }
+ m_cap_gen.clear();
+}
+
+double CFlowGenList::GetCpuUtil(){
+ int i;
+ double c=0.0;
+ for (i=0; i<(int)m_threads_info.size(); i++) {
+ CFlowGenListPerThread * lp=m_threads_info[i];
+ c+=lp->m_cpu_cp_u.GetVal();
+ }
+ return (c/m_threads_info.size());
+}
+
+double CFlowGenList::GetCpuUtilRaw(){
+ int i;
+ double c=0.0;
+ for (i=0; i<(int)m_threads_info.size(); i++) {
+ CFlowGenListPerThread * lp=m_threads_info[i];
+ c+=lp->m_cpu_cp_u.GetValRaw();
+ }
+ return (c/m_threads_info.size());
+}
+
+
+void CFlowGenList::UpdateFast(){
+
+ for (int i=0; i<(int)m_threads_info.size(); i++) {
+ CFlowGenListPerThread * lp=m_threads_info[i];
+ lp->Update();
+ }
+}
+
+
+
+void CFlowGenList::Dump(FILE *fd){
+ fprintf(fd,"yaml info \n");
+ fprintf(fd,"--------------\n");
+ m_yaml_info.Dump(fd);
+
+ fprintf(fd,"\n");
+ fprintf(fd,"cap file info \n");
+ fprintf(fd,"----------------------\n");
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->Dump(fd);
+ }
+}
+
+
+void CFlowGenList::DumpPktSize(){
+
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->m_flow_info.dump_pkt_sizes();
+ }
+}
+
+
+void CFlowGenList::DumpCsv(FILE *fd){
+ CFlowStats::DumpHeader(fd);
+
+ CFlowStats stats;
+ CFlowStats sum;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ stats.Dump(fd);
+ sum.Add(stats);
+ }
+ fprintf(fd,"\n");
+ sum.m_name= "sum";
+ sum.Dump(fd);
+ sum.m_memory.dump(fd);
+}
+
+
+uint32_t CFlowGenList::get_total_repeat_flows(){
+ uint32_t flows=0;
+ int i;
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ flows+=lp->m_info->m_limit ;
+ }
+ return (flows);
+}
+
+
+double CFlowGenList::get_total_tx_bps(){
+ CFlowStats stats;
+ double total=0.0;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ total+=(stats.m_mb_sec);
+ }
+ return (_1Mb_DOUBLE*total);
+}
+
+double CFlowGenList::get_total_pps(){
+
+ CFlowStats stats;
+ double total=0.0;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ total+=stats.m_pps;
+ }
+ return (total);
+}
+
+
+double CFlowGenList::get_total_kcps(){
+
+ CFlowStats stats;
+ double total=0.0;
+ int i;
+
+ for (i=0; i<(int)m_cap_gen.size(); i++) {
+ CFlowGeneratorRec * lp=m_cap_gen[i];
+ lp->getFlowStats(&stats);
+ total+= stats.get_normal_cps();
+ }
+ return ((total/1000.0));
+}
+
+double CFlowGenList::get_delta_flow_is_sec(){
+ return (1.0/(1000.0*get_total_kcps()));
+}
+
+
+
+bool CPolicer::update(double dsize,double now_sec){
+ if ( m_last_time ==0.0 ) {
+ /* first time */
+ m_last_time = now_sec;
+ return (true);
+ }
+ if (m_cir == 0.0) {
+ return (false);
+ }
+
+ // check if there is a need to add tokens
+ if(now_sec > m_last_time) {
+ dsec_t dtime=(now_sec - m_last_time);
+ dsec_t dsize =dtime*m_cir;
+ m_level +=dsize;
+ if (m_level > m_bucket_size) {
+ m_level = m_bucket_size;
+ }
+ m_last_time = now_sec;
+ }
+
+ if (m_level > dsize) {
+ m_level -= dsize;
+ return (true);
+ }else{
+ return (false);
+ }
+}
+
+
+float CPPSMeasure::add(uint64_t pkts){
+ if ( false == m_start ){
+ m_start=true;
+ m_last_time_msec = os_get_time_msec() ;
+ m_last_pkts=pkts;
+ return (0.0);
+ }
+
+ uint32_t ctime=os_get_time_msec();
+ if ((ctime - m_last_time_msec) <os_get_time_freq() ) {
+ return (m_last_result);
+ }
+
+ uint32_t dtime_msec = ctime-m_last_time_msec;
+ uint32_t dpkts = (pkts - m_last_pkts);
+
+ m_last_time_msec = ctime;
+ m_last_pkts = pkts;
+
+ m_last_result= 0.5*calc_pps(dtime_msec,dpkts) +0.5*(m_last_result);
+ return ( m_last_result );
+}
+
+
+
+CBwMeasure::CBwMeasure() {
+ reset();
+}
+
+void CBwMeasure::reset(void) {
+ m_start=false;
+ m_last_time_msec=0;
+ m_last_bytes=0;
+ m_last_result=0.0;
+};
+
+double CBwMeasure::calc_MBsec(uint32_t dtime_msec,
+ uint64_t dbytes){
+ double rate=0.000008*( ( (double)dbytes*(double)os_get_time_freq())/((double)dtime_msec) );
+ return (rate);
+}
+
+double CBwMeasure::add(uint64_t size) {
+ if ( false == m_start ){
+ m_start=true;
+ m_last_time_msec = os_get_time_msec() ;
+ m_last_bytes=size;
+ return (0.0);
+ }
+
+ uint32_t ctime=os_get_time_msec();
+ if ((ctime - m_last_time_msec) <os_get_time_freq() ) {
+ return (m_last_result);
+ }
+
+ uint32_t dtime_msec = ctime-m_last_time_msec;
+ uint64_t dbytes = size - m_last_bytes;
+
+ m_last_time_msec = ctime;
+ m_last_bytes = size;
+
+ m_last_result= 0.5*calc_MBsec(dtime_msec,dbytes) +0.5*(m_last_result);
+ return ( m_last_result );
+}
+
+
+
+/*
+ * Test if option value is within allowed range.
+ * val - Value to test
+ * min, max - minimum, maximum allowed values.
+ * opt_name - option name for error report.
+ */
+bool CParserOption::is_valid_opt_val(int val, int min, int max, const std::string &opt_name) {
+ if (val < min || val > max) {
+ std::cerr << "Value " << val << " for option " << opt_name << " is out of range. Should be (" << min << "-" << max << ")." << std::endl;
+ return false;
+ }
+
+ return true;
+}
+
+void CParserOption::dump(FILE *fd){
+ preview.Dump(fd);
+ fprintf(fd," cfg file : %s \n",cfg_file.c_str());
+ fprintf(fd," mac file : %s \n",client_cfg_file.c_str());
+ fprintf(fd," out file : %s \n",out_file.c_str());
+ fprintf(fd," client cfg file : %s \n",out_file.c_str());
+ fprintf(fd," duration : %.0f \n",m_duration);
+ fprintf(fd," factor : %.0f \n",m_factor);
+ fprintf(fd," mbuf_factor : %.0f \n",m_mbuf_factor);
+ fprintf(fd," latency : %d pkt/sec \n",m_latency_rate);
+ fprintf(fd," zmq_port : %d \n",m_zmq_port);
+ fprintf(fd," telnet_port : %d \n",m_telnet_port);
+ fprintf(fd," expected_ports : %d \n",m_expected_portd);
+ if (preview.get_vlan_mode_enable() ) {
+ fprintf(fd," vlans : [%d,%d] \n",m_vlan_port[0],m_vlan_port[1]);
+ }
+
+ int i;
+ for (i = 0; i < TREX_MAX_PORTS; i++) {
+ fprintf(fd," port : %d dst:",i);
+ CMacAddrCfg * lp=&m_mac_addr[i];
+ dump_mac_addr(fd,lp->u.m_mac.dest);
+ fprintf(fd," src:");
+ dump_mac_addr(fd,lp->u.m_mac.src);
+ fprintf(fd,"\n");
+ }
+}
+
+void CParserOption::verify() {
+ /* check for mutual exclusion options */
+ if (preview.get_is_client_cfg_enable()) {
+ if (preview.get_vlan_mode_enable() || preview.get_mac_ip_overide_enable()) {
+ throw std::runtime_error("VLAN / MAC override cannot be combined with client configuration");
+ }
+ }
+}
+
+#if 0
+
+void CTupleGlobalGenerator::Dump(FILE *fd){
+ fprintf(fd," src:%x dest: %x \n",m_result_src_ip,m_result_dest_ip);
+}
+
+bool CTupleGlobalGenerator::Create(){
+ was_generated=false;
+ return (true);
+}
+
+
+void CTupleGlobalGenerator::Copy(CTupleGlobalGenerator * gen){
+ was_generated=false;
+ m_min_src_ip = gen->m_min_src_ip;
+ m_max_src_ip = gen->m_max_src_ip;
+ m_min_dest_ip = gen->m_min_dest_ip;
+ m_max_dest_ip = gen->m_max_dest_ip;
+}
+
+
+void CTupleGlobalGenerator::Delete(){
+ was_generated=false;
+}
+
+#endif
+
+static uint32_t get_rand_32(uint32_t MinimumRange ,
+ uint32_t MaximumRange );
+
+
+#if 0
+void CTupleGlobalGenerator::Generate(uint32_t thread_id,
+ uint32_t num_addr ){
+ if ( was_generated == false) {
+ /* first time */
+ was_generated = true;
+ cur_src_ip = m_min_src_ip;
+ cur_dst_ip = m_min_dest_ip;
+ }
+
+ if ( ( cur_src_ip + num_addr ) > m_max_src_ip ) {
+ cur_src_ip = m_min_src_ip;
+ }
+
+ /* copy the results */
+ m_result_src_ip = cur_src_ip;
+ m_result_dest_ip = cur_dst_ip;
+ cur_src_ip += num_addr;
+ cur_dst_ip += 1;
+ if (cur_dst_ip > m_max_dest_ip ) {
+ cur_dst_ip = m_min_dest_ip;
+ }
+}
+
+
+
+
+void CTupleTemplateGenerator::Dump(FILE *fd){
+ fprintf(fd," id: %x, %x:%x - %x \n",m_id,m_result_src_ip,m_result_dest_ip,m_result_src_port);
+}
+
+
+bool CTupleTemplateGenerator::Create(CTupleGlobalGenerator * global_gen,
+ uint16_t w,
+ uint16_t wlength,
+ uint32_t _id,
+ uint32_t thread_id){
+ m_was_generated = false;
+ m_thread_id = thread_id;
+ m_lp_global_gen = global_gen;
+ BP_ASSERT(m_lp_global_gen);
+ m_cur_src_port = 1;
+ m_cur_src_port_cnt=0;
+
+ m_w = w;
+ m_wlength = wlength;
+
+ m_id = _id;
+ m_was_init=true;
+ return(true);
+}
+
+void CTupleTemplateGenerator::Delete(){
+ m_was_generated = false;
+ m_was_init=false;
+}
+
+void CTupleTemplateGenerator::Generate_src_dest(){
+ /* TBD need to fix the 100*/
+ m_lp_global_gen->Generate(m_thread_id,m_wlength);
+ m_result_src_ip = m_lp_global_gen->m_result_src_ip;
+
+ m_dest_ip = m_lp_global_gen->m_result_dest_ip;
+ m_result_dest_ip = update_dest_ip(m_dest_ip );
+ m_cnt=0;
+}
+
+uint16_t CTupleTemplateGenerator::GenerateOneSourcePort(){
+ /* handle port */
+ m_cur_src_port++;
+ /* do not use port zero */
+ if (m_cur_src_port == 0) {
+ m_cur_src_port=1;
+ }
+ m_result_src_port=m_cur_src_port;
+ return (m_cur_src_port);
+}
+
+void CTupleTemplateGenerator::Generate(){
+ BP_ASSERT(m_was_init);
+ if ( m_was_generated == false ) {
+ /* first time */
+ Generate_src_dest();
+ m_was_generated = true;
+ }else{
+ /* ip+cnt,dest+cnt*/
+ m_cnt++;
+ if ( m_cnt >= m_wlength ) {
+ m_cnt =0;
+ m_result_src_ip -=m_wlength;
+ m_result_dest_ip = m_dest_ip;
+ m_cur_src_port_cnt++;
+ if (m_cur_src_port_cnt >= m_w ) {
+ Generate_src_dest();
+ m_cur_src_port_cnt=0;
+ }
+ }
+ m_result_src_ip += 1;
+ m_result_dest_ip = update_dest_ip(m_dest_ip +m_cnt );
+ }
+
+
+ /* handle port */
+ m_cur_src_port++;
+ /* do not use port zero */
+ if (m_cur_src_port == 0) {
+ m_cur_src_port=1;
+ }
+ m_result_src_ip =update_src_ip( m_result_src_ip );
+ m_result_src_port=m_cur_src_port;
+}
+
+#endif
+
+static uint32_t get_rand_32(uint32_t MinimumRange,
+ uint32_t MaximumRange) __attribute__ ((unused));
+
+static uint32_t get_rand_32(uint32_t MinimumRange,
+ uint32_t MaximumRange) {
+ enum {RANDS_NUM = 2 , RAND_MAX_BITS = 0xf , UNSIGNED_INT_BITS = 0x20 , TWO_BITS_MASK = 0x3};
+ const double TWO_POWER_32_BITS = 0x10000000 * (double)0x10;
+ uint32_t RandomNumber = 0;
+
+ for (int i = 0 ; i < RANDS_NUM;i++) {
+ RandomNumber = (RandomNumber<<RAND_MAX_BITS) + rand();
+ }
+ RandomNumber = (RandomNumber<<(UNSIGNED_INT_BITS - RAND_MAX_BITS * RANDS_NUM)) + (rand() | TWO_BITS_MASK);
+
+ uint32_t Range;
+ if ((Range = MaximumRange - MinimumRange) == 0xffffffff) {
+ return RandomNumber;
+ }
+ return (uint32_t)(((Range + 1) / TWO_POWER_32_BITS * RandomNumber) + MinimumRange );
+}
+
+
+
+int CNullIF::send_node(CGenNode * node){
+ #if 0
+ CFlowPktInfo * lp=node->m_pkt_info;
+ rte_mbuf_t * buf=lp->generate_new_mbuf(node);
+ //rte_pktmbuf_dump(buf, buf->pkt_len);
+ //sending it ??
+ // free it here as if driver does
+ rte_pktmbuf_free(buf);
+ #endif
+ return (0);
+}
+
+
+
+void CErfIF::fill_raw_packet(rte_mbuf_t * m,CGenNode * node,pkt_dir_t dir){
+
+ fill_pkt(m_raw,m);
+
+ CPktNsecTimeStamp t_c(node->m_time);
+ m_raw->time_nsec = t_c.m_time_nsec;
+ m_raw->time_sec = t_c.m_time_sec;
+ uint8_t p_id = (uint8_t)dir;
+ m_raw->setInterface(p_id);
+}
+
+
+pkt_dir_t CErfIFStl::port_id_to_dir(uint8_t port_id) {
+ return ((pkt_dir_t)(port_id&1));
+}
+
+
+int CErfIFStl::update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t * p){
+ memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(dir),12);
+ return (0);
+}
+
+int CErfIFStl::send_sl_node(CGenNodeStateless *node_sl) {
+ pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir();
+
+ rte_mbuf_t * m;
+ if ( likely(node_sl->is_cache_mbuf_array()) ) {
+ m=node_sl->cache_mbuf_array_get_cur();
+ fill_raw_packet(m,(CGenNode *)node_sl,dir);
+ }else{
+ m=node_sl->get_cache_mbuf();
+ bool is_const = false;
+ if (m) {
+ is_const = true;
+ rte_pktmbuf_refcnt_update(m,1);
+ }else{
+ m=node_sl->alloc_node_with_vm();
+ assert(m);
+ }
+
+ if (node_sl->is_stat_needed() && (node_sl->get_stat_hw_id() >= MAX_FLOW_STATS) ) {
+ /* latency packet. flow stat without latency handled like normal packet in simulation */
+ uint16_t hw_id = node_sl->get_stat_hw_id();
+ rte_mbuf_t *mi;
+ struct flow_stat_payload_header *fsp_head;
+ mi = node_sl->alloc_flow_stat_mbuf(m, fsp_head, is_const);
+ fsp_head->seq = 0x12345678;
+ fsp_head->hw_id = hw_id - MAX_FLOW_STATS;
+ fsp_head->magic = FLOW_STAT_PAYLOAD_MAGIC;
+ fsp_head->flow_seq = FLOW_STAT_PAYLOAD_INITIAL_FLOW_SEQ;
+ fsp_head->time_stamp = 0x8899aabbccddeeff;
+ fill_raw_packet(mi, (CGenNode *)node_sl, dir);
+ rte_pktmbuf_free(mi);
+ } else {
+ fill_raw_packet(m,(CGenNode *)node_sl,dir);
+ rte_pktmbuf_free(m);
+ }
+ }
+ /* check that we have mbuf */
+ int rc = write_pkt(m_raw);
+ BP_ASSERT(rc == 0);
+
+ return (rc);
+}
+
+int CErfIFStl::send_pcap_node(CGenNodePCAP *pcap_node) {
+ rte_mbuf_t *m = pcap_node->get_pkt();
+ if (!m) {
+ return (-1);
+ }
+
+ pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir();
+ fill_raw_packet(m, (CGenNode*)pcap_node, dir);
+ rte_pktmbuf_free(m);
+
+ int rc = write_pkt(m_raw);
+ BP_ASSERT(rc == 0);
+
+ return (rc);
+}
+
+/*
+ * This is the simulation stateless send_node.
+ * in simulation (bp-sim-64) it is called instead of CCoreEthIFStateless::send_node
+ * Purpose is to test the mbuf manipulation functions which are the same in simulation and "real" code
+ */
+int CErfIFStl::send_node(CGenNode * _no_to_use){
+
+ if ( m_preview_mode->getFileWrite() ) {
+
+ switch (_no_to_use->m_type) {
+ case CGenNode::STATELESS_PKT:
+ return send_sl_node((CGenNodeStateless *) _no_to_use);
+
+ case CGenNode::PCAP_PKT:
+ return send_pcap_node((CGenNodePCAP *) _no_to_use);
+
+ default:
+ assert(0);
+ }
+ }
+ return (0);
+}
+
+void CErfIF::add_vlan(uint16_t vlan_id) {
+ uint8_t *buffer =(uint8_t *)m_raw->raw;
+
+ uint16_t vlan_protocol = EthernetHeader::Protocol::VLAN;
+ uint32_t vlan_tag = (vlan_protocol << 16) | vlan_id;
+ vlan_tag = PKT_HTONL(vlan_tag);
+
+ /* insert vlan tag and adjust packet size */
+ memcpy(cbuff+4, buffer + 12, m_raw->pkt_len - 12);
+ memcpy(cbuff, &vlan_tag, 4);
+ memcpy(buffer + 12, cbuff, m_raw->pkt_len - 8);
+
+ m_raw->pkt_len += 4;
+}
+
+void CErfIF::apply_client_config(const ClientCfg *cfg, pkt_dir_t dir) {
+ assert(cfg);
+ uint8_t *p = (uint8_t *)m_raw->raw;
+
+ const ClientCfgDir &cfg_dir = ( (dir == CLIENT_SIDE) ? cfg->m_initiator : cfg->m_responder);
+
+ /* dst mac */
+ if (cfg_dir.has_dst_mac_addr()) {
+ memcpy(p, cfg_dir.get_dst_mac_addr(), 6);
+ }
+
+ /* src mac */
+ if (cfg_dir.has_src_mac_addr()) {
+ memcpy(p + 6, cfg_dir.get_src_mac_addr(), 6);
+ }
+
+ /* VLAN */
+ if (cfg_dir.has_vlan()) {
+ add_vlan(cfg_dir.get_vlan());
+ }
+}
+
+int CErfIF::send_node(CGenNode *node){
+
+ if (!m_preview_mode->getFileWrite()) {
+ return (0);
+ }
+
+ CFlowPktInfo *lp = node->m_pkt_info;
+ rte_mbuf_t *m = lp->generate_new_mbuf(node);
+ pkt_dir_t dir = node->cur_interface_dir();
+
+ fill_raw_packet(m, node, dir);
+
+ /* update mac addr dest/src 12 bytes */
+ uint8_t *p=(uint8_t *)m_raw->raw;
+ int p_id=(int)dir;
+ memcpy(p,CGlobalInfo::m_options.get_dst_src_mac_addr(p_id),12);
+
+ /* if a client configuration was provided - apply the config */
+ if (CGlobalInfo::m_options.preview.get_is_client_cfg_enable()) {
+ apply_client_config(node->m_client_cfg, dir);
+
+ } else if (CGlobalInfo::m_options.preview.get_vlan_mode_enable()) {
+ uint8_t vlan_port = (node->m_src_ip & 1);
+ uint16_t vlan_id = CGlobalInfo::m_options.m_vlan_port[vlan_port];
+ add_vlan(vlan_id);
+ }
+
+ //utl_DumpBuffer(stdout,p, 12,0);
+
+ int rc = write_pkt(m_raw);
+ BP_ASSERT(rc == 0);
+
+ rte_pktmbuf_free(m);
+
+ return (0);
+}
+
+int CErfIF::flush_tx_queue(void){
+ return (0);
+}
+
+void CTcpSeq::update(uint8_t *p, CFlowPktInfo *pkt_info, int16_t s_size){
+ TCPHeader *tcp= (TCPHeader *)(p+pkt_info->m_pkt_indication.getFastTcpOffset());
+ uint32_t seqnum, acknum;
+
+ // This routine will adjust the TCP segment size for packets
+ // based on the modifications made by the plugins.
+ // Basically it will keep track of the size changes
+ // and adjust the TCP sequence numbers accordingly.
+
+ bool is_init=pkt_info->m_pkt_indication.m_desc.IsInitSide();
+
+ // Update TCP seq number
+ seqnum = tcp->getSeqNumber();
+ acknum = tcp->getAckNumber();
+ if (is_init) {
+ // Packet is from client
+ seqnum += client_seq_delta;
+ acknum += server_seq_delta;
+ } else {
+ // Packet is from server
+ seqnum += server_seq_delta;
+ acknum += client_seq_delta;
+ }
+ tcp->setSeqNumber(seqnum);
+ tcp->setAckNumber(acknum);
+
+ // Adjust delta being tracked
+ if (is_init) {
+ client_seq_delta += s_size;
+ } else {
+ server_seq_delta += s_size;
+ }
+}
+
+
+void on_node_first(uint8_t plugin_id,CGenNode * node,
+ CFlowYamlInfo * template_info,
+ CTupleTemplateGeneratorSmart * tuple_gen,
+ CFlowGenListPerThread * flow_gen){
+
+ if (CPluginCallback::callback) {
+ CPluginCallback::callback->on_node_first(plugin_id,node,template_info, tuple_gen,flow_gen);
+ }
+}
+
+void on_node_last(uint8_t plugin_id,CGenNode * node){
+ if (CPluginCallback::callback) {
+ CPluginCallback::callback->on_node_last(plugin_id,node);
+ }
+
+}
+
+rte_mbuf_t * on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
+ rte_mbuf_t * m;
+ assert(CPluginCallback::callback);
+ m=CPluginCallback::callback->on_node_generate_mbuf(plugin_id,node,pkt_info);
+ assert(m);
+ return(m);
+}
+
+
+class CPlugin_rtsp : public CTcpSeq {
+public:
+ void * m_gen;
+ uint16_t rtp_client_0;
+ uint16_t rtp_client_1;
+};
+
+
+void CPluginCallbackSimple::on_node_first(uint8_t plugin_id,
+ CGenNode * node,
+ CFlowYamlInfo * template_info,
+ CTupleTemplateGeneratorSmart * tuple_gen,
+ CFlowGenListPerThread * flow_gen ){
+ //printf(" on on_node_first callback %d node %x! \n",(int)plugin_id,node);
+ /* generate 2 ports from client side */
+
+ if ( (plugin_id == mpRTSP) || (plugin_id == mpSIP_VOICE) ) {
+ CPlugin_rtsp * lpP=new CPlugin_rtsp();
+ assert(lpP);
+
+ /* TBD need to be fixed using new API */
+ lpP->rtp_client_0 = tuple_gen->GenerateOneSourcePort();
+ lpP->rtp_client_1 = tuple_gen->GenerateOneSourcePort();
+ lpP->m_gen=flow_gen;
+ node->m_plugin_info = (void *)lpP;
+ }else{
+ if (plugin_id ==mpDYN_PYLOAD) {
+ /* nothing to do */
+ }else{
+ if (plugin_id ==mpAVL_HTTP_BROWSIN) {
+ CTcpSeq * lpP=new CTcpSeq();
+ assert(lpP);
+ node->m_plugin_info = (void *)lpP;
+ }else{
+ /* do not support this */
+ assert(0);
+ }
+ }
+ }
+}
+
+void CPluginCallbackSimple::on_node_last(uint8_t plugin_id,CGenNode * node){
+ //printf(" on on_node_last callback %d %x! \n",(int)plugin_id,node);
+ if ( (plugin_id == mpRTSP) || (plugin_id == mpSIP_VOICE) ) {
+ CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
+ /* free the ports */
+ CFlowGenListPerThread * flow_gen=(CFlowGenListPerThread *) lpP->m_gen;
+ bool is_tcp=node->m_pkt_info->m_pkt_indication.m_desc.IsTcp();
+ flow_gen->defer_client_port_free(is_tcp,node->m_src_idx,lpP->rtp_client_0,
+ node->m_template_info->m_client_pool_idx,node->m_tuple_gen);
+ flow_gen->defer_client_port_free(is_tcp,node->m_src_idx,lpP->rtp_client_1,
+ node->m_template_info->m_client_pool_idx, node->m_tuple_gen);
+
+ assert(lpP);
+ delete lpP;
+ node->m_plugin_info=0;
+ }else{
+ if (plugin_id ==mpDYN_PYLOAD) {
+ /* nothing to do */
+ }else{
+ if (plugin_id ==mpAVL_HTTP_BROWSIN) {
+ /* nothing to do */
+ CTcpSeq * lpP=(CTcpSeq * )node->m_plugin_info;
+ delete lpP;
+ node->m_plugin_info=0;
+ }else{
+ /* do not support this */
+ assert(0);
+ }
+ }
+ }
+}
+
+rte_mbuf_t * CPluginCallbackSimple::http_plugin(uint8_t plugin_id,
+ CGenNode * node,
+ CFlowPktInfo * pkt_info){
+ CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
+ assert(lpd->getFlowId()==0); /* only one flow */
+ CMiniVMCmdBase * program[2];
+ CMiniVMReplaceIP replace_cmd;
+ CMiniVMCmdBase eop_cmd;
+ CTcpSeq * lpP=(CTcpSeq * )node->m_plugin_info;
+ assert(lpP);
+ rte_mbuf_t *mbuf;
+ int16_t s_size=0;
+
+ if ( likely (lpd->getFlowPktNum() != 3) ){
+ if (unlikely (CGlobalInfo::is_ipv6_enable()) ) {
+ // Request a larger initial segment for IPv6
+ mbuf = pkt_info->do_generate_new_mbuf_big(node);
+ }else{
+ mbuf = pkt_info->do_generate_new_mbuf(node);
+ }
+
+ }else{
+ CFlowInfo flow_info;
+ flow_info.vm_program=0;
+
+ flow_info.client_ip = node->m_src_ip;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = node->m_src_port;
+ flow_info.server_port = 0;
+ flow_info.replace_server_port =false;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 8 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 8;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 8;
+ }
+
+ // Set m_start_0/m_stop_1 at start/end of IP address to be replaced.
+ // For this packet we know the IP addr string length is 8 bytes.
+ replace_cmd.m_start_0 = 10+16;
+ replace_cmd.m_stop_1 = replace_cmd.m_start_0 + 8;
+
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+
+ mbuf = pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size);
+ }
+
+ // Fixup the TCP sequence numbers
+ uint8_t *p=rte_pktmbuf_mtod(mbuf, uint8_t*);
+
+ // Update TCP sequence numbers
+ lpP->update(p, pkt_info, s_size);
+
+ return(mbuf);
+}
+
+rte_mbuf_t * CPluginCallbackSimple::dyn_pyload_plugin(uint8_t plugin_id,
+ CGenNode * node,
+ CFlowPktInfo * pkt_info){
+
+ CMiniVMCmdBase * program[2];
+
+ CMiniVMDynPyload dyn_cmd;
+ CMiniVMCmdBase eop_cmd;
+
+ CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
+ CFlowYamlDynamicPyloadPlugin * lpt = node->m_template_info->m_dpPkt;
+ assert(lpt);
+ CFlowInfo flow_info;
+ flow_info.vm_program=0;
+ int16_t s_size=0;
+
+ // IPv6 packets are not supported
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ fprintf (stderr," IPv6 is not supported for dynamic pyload change\n");
+ exit(-1);
+ }
+
+ if ( lpd->getFlowId() == 0 ) {
+
+ flow_info.client_ip = node->m_src_ip;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = node->m_src_port;
+ flow_info.server_port = 0;
+ flow_info.replace_server_port =false;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+ uint32_t pkt_num = lpd->getFlowPktNum();
+ if (pkt_num < 253) {
+ int i;
+ /* fast filter */
+ for (i=0; i<lpt->m_num; i++) {
+ if (lpt->m_pkt_ids[i] == pkt_num ) {
+ //add a program here
+ dyn_cmd.m_cmd = VM_DYN_PYLOAD;
+ dyn_cmd.m_ptr= &lpt->m_program[i];
+ dyn_cmd.m_flags = 0;
+ dyn_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 8;
+ dyn_cmd.m_ip.v4=node->m_src_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+ program[0] = &dyn_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ }
+ }
+ // only for the first flow
+ }else{
+ fprintf (stderr," only one flow is allowed for dynamic pyload change \n");
+ exit(-1);
+ }/* only for the first flow */
+
+ if ( unlikely( flow_info.vm_program != 0 ) ) {
+
+ return ( pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size) );
+ }else{
+ return ( pkt_info->do_generate_new_mbuf_ex(node,&flow_info) );
+ }
+}
+
+rte_mbuf_t * CPluginCallbackSimple::sip_voice_plugin(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
+ CMiniVMCmdBase * program[2];
+
+ CMiniVMReplaceIP_PORT_IP_IP_Port via_replace_cmd;
+ CMiniVMCmdBase eop_cmd;
+
+ CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
+ CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
+ assert(lpP);
+ // printf(" %d %d \n",lpd->getFlowId(),lpd->getFlowPktNum());
+ CFlowInfo flow_info;
+ flow_info.vm_program=0;
+ int16_t s_size=0;
+
+ switch ( lpd->getFlowId() ) {
+ /* flow - SIP , packet #0,#1 control */
+ case 0:
+ flow_info.client_ip = node->m_src_ip;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = node->m_src_port;
+ flow_info.server_port = 0;
+ flow_info.replace_server_port =false;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+
+ /* program to replace ip server */
+ switch ( lpd->getFlowPktNum() ) {
+ case 0:
+ {
+ via_replace_cmd.m_cmd = VM_REPLACE_IPVIA_IP_IP_PORT;
+ via_replace_cmd.m_flags = 0;
+ via_replace_cmd.m_start_0 = 0;
+ via_replace_cmd.m_stop_1 = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There are 3 address
+ // strings (each 9 bytes) that needs to be replaced.
+ // We also need to accomodate IPv6 use of brackets
+ // (+2 bytes) in a URI.
+ // There are also 2 port strings that needs to be
+ // replaced (1 is 4 bytes the other is 5 bytes).
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ via_replace_cmd.m_add_pkt_len = (((INET6_ADDRSTRLEN + 2) - 9) * 3) +
+ ((INET_PORTSTRLEN * 2) - 9);
+
+ // Mark as IPv6 and set the upper 96-bits
+ via_replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ via_replace_cmd.m_ip.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
+ via_replace_cmd.m_ip_via.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
+ }
+ } else {
+ via_replace_cmd.m_add_pkt_len = ((INET_ADDRSTRLEN - 9) * 3) +
+ ((INET_PORTSTRLEN * 2) - 9);
+ }
+ via_replace_cmd.m_ip.v4 =node->m_src_ip;
+ via_replace_cmd.m_ip0_start = 377;
+ via_replace_cmd.m_ip0_stop = 377+9;
+
+ via_replace_cmd.m_ip1_start = 409;
+ via_replace_cmd.m_ip1_stop = 409+9;
+
+
+ via_replace_cmd.m_port =lpP->rtp_client_0;
+ via_replace_cmd.m_port_start = 435;
+ via_replace_cmd.m_port_stop = 435+5;
+
+ via_replace_cmd.m_ip_via.v4 = node->m_src_ip;
+ via_replace_cmd.m_port_via = node->m_src_port;
+
+ via_replace_cmd.m_ip_via_start = 208;
+ via_replace_cmd.m_ip_via_stop = 208+9+5;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &via_replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+ case 1:
+ {
+ via_replace_cmd.m_cmd = VM_REPLACE_IPVIA_IP_IP_PORT;
+ via_replace_cmd.m_flags = 0;
+ via_replace_cmd.m_start_0 = 0;
+ via_replace_cmd.m_stop_1 = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There are 3 address
+ // strings (each 9 bytes) that needs to be replaced.
+ // We also need to accomodate IPv6 use of brackets
+ // (+2 bytes) in a URI.
+ // There are also 2 port strings that needs to be
+ // replaced (1 is 4 bytes the other is 5 bytes).
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ via_replace_cmd.m_add_pkt_len = (((INET6_ADDRSTRLEN + 2) - 9) * 3) +
+ ((INET_PORTSTRLEN * 2) - 9);
+
+ // Mark as IPv6 and set the upper 96-bits
+ via_replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ via_replace_cmd.m_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ via_replace_cmd.m_ip_via.v6[idx] = CGlobalInfo::m_options.m_src_ipv6[idx];
+ }
+ } else {
+ via_replace_cmd.m_add_pkt_len = ((INET_ADDRSTRLEN - 9) * 3) +
+ ((INET_PORTSTRLEN * 2) - 9);
+ }
+
+ via_replace_cmd.m_ip.v4 =node->m_dest_ip;
+ via_replace_cmd.m_ip0_start = 370;
+ via_replace_cmd.m_ip0_stop = 370+8;
+
+ via_replace_cmd.m_ip1_start = 401;
+ via_replace_cmd.m_ip1_stop = 401+8;
+
+
+ via_replace_cmd.m_port =lpP->rtp_client_0;
+ via_replace_cmd.m_port_start = 426;
+ via_replace_cmd.m_port_stop = 426+5;
+
+
+ via_replace_cmd.m_ip_via.v4 = node->m_src_ip;
+ via_replace_cmd.m_port_via = node->m_src_port;
+
+ via_replace_cmd.m_ip_via_start = 207;
+ via_replace_cmd.m_ip_via_stop = 207+9+5;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &via_replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+
+ }/* end of big switch on packet */
+ break;
+
+ case 1:
+ flow_info.client_ip = node->m_src_ip ;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = lpP->rtp_client_0;
+ /* this is tricky ..*/
+ flow_info.server_port = lpP->rtp_client_0;
+ flow_info.replace_server_port = true;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+ break;
+ default:
+ assert(0);
+ break;
+ };
+
+ //printf(" c_ip:%x s_ip:%x c_po:%x s_po:%x init:%x replace:%x \n",flow_info.client_ip,flow_info.server_ip,flow_info.client_port,flow_info.server_port,flow_info.is_init_dir,flow_info.replace_server_port);
+
+ //printf(" program %p \n",flow_info.vm_program);
+ if ( unlikely( flow_info.vm_program != 0 ) ) {
+
+ return ( pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size) );
+ }else{
+ return ( pkt_info->do_generate_new_mbuf_ex(node,&flow_info) );
+ }
+}
+
+rte_mbuf_t * CPluginCallbackSimple::rtsp_plugin(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
+
+ CMiniVMCmdBase * program[2];
+
+ CMiniVMReplaceIP replace_cmd;
+ CMiniVMCmdBase eop_cmd;
+ CMiniVMReplaceIPWithPort replace_port_cmd;
+ rte_mbuf_t *mbuf;
+
+ CPacketDescriptor * lpd=&pkt_info->m_pkt_indication.m_desc;
+ CPlugin_rtsp * lpP=(CPlugin_rtsp * )node->m_plugin_info;
+
+ assert(lpP);
+ // printf(" %d %d \n",lpd->getFlowId(),lpd->getFlowPktNum());
+ CFlowInfo flow_info;
+ flow_info.vm_program=0;
+ int16_t s_size=0;
+
+ switch ( lpd->getFlowId() ) {
+ /* flow - control */
+ case 0:
+ flow_info.client_ip = node->m_src_ip;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = node->m_src_port;
+ flow_info.server_port = 0;
+ flow_info.replace_server_port =false;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+
+ /* program to replace ip server */
+ switch ( lpd->getFlowPktNum() ) {
+ case 3:
+ {
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 16;
+ replace_cmd.m_stop_1 = 16+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+ case 4:
+ {
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 46;
+ replace_cmd.m_stop_1 = 46+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ case 5:
+ {
+
+ replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_OFFSET;
+ replace_port_cmd.m_flags = 0;
+ replace_port_cmd.m_start_0 = 13;
+ replace_port_cmd.m_stop_1 = 13+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ // There are also 2 port strings (8 bytes) that needs to be
+ // replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_port_cmd.m_add_pkt_len = ((INET6_ADDRSTRLEN + 2) - 9) +
+ ((INET_PORTSTRLEN * 2) - 8);
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_port_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_port_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_port_cmd.m_add_pkt_len = (INET_ADDRSTRLEN - 9) +
+ ((INET_PORTSTRLEN * 2) - 8);
+ }
+ replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
+ replace_port_cmd.m_start_port = 164;
+ replace_port_cmd.m_stop_port = 164+(4*2)+1;
+ replace_port_cmd.m_client_port = lpP->rtp_client_0;
+ replace_port_cmd.m_server_port =0;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_port_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ case 6:
+ {
+
+ replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_RESPONSE_OFFSET;
+ replace_port_cmd.m_flags = 0;
+ replace_port_cmd.m_start_0 = 0;
+ replace_port_cmd.m_stop_1 = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest port addresses. There are 4 port address
+ // strings (16 bytes) that needs to be replaced.
+ replace_port_cmd.m_add_pkt_len = ((INET_PORTSTRLEN * 4) - 16);
+
+ replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
+ replace_port_cmd.m_start_port = 247;
+ replace_port_cmd.m_stop_port = 247+(4*4)+2+13;
+ replace_port_cmd.m_client_port = lpP->rtp_client_0;
+ replace_port_cmd.m_server_port = lpP->rtp_client_0;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_port_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+
+ case 7:
+ {
+
+ replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_OFFSET;
+ replace_port_cmd.m_flags = 0;
+ replace_port_cmd.m_start_0 = 13;
+ replace_port_cmd.m_stop_1 = 13+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ // There are also 2 port strings (8 bytes) that needs to be
+ // replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_port_cmd.m_add_pkt_len = ((INET6_ADDRSTRLEN + 2) - 9) +
+ ((INET_PORTSTRLEN * 2) - 8);
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_port_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_port_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_port_cmd.m_add_pkt_len = (INET_ADDRSTRLEN - 9) +
+ ((INET_PORTSTRLEN * 2) - 8);
+ }
+ replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
+ replace_port_cmd.m_start_port = 164;
+ replace_port_cmd.m_stop_port = 164+(4*2)+1;
+ replace_port_cmd.m_client_port = lpP->rtp_client_1;
+ replace_port_cmd.m_server_port =0;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_port_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ case 8:
+
+ {
+
+ replace_port_cmd.m_cmd = VM_REPLACE_IP_PORT_RESPONSE_OFFSET;
+ replace_port_cmd.m_flags = 0;
+ replace_port_cmd.m_start_0 = 0;
+ replace_port_cmd.m_stop_1 = 0;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest port addresses. There are 4 port address
+ // strings (16 bytes) that needs to be replaced.
+ replace_port_cmd.m_add_pkt_len = ((INET_PORTSTRLEN * 4) - 16);
+
+ replace_port_cmd.m_server_ip.v4 = flow_info.server_ip;
+ replace_port_cmd.m_start_port = 247;
+ replace_port_cmd.m_stop_port = 247+(4*4)+2+13;
+ replace_port_cmd.m_client_port = lpP->rtp_client_1;
+ replace_port_cmd.m_server_port = lpP->rtp_client_1;
+
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_port_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ /* PLAY */
+ case 9:
+ {
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 12;
+ replace_cmd.m_stop_1 = 12+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ /*OPTION 0*/
+ case 12:
+ {
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 15;
+ replace_cmd.m_stop_1 = 15+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ /* option #2*/
+ case 15:
+ {
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 15;
+ replace_cmd.m_stop_1 = 15+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+ case 18:
+ {
+
+ replace_cmd.m_cmd = VM_REPLACE_IP_OFFSET;
+ replace_cmd.m_flags = 0;
+ replace_cmd.m_start_0 = 16;
+ replace_cmd.m_stop_1 = 16+9;
+
+ // Determine how much larger the packet needs to be to
+ // handle the largest IP address. There is a single address
+ // string of 9 bytes that needs to be replaced.
+ if (CGlobalInfo::is_ipv6_enable() ) {
+ // For IPv6, accomodate use of brackets (+2 bytes)
+ replace_cmd.m_add_pkt_len = (INET6_ADDRSTRLEN + 2) - 9;
+
+ // Mark as IPv6 and set the upper 96-bits
+ replace_cmd.m_flags |= CMiniVMCmdBase::MIN_VM_V6;
+ for (uint8_t idx=0; idx<6; idx++){
+ replace_cmd.m_server_ip.v6[idx] = CGlobalInfo::m_options.m_dst_ipv6[idx];
+ }
+ } else {
+ replace_cmd.m_add_pkt_len = INET_ADDRSTRLEN - 9;
+ }
+ replace_cmd.m_server_ip.v4 = flow_info.server_ip;
+
+ eop_cmd.m_cmd = VM_EOP;
+
+ program[0] = &replace_cmd;
+ program[1] = &eop_cmd;
+
+ flow_info.vm_program = program;
+ }
+ break;
+
+
+ }/* end of big switch on packet */
+ break;
+
+ case 1:
+ flow_info.client_ip = node->m_src_ip ;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = lpP->rtp_client_0;
+ /* this is tricky ..*/
+ flow_info.server_port = lpP->rtp_client_0;
+ flow_info.replace_server_port = true;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+ break;
+ case 2:
+ flow_info.client_ip = node->m_src_ip ;
+ flow_info.server_ip = node->m_dest_ip;
+ flow_info.client_port = lpP->rtp_client_1;
+ /* this is tricky ..*/
+ flow_info.server_port = lpP->rtp_client_1;
+ flow_info.replace_server_port =true;
+ flow_info.is_init_ip_dir = (node->cur_pkt_ip_addr_dir() == CLIENT_SIDE?true:false);
+ flow_info.is_init_port_dir = (node->cur_pkt_port_addr_dir() ==CLIENT_SIDE?true:false);
+
+ break;
+ default:
+ assert(0);
+ break;
+ };
+
+ //printf(" c_ip:%x s_ip:%x c_po:%x s_po:%x init:%x replace:%x \n",flow_info.client_ip,flow_info.server_ip,flow_info.client_port,flow_info.server_port,flow_info.is_init_dir,flow_info.replace_server_port);
+
+ //printf(" program %p \n",flow_info.vm_program);
+ if ( unlikely( flow_info.vm_program != 0 ) ) {
+
+ mbuf = pkt_info->do_generate_new_mbuf_ex_vm(node,&flow_info, &s_size);
+ }else{
+ if (unlikely (CGlobalInfo::is_ipv6_enable()) ) {
+ // Request a larger initial segment for IPv6
+ mbuf = pkt_info->do_generate_new_mbuf_ex_big(node,&flow_info);
+ }else{
+ mbuf = pkt_info->do_generate_new_mbuf_ex(node,&flow_info);
+ }
+ }
+
+ // Fixup the TCP sequence numbers for the TCP flow
+ if ( lpd->getFlowId() == 0 ) {
+ uint8_t *p=rte_pktmbuf_mtod(mbuf, uint8_t*);
+
+ // Update TCP sequence numbers
+ lpP->update(p, pkt_info, s_size);
+ }
+
+ return(mbuf);
+}
+
+
+/* replace the tuples */
+rte_mbuf_t * CPluginCallbackSimple::on_node_generate_mbuf(uint8_t plugin_id,CGenNode * node,CFlowPktInfo * pkt_info){
+
+ rte_mbuf_t * m=NULL;
+ switch (plugin_id) {
+ case mpRTSP:
+ m=rtsp_plugin(plugin_id,node,pkt_info);
+ break;
+ case mpSIP_VOICE:
+ m=sip_voice_plugin(plugin_id,node,pkt_info);
+ break;
+ case mpDYN_PYLOAD:
+ m=dyn_pyload_plugin(plugin_id,node,pkt_info);
+ break;
+ case mpAVL_HTTP_BROWSIN:
+ m=http_plugin(plugin_id,node,pkt_info);
+ break;
+ default:
+ assert(0);
+ }
+ return (m);
+}
+
+
+int CMiniVM::mini_vm_run(CMiniVMCmdBase * cmds[]){
+
+ m_new_pkt_size=0;
+ bool need_to_stop=false;
+ int cnt=0;
+ CMiniVMCmdBase * cmd=cmds[cnt];
+ while (! need_to_stop) {
+ switch (cmd->m_cmd) {
+ case VM_REPLACE_IP_OFFSET:
+ mini_vm_replace_ip((CMiniVMReplaceIP *)cmd);
+ break;
+ case VM_REPLACE_IP_PORT_OFFSET:
+ mini_vm_replace_port_ip((CMiniVMReplaceIPWithPort *)cmd);
+ break;
+ case VM_REPLACE_IP_PORT_RESPONSE_OFFSET:
+ mini_vm_replace_ports((CMiniVMReplaceIPWithPort *)cmd);
+ break;
+
+ case VM_REPLACE_IP_IP_PORT:
+ mini_vm_replace_ip_ip_ports((CMiniVMReplaceIP_IP_Port * )cmd);
+ break;
+
+ case VM_REPLACE_IPVIA_IP_IP_PORT:
+ mini_vm_replace_ip_via_ip_ip_ports((CMiniVMReplaceIP_PORT_IP_IP_Port *)cmd);
+ break;
+
+ case VM_DYN_PYLOAD:
+ mini_vm_dyn_payload((CMiniVMDynPyload *)cmd);
+ break;
+
+ case VM_EOP:
+ need_to_stop=true;
+ break;
+ default:
+ printf(" vm cmd %d does not exist \n",cmd->m_cmd);
+ assert(0);
+ }
+ cnt++;
+ cmd=cmds[cnt];
+ }
+ return (0);
+}
+
+inline int cp_pkt_len(char *to,char *from,uint16_t from_offset,uint16_t len){
+ memcpy(to, from+from_offset , len);
+ return (len);
+}
+
+/* not including the to_offset
+
+ 0 1
+ x
+
+*/
+inline int cp_pkt_to_from(char *to,char *from,uint16_t from_offset,uint16_t to_offset){
+ memcpy(to, from+from_offset , to_offset-from_offset) ;
+ return (to_offset-from_offset);
+}
+
+
+int CMiniVM::mini_vm_dyn_payload( CMiniVMDynPyload * cmd){
+ /* copy all the packet */
+ CFlowYamlDpPkt * dyn=(CFlowYamlDpPkt *)cmd->m_ptr;
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset ;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+ char * p=m_pyload_mbuf_ptr;
+ /* copy payload */
+ memcpy(p,original_l7_ptr,len);
+ if ( ( dyn->m_pyld_offset+ (dyn->m_len*4)) < ( len-4) ){
+ // we can change the packet
+ int i;
+ uint32_t *l=(uint32_t *)(p+dyn->m_pyld_offset);
+ for (i=0; i<dyn->m_len; i++) {
+ if ( dyn->m_type==0 ) {
+ *l=(rand() & dyn->m_pkt_mask);
+ }else if (dyn->m_type==1){
+ *l=(PKT_NTOHL(cmd->m_ip.v4) & dyn->m_pkt_mask);
+ }
+ l++;
+ }
+
+ }
+
+ // Return packet size which hasn't changed
+ m_new_pkt_size = m_pkt_info->m_packet->pkt_len;
+
+ return (0);
+}
+
+
+int CMiniVM::mini_vm_replace_ip_via_ip_ip_ports(CMiniVMReplaceIP_PORT_IP_IP_Port * cmd){
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+ char * p=m_pyload_mbuf_ptr;
+
+ p+=cp_pkt_to_from(p,original_l7_ptr,
+ 0,
+ cmd->m_ip_via_start);
+
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p+=ipv6_to_str(&cmd->m_ip_via,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip_via.v4,p);
+ }
+ p+=sprintf(p,":%u",cmd->m_port_via);
+
+ /* up to the IP */
+ p+=cp_pkt_to_from(p,original_l7_ptr,
+ cmd->m_ip_via_stop,
+ cmd->m_ip0_start);
+ /*IP */
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p[-2] = '6';
+ p+=ipv6_to_str(&cmd->m_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip.v4,p);
+ }
+ /* up to IP 2 */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_ip0_stop,
+ cmd->m_ip1_start);
+ /* IP2 */
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p[-2] = '6';
+ p+=ipv6_to_str(&cmd->m_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip.v4,p);
+ }
+
+ /* up to port */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_ip1_stop,
+ cmd->m_port_start);
+ /* port */
+ p+=sprintf(p,"%u",cmd->m_port);
+
+ /* up to end */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_port_stop,
+ len);
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+
+int CMiniVM::mini_vm_replace_ip_ip_ports(CMiniVMReplaceIP_IP_Port * cmd){
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+ char * p=m_pyload_mbuf_ptr;
+
+ /* up to the IP */
+ p+=cp_pkt_to_from(p,original_l7_ptr,
+ 0,
+ cmd->m_ip0_start);
+ /*IP */
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p+=ipv6_to_str(&cmd->m_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip.v4,p);
+ }
+ /* up to IP 2 */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_ip0_stop,
+ cmd->m_ip1_start);
+ /* IP2 */
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p+=ipv6_to_str(&cmd->m_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_ip.v4,p);
+ }
+
+ /* up to port */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_ip1_stop,
+ cmd->m_port_start);
+ /* port */
+ p+=sprintf(p,"%u",cmd->m_port);
+
+ /* up to end */
+ p+=cp_pkt_to_from(p, original_l7_ptr ,
+ cmd->m_port_stop,
+ len);
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+int CMiniVM::mini_vm_replace_ports(CMiniVMReplaceIPWithPort * cmd){
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+
+ memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_port);
+ char * p=m_pyload_mbuf_ptr+cmd->m_start_port;
+ p+=sprintf(p,"%u-%u;server_port=%u-%u",cmd->m_client_port,cmd->m_client_port+1,cmd->m_server_port,cmd->m_server_port+1);
+ memcpy(p, original_l7_ptr+cmd->m_stop_port,len-cmd->m_stop_port);
+ p+=(len-cmd->m_stop_port);
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+
+int CMiniVM::mini_vm_replace_port_ip(CMiniVMReplaceIPWithPort * cmd){
+ uint16_t l7_offset=m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset - 0;
+ char * original_l7_ptr=m_pkt_info->m_packet->raw+l7_offset;
+
+ memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_0);
+ char *p=m_pyload_mbuf_ptr+cmd->m_start_0;
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ p+=ipv6_to_str(&cmd->m_server_ip,p);
+ } else {
+ p+=ip_to_str(cmd->m_server_ip.v4,p);
+ }
+ /* copy until the port start offset */
+ int len1=cmd->m_start_port-cmd->m_stop_1 ;
+ memcpy(p, original_l7_ptr+cmd->m_stop_1,len1);
+ p+=len1;
+ p+=sprintf(p,"%u-%u",cmd->m_client_port,cmd->m_client_port+1);
+ memcpy(p, original_l7_ptr+cmd->m_stop_port,len-cmd->m_stop_port);
+ p+=len-cmd->m_stop_port;
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+int CMiniVM::mini_vm_replace_ip(CMiniVMReplaceIP * cmd){
+ uint16_t l7_offset = m_pkt_info->m_pkt_indication.getFastPayloadOffset();
+ uint16_t len = m_pkt_info->m_packet->pkt_len - l7_offset;
+ char * original_l7_ptr = m_pkt_info->m_packet->raw+l7_offset;
+
+ memcpy(m_pyload_mbuf_ptr, original_l7_ptr,cmd->m_start_0);
+ char *p=m_pyload_mbuf_ptr+cmd->m_start_0;
+
+ int n_size=0;
+ if (cmd->m_flags & CMiniVMCmdBase::MIN_VM_V6) {
+ n_size=ipv6_to_str(&cmd->m_server_ip,p);
+ } else {
+ n_size=ip_to_str(cmd->m_server_ip.v4,p);
+ }
+ p+=n_size;
+ memcpy(p, original_l7_ptr+cmd->m_stop_1,len-cmd->m_stop_1);
+
+ // Determine new packet size
+ m_new_pkt_size= ((p+l7_offset+(len-cmd->m_stop_1)) - m_pyload_mbuf_ptr);
+
+ return (0);
+}
+
+
+void CFlowYamlDpPkt::Dump(FILE *fd){
+ fprintf(fd," pkt_id : %d \n",(int)m_pkt_id);
+ fprintf(fd," offset : %d \n",(int)m_pyld_offset);
+ fprintf(fd," offset : %d \n",(int)m_type);
+ fprintf(fd," len : %d \n",(int)m_len);
+ fprintf(fd," mask : 0x%x \n",(int)m_pkt_mask);
+}
+
+
+void CFlowYamlDynamicPyloadPlugin::Add(CFlowYamlDpPkt & fd){
+ if (m_num ==MAX_PYLOAD_PKT_CHANGE) {
+ fprintf (stderr,"ERROR can set only %d rules \n",MAX_PYLOAD_PKT_CHANGE);
+ exit(-1);
+ }
+ m_pkt_ids[m_num]=fd.m_pkt_id;
+ m_program[m_num]=fd;
+ m_num+=1;
+}
+
+void CFlowYamlDynamicPyloadPlugin::Dump(FILE *fd){
+ int i;
+ fprintf(fd," pkts :");
+ for (i=0; i<m_num; i++) {
+ fprintf(fd," %d ",m_pkt_ids[i]);
+ }
+ fprintf(fd,"\n");
+ for (i=0; i<m_num; i++) {
+ fprintf(fd," program : %d \n",i);
+ fprintf(fd,"---------------- \n");
+ m_program[i].Dump(fd);
+ }
+}
+
+/* free the right object.
+ it is classic to use virtual function but we can't do it here and we don't even want to use callback function
+ as we want to save space and in most cases there is nothing to free.
+ this might be changed in the future
+ */
+void CGenNodeBase::free_base(){
+ if ( m_type == FLOW_PKT ) {
+ CGenNode* p=(CGenNode*)this;
+ p->free_gen_node();
+ return;
+ }
+ if (m_type==STATELESS_PKT) {
+ CGenNodeStateless* p=(CGenNodeStateless*)this;
+ p->free_stl_node();
+ return;
+ }
+
+ if (m_type == PCAP_PKT) {
+ CGenNodePCAP *p = (CGenNodePCAP *)this;
+ p->destroy();
+ return;
+ }
+
+ if ( m_type == COMMAND ) {
+ CGenNodeCommand* p=(CGenNodeCommand*)this;
+ p->free_command();
+ }
+
+}