summaryrefslogtreecommitdiffstats
path: root/src/bp_sim.cpp
diff options
context:
space:
mode:
authorHanoh Haim <hhaim@cisco.com>2016-12-07 15:24:38 +0200
committerHanoh Haim <hhaim@cisco.com>2016-12-21 13:01:05 +0200
commiteae78d4356b8834b78a91c52d869a7949f8f3e90 (patch)
tree184156f8e653adfa33eb0e70838f45d2a92355d0 /src/bp_sim.cpp
parent539de1c6af63071c1da9ed5db668c500f8993a03 (diff)
improve Stateful scheduler
Signed-off-by: Hanoh Haim <hhaim@cisco.com>
Diffstat (limited to 'src/bp_sim.cpp')
-rwxr-xr-xsrc/bp_sim.cpp381
1 files changed, 301 insertions, 80 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 077bef63..080a6b5e 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -25,6 +25,7 @@ limitations under the License.
#include "utl_yaml.h"
#include "msg_manager.h"
#include "trex_watchdog.h"
+#include "utl_ipg_bucket.h"
#include <common/basic_utils.h>
@@ -1897,6 +1898,94 @@ void CFlowPktInfo::Dump(FILE *fd){
+void CCapFileFlowInfo::generate_flow(CTupleTemplateGeneratorSmart * tuple_gen,
+ CNodeGenerator * gen,
+ dsec_t time,
+ uint64_t flow_id,
+ CFlowYamlInfo * template_info,
+ CGenNode * node){
+ dsec_t c_time = time;
+
+ node->m_type=CGenNode::FLOW_PKT;
+ CTupleBase tuple;
+ tuple_gen->GenerateTuple(tuple);
+
+ CFlowGenListPerThread * lpThread=gen->Parent();
+
+ /* add the first packet of the flow */
+ CFlowPktInfo * lp=GetPacket((uint32_t)0);
+
+ node->set_socket_id(gen->m_socket_id);
+
+ node->m_thread_id = tuple_gen->GetThreadId();
+ node->m_flow_id = (flow_id & (0x000fffffffffffffULL)) |
+ ( ((uint64_t)(tuple_gen->GetThreadId()& 0xff)) <<56 ) ;
+
+ node->m_time = c_time;
+ node->m_pkt_info = lp;
+ node->m_flow_info = this;
+ node->m_flags=0;
+ node->m_template_info =template_info;
+ node->m_tuple_gen = tuple_gen->get_gen();
+ node->m_src_ip= tuple.getClient();
+ node->m_dest_ip = tuple.getServer();
+ node->m_src_idx = tuple.getClientId();
+ node->m_dest_idx = tuple.getServerId();
+ node->m_src_port = tuple.getClientPort();
+ node->m_client_cfg = tuple.getClientCfg();
+
+ node->m_plugin_info =(void *)0;
+
+ if ( unlikely( CGlobalInfo::is_learn_mode() ) ){
+ // check if flow is two direction
+ if ( lp->m_pkt_indication.m_desc.IsBiDirectionalFlow() ) {
+ /* we are in learn mode */
+ lpThread->associate(((uint32_t)flow_id) & NAT_FLOW_ID_MASK, node); /* associate flow_id=>node */
+ node->set_nat_first_state();
+ }
+ }
+
+ if ( unlikely( get_is_rx_check_mode()) ) {
+ if ( (CGlobalInfo::m_options.m_rx_check_sample == 1 ) ||
+ ( ( rte_rand() % CGlobalInfo::m_options.m_rx_check_sample ) == 1 )){
+ if (unlikely(!node->is_repeat_flow() )) {
+ node->set_rx_check();
+ }
+ }
+ }
+
+ if ( unlikely( CGlobalInfo::m_options.preview.getClientServerFlowFlipAddr() ) ){
+ node->set_initiator_start_from_server_side_with_server_addr(node->is_eligible_from_server_side());
+ }else{
+ /* -p */
+ if ( likely( CGlobalInfo::m_options.preview.getClientServerFlowFlip() ) ){
+ node->set_initiator_start_from_server(node->is_eligible_from_server_side());
+ node->set_all_flow_from_same_dir(true);
+ }else{
+ /* --flip */
+ if ( unlikely( CGlobalInfo::m_options.preview.getClientServerFlip() ) ){
+ node->set_initiator_start_from_server(node->is_eligible_from_server_side());
+ }
+ }
+ }
+
+
+ /* in case of plugin we need to call the callback */
+ if ( template_info->m_plugin_id ) {
+ /* alloc the info , generate the ports */
+ on_node_first(template_info->m_plugin_id,node,template_info,tuple_gen,gen->Parent() );
+ }
+
+ node->m_tmr.reset();
+
+ /* in case of noraml flow use TW */
+ if (likely(node->m_type == CGenNode::FLOW_PKT)){
+ lpThread->on_flow_tick<false>(node); /* tick packet */
+ }else{
+ gen->add_node(node);
+ }
+}
+
void CCapFileFlowInfo::save_to_erf(std::string cap_file_name,int pcap){
if (Size() ==0) {
@@ -2073,13 +2162,16 @@ enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::is_valid_template_loa
* 1. maximum aging
* 2. per sub-flow pkt_num/max-pkt per dir and per global
*/
-void CCapFileFlowInfo::update_info(){
+void CCapFileFlowInfo::update_info(CFlowYamlInfo * flow_info){
flow_tmp_map_iter_t iter;
flow_tmp_map_t ft;
CTmpFlowInfo * lpFlow;
int i;
dsec_t ctime=0.0;
+ CCalcIpgDiff dtick_util(BUCKET_TIME_SEC);
+
+
// first iteration, lern all the info into a temp flow table
for (i=0; i<Size(); i++) {
CFlowPktInfo * lp= GetPacket((uint32_t)i);
@@ -2141,6 +2233,23 @@ void CCapFileFlowInfo::update_info(){
lpCurPacket->SetMaxPkts(lpFlow->m_per_dir[dir].m_pkt_id);
lp->m_pkt_indication.m_desc.SetMaxPktsPerFlow(lpFlow->m_max_pkts);
lp->m_pkt_indication.m_desc.SetMaxFlowTimeout(lpFlow->m_max_aging_sec);
+
+
+
+ /* update dtick from ipg */
+ double dtime=0;
+
+ if ( likely ( lp->m_pkt_indication.m_desc.IsPcapTiming()) ){
+ dtime = lp->m_pkt_indication.m_cap_ipg ;
+ }else{
+ if ( lp->m_pkt_indication.m_desc.IsRtt() ){
+ dtime = flow_info->m_rtt_sec ;
+ }else{
+ dtime = flow_info->m_ipg_sec;
+ }
+ lp->m_pkt_indication.m_cap_ipg = dtime;
+ }
+ lp->m_pkt_indication.m_ticks = dtick_util.do_calc(dtime);
}
@@ -2351,6 +2460,8 @@ enum CCapFileFlowInfo::load_cap_file_err CCapFileFlowInfo::load_cap_file(std::st
return kOK;
}
+
+
void CCapFileFlowInfo::update_pcap_mode(){
int i;
for (i=0; i<(int)Size(); i++) {
@@ -3188,7 +3299,7 @@ bool CFlowGeneratorRec::Create(CFlowYamlInfo * info,
if (m_flow_info.is_valid_template_load_time() != 0) {
return (false);
}
- m_flow_info.update_info();
+ m_flow_info.update_info(m_info);
return (true);
}else{
return (false);
@@ -3311,7 +3422,7 @@ int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){
}
-int CNodeGenerator::update_stats(CGenNode * node){
+int CNodeGenerator::update_stats(CGenNode * node){
if ( m_preview_mode.getVMode() >2 ){
fprintf(stdout," %llu ,", (unsigned long long)m_cnt);
node->Dump(stdout);
@@ -3320,6 +3431,7 @@ int CNodeGenerator::update_stats(CGenNode * node){
return (0);
}
+
bool CNodeGenerator::has_limit_reached() {
/* do we have a limit and has it passed ? */
return ( (m_limit > 0) && (m_cnt >= m_limit) );
@@ -3347,7 +3459,6 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
char name[100];
sprintf(name,"nodes-%d",m_core_id);
- //printf(" create thread %d %s socket: %d \n",m_core_id,name,socket_id);
m_node_pool = utl_rte_mempool_create_non_pkt(name,
CGlobalInfo::m_memory_cfg.get_each_core_dp_flows(),
@@ -3356,7 +3467,8 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id,
0 ,
socket_id);
- //printf(" pool %p \n",m_node_pool);
+ m_tw.Create(TW_BUCKETS,3);
+
m_node_gen.Create(this);
m_flow_id_to_node_lookup.Create();
@@ -3556,6 +3668,7 @@ void CFlowGenListPerThread::Delete(){
m_node_gen.Delete();
Clean();
m_cpu_cp_u.Delete();
+ m_tw.Delete();
utl_rte_mempool_delete(m_node_pool);
}
@@ -3628,22 +3741,109 @@ inline bool CNodeGenerator::handle_stl_node(CGenNode * node,
}
+
+#define unsafe_container_of(var,ptr, type, member) \
+ ((type *) ((uint8_t *)(ptr) - offsetof(type, member)))
+
+
+/*TEARDOWN is true for stateful in second phase we wait for all the flow to finish
+with --nc there is no TEARDOWN
+
+first phase ==> TEARDOWN =false
+last phase ==> TEARDOWN =true
+
+this is relevant for repeatable flows
+*/
+
+template<bool TEARDOWN>
+inline void CFlowGenListPerThread::on_flow_tick(CGenNode *node){
+
+ #ifdef TREX_SIM
+ node->m_time=m_cur_time_sec;
+ #endif
+ #ifdef _DEBUG
+ m_node_gen.update_stats(node);
+ #endif
+ m_node_gen.flush_one_node_to_file(node);
+
+ if ( likely (!node->is_repeat_flow()) ) {
+ if ( likely (!node->is_last_in_flow()) ) {
+ m_tw.timer_start(&node->m_tmr,node->update_next_pkt_in_flow_tw() );
+ }else{
+ free_last_flow_node( node);
+ }
+ }else{
+ /* repeatable flow, we need to stop it in case of repeat */
+ if ( node->is_last_in_flow() ) {
+
+ if ( TEARDOWN == false ){
+ node->m_time=m_cur_time_sec; /* update the node time as we schedule it */
+ reschedule_flow(node);
+ }else{
+ free_last_flow_node( node);
+ }
+
+ }else{
+ m_tw.timer_start(&node->m_tmr,node->update_next_pkt_in_flow_tw() );
+ }
+ }
+}
+
+#define GCC_DIAG_STR(s) #s
+#define GCC_DIAG_JOINSTR(x,y) GCC_DIAG_STR(x ## y)
+# define GCC_DIAG_DO_PRAGMA(x) _Pragma (#x)
+# define GCC_DIAG_PRAGMA(x) GCC_DIAG_DO_PRAGMA(GCC diagnostic x)
+#define GCC_DIAG_OFF(x) GCC_DIAG_PRAGMA(push) \
+ GCC_DIAG_PRAGMA(ignored GCC_DIAG_JOINSTR(-W,x))
+#define GCC_DIAG_ON() GCC_DIAG_PRAGMA(pop)
+
+#define UNSAFE_CONTAINER_OF_PUSH GCC_DIAG_OFF(invalid-offsetof)
+#define UNSAFE_CONTAINER_OF_POP GCC_DIAG_ON()
+
+
+
+
+static void tw_on_tick_per_thread_cb_always(void *userdata,
+ CHTimerObj *tmr){
+ CFlowGenListPerThread * thread=(CFlowGenListPerThread * )userdata;
+ UNSAFE_CONTAINER_OF_PUSH;
+ CGenNode * node=unsafe_container_of(node,tmr,CGenNode,m_tmr);
+ UNSAFE_CONTAINER_OF_POP;
+
+ thread->on_flow_tick<true>(node);
+}
+
+
+void tw_on_tick_per_thread_cb(void *userdata,
+ CHTimerObj *tmr){
+ CFlowGenListPerThread * thread=(CFlowGenListPerThread * )userdata;
+
+ UNSAFE_CONTAINER_OF_PUSH;
+ CGenNode * node=unsafe_container_of(node,tmr,CGenNode,m_tmr);
+ UNSAFE_CONTAINER_OF_POP;
+
+ thread->on_flow_tick<false>(node);
+}
+
+
inline bool CNodeGenerator::do_work_stl(CGenNode * node,
- CFlowGenListPerThread * thread,
- bool always){
+ CFlowGenListPerThread * thread,
+ bool on_terminate){
if ( handle_stl_node(node,thread)){
return (false);
}else{
- return (handle_slow_messages(node->m_type,node,thread,always));
+ return (handle_slow_messages(node->m_type,node,thread,on_terminate));
}
}
+
+
+
+template<bool ON_TERMINATE>
inline bool CNodeGenerator::do_work_both(CGenNode * node,
- CFlowGenListPerThread * thread,
- dsec_t d_time,
- bool always
- ){
+ CFlowGenListPerThread * thread,
+ dsec_t d_time){
bool exit_scheduler=false;
uint8_t type=node->m_type;
@@ -3651,48 +3851,53 @@ inline bool CNodeGenerator::do_work_both(CGenNode * node,
if ( handle_stl_node (node,thread) ){
}else{
- if ( likely( type == CGenNode::FLOW_PKT ) ) {
- /* PKT */
- if ( !(node->is_repeat_flow()) || (always==false)) {
- flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
- }
+ if ( likely( type == CGenNode::TW_SYNC ) ) {
m_p_queue.pop();
- if ( node->is_last_in_flow() ) {
- if ((node->is_repeat_flow()) && (always==false)) {
- /* Flow is repeated, reschedule it */
- thread->reschedule_flow( node);
+ /* update bucket time */
+ thread->m_cur_time_sec = node->m_time;
+ if ( ON_TERMINATE ) {
+ thread->m_tw.on_tick((void*)thread,tw_on_tick_per_thread_cb_always);
+ if ( thread->m_tw.is_any_events_left() ){
+ node->m_time += BUCKET_TIME_SEC;
+ m_p_queue.push(node);
}else{
- /* Flow will not be repeated, so free node */
- thread->free_last_flow_node( node);
+ thread->free_node(node);
}
}else{
- node->update_next_pkt_in_flow();
+ thread->m_tw.on_tick((void*)thread,tw_on_tick_per_thread_cb);
+ node->m_time += BUCKET_TIME_SEC;;
m_p_queue.push(node);
}
- }else{
- if ((type == CGenNode::FLOW_FIF)) {
- /* callback to our method */
- m_p_queue.pop();
- if ( always == false) {
- thread->m_cur_time_sec = node->m_time ;
- thread->generate_flows_roundrobin(&done);
+ }else{
- if (!done) {
- node->m_time +=d_time;
- m_p_queue.push(node);
+ if ( likely( type == CGenNode::FLOW_PKT ) ) {
+ /* PKT */
+ m_p_queue.pop();
+ thread->on_flow_tick<ON_TERMINATE>(node);
+ //printf(" MOVE from PKT->TW\n");
+ }else{
+ if ((type == CGenNode::FLOW_FIF)) {
+ /* callback to our method */
+ m_p_queue.pop();
+ if ( ON_TERMINATE == false) {
+ thread->m_cur_time_sec = node->m_time ;
+
+ thread->generate_flows_roundrobin(&done);
+
+ if (!done) {
+ node->m_time +=d_time;
+ m_p_queue.push(node);
+ }else{
+ thread->free_node(node);
+ }
}else{
thread->free_node(node);
}
+
}else{
- thread->free_node(node);
+ exit_scheduler = handle_slow_messages(type,node,thread,ON_TERMINATE);
}
-
- }else{
- exit_scheduler = handle_slow_messages(type,node,thread,always);
}
}
}
@@ -3702,18 +3907,16 @@ inline bool CNodeGenerator::do_work_both(CGenNode * node,
-template<int SCH_MODE>
+template<int SCH_MODE,bool ON_TERMINATE>
inline bool CNodeGenerator::do_work(CGenNode * node,
- CFlowGenListPerThread * thread,
- dsec_t d_time,
- bool always
- ){
+ CFlowGenListPerThread * thread,
+ dsec_t d_time){
/* template filter in compile time */
if ( SCH_MODE == smSTATELESS ) {
- return ( do_work_stl(node,thread,always) );
+ return ( do_work_stl(node,thread,ON_TERMINATE) );
}else{
/* smSTATEFUL */
- return ( do_work_both(node,thread,d_time,always) );
+ return ( do_work_both<ON_TERMINATE>(node,thread,d_time) );
}
}
@@ -3741,9 +3944,9 @@ inline void CNodeGenerator::do_sleep(dsec_t & cur_time,
inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread,
- bool always,
- double &old_offset,
- double offset){
+ bool on_terminate,
+ double &old_offset,
+ double offset){
thread->m_cpu_dp_u.commit1();
@@ -3752,7 +3955,7 @@ inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread,
return (0);
}
- if (!always) {
+ if (!on_terminate) {
old_offset =offset;
}else{
// free the left other
@@ -3763,17 +3966,16 @@ inline int CNodeGenerator::teardown(CFlowGenListPerThread * thread,
-template<int SCH_MODE>
+template<int SCH_MODE,bool ON_TERIMATE>
inline int CNodeGenerator::flush_file_realtime(dsec_t max_time,
dsec_t d_time,
- bool always,
CFlowGenListPerThread * thread,
double &old_offset) {
CGenNode * node;
dsec_t offset=0.0;
dsec_t cur_time;
dsec_t n_time;
- if (always) {
+ if (ON_TERIMATE) {
offset=old_offset;
}else{
add_exit_node(thread,max_time);
@@ -3810,7 +4012,7 @@ inline int CNodeGenerator::flush_file_realtime(dsec_t max_time,
int node_count = 0;
do {
- bool s=do_work<SCH_MODE>(node,thread,d_time,always);
+ bool s=do_work<SCH_MODE,ON_TERIMATE>(node,thread,d_time);
if (s) { // can we remove this IF ?
state=scTERMINATE;
break;
@@ -3842,7 +4044,7 @@ inline int CNodeGenerator::flush_file_realtime(dsec_t max_time,
}/* while*/
- return (teardown(thread,always,old_offset,offset));
+ return (teardown(thread,ON_TERIMATE,old_offset,offset));
}
@@ -3903,12 +4105,12 @@ void CNodeGenerator::handle_time_strech(CGenNode * &node,
int CNodeGenerator::flush_file_sim(dsec_t max_time,
dsec_t d_time,
- bool always,
+ bool on_terminate,
CFlowGenListPerThread * thread,
double &old_offset){
CGenNode * node;
- if (!always) {
+ if (!on_terminate) {
add_exit_node(thread,max_time);
}
@@ -3916,30 +4118,46 @@ int CNodeGenerator::flush_file_sim(dsec_t max_time,
node = m_p_queue.top();
bool do_exit;
- if ( get_is_stateless() ) {
- do_exit=do_work<smSTATELESS>(node,thread,d_time,always);
+ if (on_terminate) {
+ if ( get_is_stateless() ) {
+ do_exit=do_work<smSTATELESS,true>(node,thread,d_time);
+ }else{
+ do_exit=do_work<smSTATEFUL,true>(node,thread,d_time);
+ }
}else{
- do_exit=do_work<smSTATEFUL>(node,thread,d_time,always);
+ if ( get_is_stateless() ) {
+ do_exit=do_work<smSTATELESS,false>(node,thread,d_time);
+ }else{
+ do_exit=do_work<smSTATEFUL,false>(node,thread,d_time);
+ }
}
if ( do_exit ){
break;
}
}
- return (teardown(thread,always,old_offset,0));
+ return (teardown(thread,on_terminate,old_offset,0));
}
int CNodeGenerator::flush_file(dsec_t max_time,
dsec_t d_time,
- bool always,
+ bool on_terminate,
CFlowGenListPerThread * thread,
double &old_offset){
#ifdef TREX_SIM
- return ( flush_file_sim(max_time, d_time,always,thread,old_offset) );
+ return ( flush_file_sim(max_time, d_time,on_terminate,thread,old_offset) );
#else
- if ( get_is_stateless() ) {
- return ( flush_file_realtime<smSTATELESS>(max_time, d_time,always,thread,old_offset) );
+ if (on_terminate) {
+ if ( get_is_stateless() ) {
+ return ( flush_file_realtime<smSTATELESS,true>(max_time, d_time,thread,old_offset) );
+ }else{
+ return ( flush_file_realtime<smSTATEFUL,true>(max_time, d_time,thread,old_offset) );
+ }
}else{
- return ( flush_file_realtime<smSTATEFUL>(max_time, d_time,always,thread,old_offset) );
+ if ( get_is_stateless() ) {
+ return ( flush_file_realtime<smSTATELESS,false>(max_time, d_time,thread,old_offset) );
+ }else{
+ return ( flush_file_realtime<smSTATEFUL,false>(max_time, d_time,thread,old_offset) );
+ }
}
#endif
@@ -3953,9 +4171,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre
if ( node->is_nat_first_state() ) {
node->set_nat_wait_state();
flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
+ UPDATE_STATS(node);
} else {
if ( node->is_nat_wait_state() ) {
if (node->is_responder_pkt()) {
@@ -3966,9 +4182,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre
} else {
flush_one_node_to_file(node);
- #ifdef _DEBUG
- update_stats(node);
- #endif
+ UPDATE_STATS(node);
}
} else {
if ( node->is_nat_wait_ack_state() ) {
@@ -3980,9 +4194,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre
} else {
flush_one_node_to_file(node);
-#ifdef _DEBUG
- update_stats(node);
-#endif
+ UPDATE_STATS(node);
}
} else {
assert(0);
@@ -3993,7 +4205,7 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre
if ( node->is_last_in_flow() ) {
thread->free_last_flow_node( node);
} else {
- node->update_next_pkt_in_flow();
+ node->update_next_pkt_in_flow_as();
m_p_queue.push(node);
}
}
@@ -4062,7 +4274,7 @@ bool
CNodeGenerator::handle_slow_messages(uint8_t type,
CGenNode * node,
CFlowGenListPerThread * thread,
- bool always){
+ bool on_terminate){
/* should we continue after */
bool exit_scheduler = false;
@@ -4519,9 +4731,18 @@ void CFlowGenListPerThread::start_generate_stateful(std::string erf_file_name,
node= create_node() ;
node->m_type = CGenNode::FLOW_SYNC;
node->m_time = m_cur_time_sec + SYNC_TIME_OUT ;
-
m_node_gen.add_node(node);
+
+ if ( !get_is_stateless() ){
+ /* add TW only for Stateful right now */
+ node= create_node() ;
+ node->m_type = CGenNode::TW_SYNC;
+ node->m_time = m_cur_time_sec + BUCKET_TIME_SEC ;
+ m_node_gen.add_node(node);
+ }
+
+
#ifdef _DEBUG
if ( m_preview_mode.getVMode() >2 ){