diff options
author | Hanoh Haim <hhaim@cisco.com> | 2016-12-07 15:24:38 +0200 |
---|---|---|
committer | Hanoh Haim <hhaim@cisco.com> | 2016-12-21 13:01:05 +0200 |
commit | eae78d4356b8834b78a91c52d869a7949f8f3e90 (patch) | |
tree | 184156f8e653adfa33eb0e70838f45d2a92355d0 /src/bp_sim.cpp | |
parent | 539de1c6af63071c1da9ed5db668c500f8993a03 (diff) |
improve Stateful scheduler
Signed-off-by: Hanoh Haim <hhaim@cisco.com>
Diffstat (limited to 'src/bp_sim.cpp')
-rwxr-xr-x | src/bp_sim.cpp | 381 |
1 files changed, 301 insertions, 80 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 077bef63..080a6b5e 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -25,6 +25,7 @@ limitations under the License. #include "utl_yaml.h" #include "msg_manager.h" #include "trex_watchdog.h" +#include "utl_ipg_bucket.h" #include <common/basic_utils.h> @@ -1897,6 +1898,94 @@ void CFlowPktInfo::Dump(FILE *fd){ +void CCapFileFlowInfo::generate_flow(CTupleTemplateGeneratorSmart * tuple_gen, + CNodeGenerator * gen, + dsec_t time, + uint64_t flow_id, + CFlowYamlInfo * template_info, + CGenNode * node){ + dsec_t c_time = time; + + node->m_type=CGenNode::FLOW_PKT; + CTupleBase tuple; + tuple_gen->GenerateTuple(tuple); + + CFlowGenListPerThread * lpThread=gen->Parent(); + + /* add the first packet of the flow */ + CFlowPktInfo * lp=GetPacket((uint32_t)0); + + node->set_socket_id(gen->m_socket_id); + + node->m_thread_id = tuple_gen->GetThreadId(); + node->m_flow_id = (flow_id & (0x000fffffffffffffULL)) | + ( ((uint64_t)(tuple_gen->GetThreadId()& 0xff)) <<56 ) ; + + node->m_time = c_time; + node->m_pkt_info = lp; + node->m_flow_info = this; + node->m_flags=0; + node->m_template_info =template_info; + node->m_tuple_gen = tuple_gen->get_gen(); + node->m_src_ip= tuple.getClient(); + node->m_dest_ip = tuple.getServer(); + node->m_src_idx = tuple.getClientId(); + node->m_dest_idx = tuple.getServerId(); + node->m_src_port = tuple.getClientPort(); + node->m_client_cfg = tuple.getClientCfg(); + + node->m_plugin_info =(void *)0; + + if ( unlikely( CGlobalInfo::is_learn_mode() ) ){ + // check if flow is two direction + if ( lp->m_pkt_indication.m_desc.IsBiDirectionalFlow() ) { + /* we are in learn mode */ + lpThread->associate(((uint32_t)flow_id) & NAT_FLOW_ID_MASK, node); /* associate flow_id=>node */ + node->set_nat_first_state(); + } + } + + if ( unlikely( get_is_rx_check_mode()) ) { + if ( (CGlobalInfo::m_options.m_rx_check_sample == 1 ) || + ( ( rte_rand() % CGlobalInfo::m_options.m_rx_check_sample ) == 1 )){ + if (unlikely(!node->is_repeat_flow() )) { + node->set_rx_check(); + } + } + } + + if ( unlikely( CGlobalInfo::m_options.preview.getClientServerFlowFlipAddr() ) ){ + node->set_initiator_start_from_server_side_with_server_addr(node->is_eligible_from_server_side()); + }else{ + /* -p */ + if ( likely( CGlobalInfo::m_options.preview.getClientServerFlowFlip() ) ){ + node->set_initiator_start_from_server(node->is_eligible_from_server_side()); + node->set_all_flow_from_same_dir(true); + }else{ + /* --flip */ + if ( unlikely( CGlobalInfo::m_options.preview.getClientServerFlip() ) ){ + node->set_initiator_start_from_server(node->is_eligible_from_server_side()); + } + } + } + + + /* in case of plugin we need to call the callback */ + if ( template_info->m_plugin_id ) { + /* alloc the info , generate the ports */ + on_node_first(template_info->m_plugin_id,node,template_info,tuple_gen,gen->Parent() ); + } + + node->m_tmr.reset(); + + /* in case of noraml flow use TW */ + if (likely(node->m_type == CGenNode::FLOW_PKT)){ + lpThread->on_flow_tick<false>(node); /* tick packet */ + }else{ + gen->add_node(node); + } +} + void CCapFileFlowInfo::save_to_erf(std::string cap_file_name,int pcap){ if (Size() ==0) { @@ -2073,13 +2162,16 @@ enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::is_valid_template_loa * 1. maximum aging * 2. per sub-flow pkt_num/max-pkt per dir and per global */ -void CCapFileFlowInfo::update_info(){ +void CCapFileFlowInfo::update_info(CFlowYamlInfo * flow_info){ flow_tmp_map_iter_t iter; flow_tmp_map_t ft; CTmpFlowInfo * lpFlow; int i; dsec_t ctime=0.0; + CCalcIpgDiff dtick_util(BUCKET_TIME_SEC); + + // first iteration, lern all the info into a temp flow table for (i=0; i<Size(); i++) { CFlowPktInfo * lp= GetPacket((uint32_t)i); @@ -2141,6 +2233,23 @@ void CCapFileFlowInfo::update_info(){ lpCurPacket->SetMaxPkts(lpFlow->m_per_dir[dir].m_pkt_id); lp->m_pkt_indication.m_desc.SetMaxPktsPerFlow(lpFlow->m_max_pkts); lp->m_pkt_indication.m_desc.SetMaxFlowTimeout(lpFlow->m_max_aging_sec); + + + + /* update dtick from ipg */ + double dtime=0; + + if ( likely ( lp->m_pkt_indication.m_desc.IsPcapTiming()) ){ + dtime = lp->m_pkt_indication.m_cap_ipg ; + }else{ + if ( lp->m_pkt_indication.m_desc.IsRtt() ){ + dtime = flow_info->m_rtt_sec ; + }else{ + dtime = flow_info->m_ipg_sec; + } + lp->m_pkt_indication.m_cap_ipg = dtime; + } + lp->m_pkt_indication.m_ticks = dtick_util.do_calc(dtime); } @@ -2351,6 +2460,8 @@ enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::load_cap_file(std::st return kOK; } + + void CCapFileFlowInfo::update_pcap_mode(){ int i; for (i=0; i<(int)Size(); i++) { @@ -3188,7 +3299,7 @@ bool CFlowGeneratorRec::Create(CFlowYamlInfo * info, if (m_flow_info.is_valid_template_load_time() != 0) { return (false); } - m_flow_info.update_info(); + m_flow_info.update_info(m_info); return (true); }else{ return (false); @@ -3311,7 +3422,7 @@ int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){ } -int CNodeGenerator::update_stats(CGenNode * node){ +int CNodeGenerator::update_stats(CGenNode * node){ if ( m_preview_mode.getVMode() >2 ){ fprintf(stdout," %llu ,", (unsigned long long)m_cnt); node->Dump(stdout); @@ -3320,6 +3431,7 @@ int CNodeGenerator::update_stats(CGenNode * node){ return (0); } + bool CNodeGenerator::has_limit_reached() { /* do we have a limit and has it passed ? */ return ( (m_limit > 0) && (m_cnt >= m_limit) ); @@ -3347,7 +3459,6 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, char name[100]; sprintf(name,"nodes-%d",m_core_id); - //printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id); m_node_pool = utl_rte_mempool_create_non_pkt(name, CGlobalInfo::m_memory_cfg.get_each_core_dp_flows(), @@ -3356,7 +3467,8 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, 0 , socket_id); - //printf(" pool %p \n",m_node_pool); + m_tw.Create(TW_BUCKETS,3); + m_node_gen.Create(this); m_flow_id_to_node_lookup.Create(); @@ -3556,6 +3668,7 @@ void CFlowGenListPerThread::Delete(){ m_node_gen.Delete(); Clean(); m_cpu_cp_u.Delete(); + m_tw.Delete(); utl_rte_mempool_delete(m_node_pool); } @@ -3628,22 +3741,109 @@ inline bool CNodeGenerator::handle_stl_node(CGenNode * node, } + +#define unsafe_container_of(var,ptr, type, member) \ + ((type *) ((uint8_t *)(ptr) - offsetof(type, member))) + + +/*TEARDOWN is true for stateful in second phase we wait for all the flow to finish +with --nc there is no TEARDOWN + +first phase ==> TEARDOWN =false +last phase ==> TEARDOWN =true + +this is relevant for repeatable flows +*/ + +template<bool TEARDOWN> +inline void CFlowGenListPerThread::on_flow_tick(CGenNode *node){ + + #ifdef TREX_SIM + node->m_time=m_cur_time_sec; + #endif + #ifdef _DEBUG + m_node_gen.update_stats(node); + #endif + m_node_gen.flush_one_node_to_file(node); + + if ( likely (!node->is_repeat_flow()) ) { + if ( likely (!node->is_last_in_flow()) ) { + m_tw.timer_start(&node->m_tmr,node->update_next_pkt_in_flow_tw() ); + }else{ + free_last_flow_node( node); + } + }else{ + /* repeatable flow, we need to stop it in case of repeat */ + if ( node->is_last_in_flow() ) { + + if ( TEARDOWN == false ){ + node->m_time=m_cur_time_sec; /* update the node time as we schedule it */ + reschedule_flow(node); + }else{ + free_last_flow_node( node); + } + + }else{ + m_tw.timer_start(&node->m_tmr,node->update_next_pkt_in_flow_tw() ); + } + } +} + +#define GCC_DIAG_STR(s) #s +#define GCC_DIAG_JOINSTR(x,y) GCC_DIAG_STR(x ## y) +# define GCC_DIAG_DO_PRAGMA(x) _Pragma (#x) +# define GCC_DIAG_PRAGMA(x) GCC_DIAG_DO_PRAGMA(GCC diagnostic x) +#define GCC_DIAG_OFF(x) GCC_DIAG_PRAGMA(push) \ + GCC_DIAG_PRAGMA(ignored GCC_DIAG_JOINSTR(-W,x)) +#define GCC_DIAG_ON() GCC_DIAG_PRAGMA(pop) + +#define UNSAFE_CONTAINER_OF_PUSH GCC_DIAG_OFF(invalid-offsetof) +#define UNSAFE_CONTAINER_OF_POP GCC_DIAG_ON() + + + + +static void tw_on_tick_per_thread_cb_always(void *userdata, + CHTimerObj *tmr){ + CFlowGenListPerThread * thread=(CFlowGenListPerThread * )userdata; + UNSAFE_CONTAINER_OF_PUSH; + CGenNode * node=unsafe_container_of(node,tmr,CGenNode,m_tmr); + UNSAFE_CONTAINER_OF_POP; + + thread->on_flow_tick<true>(node); +} + + +void tw_on_tick_per_thread_cb(void *userdata, + CHTimerObj *tmr){ + CFlowGenListPerThread * thread=(CFlowGenListPerThread * )userdata; + + UNSAFE_CONTAINER_OF_PUSH; + CGenNode * node=unsafe_container_of(node,tmr,CGenNode,m_tmr); + UNSAFE_CONTAINER_OF_POP; + + thread->on_flow_tick<false>(node); +} + + inline bool CNodeGenerator::do_work_stl(CGenNode * node, - CFlowGenListPerThread * thread, - bool always){ + CFlowGenListPerThread * thread, + bool on_terminate){ if ( handle_stl_node(node,thread)){ return (false); }else{ - return (handle_slow_messages(node->m_type,node,thread,always)); + return (handle_slow_messages(node->m_type,node,thread,on_terminate)); } } + + + +template<bool ON_TERMINATE> inline bool CNodeGenerator::do_work_both(CGenNode * node, - CFlowGenListPerThread * thread, - dsec_t d_time, - bool always - ){ + CFlowGenListPerThread * thread, + dsec_t d_time){ bool exit_scheduler=false; uint8_t type=node->m_type; @@ -3651,48 +3851,53 @@ inline bool CNodeGenerator::do_work_both(CGenNode * node, if ( handle_stl_node (node,thread) ){ }else{ - if ( likely( type == CGenNode::FLOW_PKT ) ) { - /* PKT */ - if ( !(node->is_repeat_flow()) || (always==false)) { - flush_one_node_to_file(node); - #ifdef _DEBUG - update_stats(node); - #endif - } + if ( likely( type == CGenNode::TW_SYNC ) ) { m_p_queue.pop(); - if ( node->is_last_in_flow() ) { - if ((node->is_repeat_flow()) && (always==false)) { - /* Flow is repeated, reschedule it */ - thread->reschedule_flow( node); + /* update bucket time */ + thread->m_cur_time_sec = node->m_time; + if ( ON_TERMINATE ) { + thread->m_tw.on_tick((void*)thread,tw_on_tick_per_thread_cb_always); + if ( thread->m_tw.is_any_events_left() ){ + node->m_time += BUCKET_TIME_SEC; + m_p_queue.push(node); }else{ - /* Flow will not be repeated, so free node */ - thread->free_last_flow_node( node); + thread->free_node(node); } }else{ - node->update_next_pkt_in_flow(); + thread->m_tw.on_tick((void*)thread,tw_on_tick_per_thread_cb); + node->m_time += BUCKET_TIME_SEC;; m_p_queue.push(node); } - }else{ - if ((type == CGenNode::FLOW_FIF)) { - /* callback to our method */ - m_p_queue.pop(); - if ( always == false) { - thread->m_cur_time_sec = node->m_time ; - thread->generate_flows_roundrobin(&done); + }else{ - if (!done) { - node->m_time +=d_time; - m_p_queue.push(node); + if ( likely( type == CGenNode::FLOW_PKT ) ) { + /* PKT */ + m_p_queue.pop(); + thread->on_flow_tick<ON_TERMINATE>(node); + //printf(" MOVE from PKT->TW\n"); + }else{ + if ((type == CGenNode::FLOW_FIF)) { + /* callback to our method */ + m_p_queue.pop(); + if ( ON_TERMINATE == false) { + thread->m_cur_time_sec = node->m_time ; + + thread->generate_flows_roundrobin(&done); + + if (!done) { + node->m_time +=d_time; + m_p_queue.push(node); + }else{ + thread->free_node(node); + } }else{ thread->free_node(node); } + }else{ - thread->free_node(node); + exit_scheduler = handle_slow_messages(type,node,thread,ON_TERMINATE); } - - }else{ - exit_scheduler = handle_slow_messages(type,node,thread,always); } } } @@ -3702,18 +3907,16 @@ inline bool CNodeGenerator::do_work_both(CGenNode * node, -template<int SCH_MODE> +template<int SCH_MODE,bool ON_TERMINATE> inline bool CNodeGenerator::do_work(CGenNode * node, - CFlowGenListPerThread * thread, - dsec_t d_time, - bool always - ){ + CFlowGenListPerThread * thread, + dsec_t d_time){ /* template filter in compile time */ if ( SCH_MODE == smSTATELESS ) { - return ( do_work_stl(node,thread,always) ); + return ( do_work_stl(node,thread,ON_TERMINATE) ); }else{ /* smSTATEFUL */ - return ( do_work_both(node,thread,d_time,always) ); + return ( do_work_both<ON_TERMINATE>(node,thread,d_time) ); } } @@ -3741,9 +3944,9 @@ inline void CNodeGenerator::do_sleep(dsec_t & cur_time, inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread, - bool always, - double &old_offset, - double offset){ + bool on_terminate, + double &old_offset, + double offset){ thread->m_cpu_dp_u.commit1(); @@ -3752,7 +3955,7 @@ inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread, return (0); } - if (!always) { + if (!on_terminate) { old_offset =offset; }else{ // free the left other @@ -3763,17 +3966,16 @@ inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread, -template<int SCH_MODE> +template<int SCH_MODE,bool ON_TERIMATE> inline int CNodeGenerator::flush_file_realtime(dsec_t max_time, dsec_t d_time, - bool always, CFlowGenListPerThread * thread, double &old_offset) { CGenNode * node; dsec_t offset=0.0; dsec_t cur_time; dsec_t n_time; - if (always) { + if (ON_TERIMATE) { offset=old_offset; }else{ add_exit_node(thread,max_time); @@ -3810,7 +4012,7 @@ inline int CNodeGenerator::flush_file_realtime(dsec_t max_time, int node_count = 0; do { - bool s=do_work<SCH_MODE>(node,thread,d_time,always); + bool s=do_work<SCH_MODE,ON_TERIMATE>(node,thread,d_time); if (s) { // can we remove this IF ? state=scTERMINATE; break; @@ -3842,7 +4044,7 @@ inline int CNodeGenerator::flush_file_realtime(dsec_t max_time, }/* while*/ - return (teardown(thread,always,old_offset,offset)); + return (teardown(thread,ON_TERIMATE,old_offset,offset)); } @@ -3903,12 +4105,12 @@ void CNodeGenerator::handle_time_strech(CGenNode * &node, int CNodeGenerator::flush_file_sim(dsec_t max_time, dsec_t d_time, - bool always, + bool on_terminate, CFlowGenListPerThread * thread, double &old_offset){ CGenNode * node; - if (!always) { + if (!on_terminate) { add_exit_node(thread,max_time); } @@ -3916,30 +4118,46 @@ int CNodeGenerator::flush_file_sim(dsec_t max_time, node = m_p_queue.top(); bool do_exit; - if ( get_is_stateless() ) { - do_exit=do_work<smSTATELESS>(node,thread,d_time,always); + if (on_terminate) { + if ( get_is_stateless() ) { + do_exit=do_work<smSTATELESS,true>(node,thread,d_time); + }else{ + do_exit=do_work<smSTATEFUL,true>(node,thread,d_time); + } }else{ - do_exit=do_work<smSTATEFUL>(node,thread,d_time,always); + if ( get_is_stateless() ) { + do_exit=do_work<smSTATELESS,false>(node,thread,d_time); + }else{ + do_exit=do_work<smSTATEFUL,false>(node,thread,d_time); + } } if ( do_exit ){ break; } } - return (teardown(thread,always,old_offset,0)); + return (teardown(thread,on_terminate,old_offset,0)); } int CNodeGenerator::flush_file(dsec_t max_time, dsec_t d_time, - bool always, + bool on_terminate, CFlowGenListPerThread * thread, double &old_offset){ #ifdef TREX_SIM - return ( flush_file_sim(max_time, d_time,always,thread,old_offset) ); + return ( flush_file_sim(max_time, d_time,on_terminate,thread,old_offset) ); #else - if ( get_is_stateless() ) { - return ( flush_file_realtime<smSTATELESS>(max_time, d_time,always,thread,old_offset) ); + if (on_terminate) { + if ( get_is_stateless() ) { + return ( flush_file_realtime<smSTATELESS,true>(max_time, d_time,thread,old_offset) ); + }else{ + return ( flush_file_realtime<smSTATEFUL,true>(max_time, d_time,thread,old_offset) ); + } }else{ - return ( flush_file_realtime<smSTATEFUL>(max_time, d_time,always,thread,old_offset) ); + if ( get_is_stateless() ) { + return ( flush_file_realtime<smSTATELESS,false>(max_time, d_time,thread,old_offset) ); + }else{ + return ( flush_file_realtime<smSTATEFUL,false>(max_time, d_time,thread,old_offset) ); + } } #endif @@ -3953,9 +4171,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre if ( node->is_nat_first_state() ) { node->set_nat_wait_state(); flush_one_node_to_file(node); - #ifdef _DEBUG - update_stats(node); - #endif + UPDATE_STATS(node); } else { if ( node->is_nat_wait_state() ) { if (node->is_responder_pkt()) { @@ -3966,9 +4182,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre } else { flush_one_node_to_file(node); - #ifdef _DEBUG - update_stats(node); - #endif + UPDATE_STATS(node); } } else { if ( node->is_nat_wait_ack_state() ) { @@ -3980,9 +4194,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre } else { flush_one_node_to_file(node); -#ifdef _DEBUG - update_stats(node); -#endif + UPDATE_STATS(node); } } else { assert(0); @@ -3993,7 +4205,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre if ( node->is_last_in_flow() ) { thread->free_last_flow_node( node); } else { - node->update_next_pkt_in_flow(); + node->update_next_pkt_in_flow_as(); m_p_queue.push(node); } } @@ -4062,7 +4274,7 @@ bool CNodeGenerator::handle_slow_messages(uint8_t type, CGenNode * node, CFlowGenListPerThread * thread, - bool always){ + bool on_terminate){ /* should we continue after */ bool exit_scheduler = false; @@ -4519,9 +4731,18 @@ void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name, node= create_node() ; node->m_type = CGenNode::FLOW_SYNC; node->m_time = m_cur_time_sec + SYNC_TIME_OUT ; - m_node_gen.add_node(node); + + if ( !get_is_stateless() ){ + /* add TW only for Stateful right now */ + node= create_node() ; + node->m_type = CGenNode::TW_SYNC; + node->m_time = m_cur_time_sec + BUCKET_TIME_SEC ; + m_node_gen.add_node(node); + } + + #ifdef _DEBUG if ( m_preview_mode.getVMode() >2 ){ |