From ff443a39bc967fa58c5fb16d626bb96a2abb59f0 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 6 Apr 2016 17:29:28 +0300 Subject: NULL stream and multi core better support for streams --- src/bp_sim.cpp | 6 +- src/bp_sim.h | 1 + src/sim/trex_sim.h | 6 +- src/sim/trex_sim_stateless.cpp | 59 ++++++++--- src/stateless/cp/trex_stream.cpp | 13 ++- src/stateless/cp/trex_stream.h | 19 +++- src/stateless/cp/trex_streams_compiler.cpp | 146 +++++++++++++++++++++++----- src/stateless/cp/trex_streams_compiler.h | 20 +++- src/stateless/dp/trex_stateless_dp_core.cpp | 1 + src/stateless/dp/trex_stream_node.h | 20 +++- 10 files changed, 243 insertions(+), 48 deletions(-) (limited to 'src') diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index cc9af837..6c7ab966 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -3195,6 +3195,7 @@ int CNodeGenerator::open_file(std::string file_name, m_v_if->set_review_mode(preview_mode); m_v_if->open_file(file_name); m_cnt = 0; + m_non_active = 0; m_limit = 0; return (0); } @@ -3209,10 +3210,13 @@ int CNodeGenerator::close_file(CFlowGenListPerThread * thread){ int CNodeGenerator::update_stl_stats(CGenNodeStateless *node_sl){ m_cnt++; - + if (!node_sl->is_node_active()) { + m_non_active++; + } #ifdef _DEBUG if ( m_preview_mode.getVMode() >2 ){ fprintf(stdout," %4lu ,", (ulong)m_cnt); + fprintf(stdout," %4lu ,", (ulong)m_non_active); node_sl->Dump(stdout); } #endif diff --git a/src/bp_sim.h b/src/bp_sim.h index cd85e82b..583aba82 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -2014,6 +2014,7 @@ public: CFlowGenListPerThread * m_parent; CPreviewMode m_preview_mode; uint64_t m_cnt; + uint64_t m_non_active; uint64_t m_limit; CTimeHistogram m_realtime_his; }; diff --git a/src/sim/trex_sim.h b/src/sim/trex_sim.h index 5aeeb226..0c343261 100644 --- a/src/sim/trex_sim.h +++ b/src/sim/trex_sim.h @@ -26,12 +26,14 @@ limitations under the License. #include #include #include +#include int gtest_main(int argc, char **argv); class TrexStateless; class TrexPublisher; class DpToCpHandler; +class DPCoreStats; void set_stateless_obj(TrexStateless *obj); @@ -146,8 +148,8 @@ private: void run_dp_core(int core_index, const std::string &out_filename, - uint64_t &simulated_pkts, - uint64_t &written_pkts); + std::vector &stats, + DPCoreStats &total); void cleanup(); void flush_dp_to_cp_messages_core(int core_index); diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp index fa13401d..acbeef69 100644 --- a/src/sim/trex_sim_stateless.cpp +++ b/src/sim/trex_sim_stateless.cpp @@ -30,6 +30,23 @@ limitations under the License. using namespace std; +class DPCoreStats { +public: + DPCoreStats() { + m_simulated_pkts = 0; + m_non_active_pkts = 0; + m_written_pkts = 0; + } + + uint64_t get_on_wire_count() { + return (m_simulated_pkts - m_non_active_pkts); + } + + uint64_t m_simulated_pkts; + uint64_t m_non_active_pkts; + uint64_t m_written_pkts; +}; + /****** utils ******/ static string format_num(double num, const string &suffix = "") { const char x[] = {' ','K','M','G','T','P'}; @@ -330,21 +347,21 @@ SimStateless::show_intro(const std::string &out_filename) { void SimStateless::run_dp(const std::string &out_filename) { - uint64_t simulated_pkts_cnt = 0; - uint64_t written_pkts_cnt = 0; + std::vector core_stats(m_dp_core_count); + DPCoreStats total; - show_intro(out_filename); + show_intro(out_filename); if (is_multiple_capture()) { for (int i = 0; i < m_dp_core_count; i++) { std::stringstream ss; ss << out_filename << "-" << i; - run_dp_core(i, ss.str(), simulated_pkts_cnt, written_pkts_cnt); + run_dp_core(i, ss.str(), core_stats, total); } } else { for (int i = 0; i < m_dp_core_count; i++) { - run_dp_core(i, out_filename, simulated_pkts_cnt, written_pkts_cnt); + run_dp_core(i, out_filename, core_stats, total); } } @@ -354,12 +371,25 @@ SimStateless::run_dp(const std::string &out_filename) { std::cout << "\n\nSimulation summary:\n"; std::cout << "-------------------\n\n"; - std::cout << "simulated " << simulated_pkts_cnt << " packets\n"; + + for (int i = 0; i < m_dp_core_count; i++) { + std::cout << "core index " << i << "\n"; + std::cout << "-----------------\n\n"; + std::cout << " simulated packets : " << core_stats[i].m_simulated_pkts << "\n"; + std::cout << " non active packets : " << core_stats[i].m_non_active_pkts << "\n"; + std::cout << " on-wire packets : " << core_stats[i].get_on_wire_count() << "\n\n"; + } + + std::cout << "Total:" << "\n"; + std::cout << "-----------------\n\n"; + std::cout << " simulated packets : " << total.m_simulated_pkts << "\n"; + std::cout << " non active packets : " << total.m_non_active_pkts << "\n"; + std::cout << " on-wire packets : " << total.get_on_wire_count() << "\n\n"; if (m_is_dry_run) { std::cout << "*DRY RUN* - no packets were written\n"; } else { - std::cout << "written " << written_pkts_cnt << " packets " << "to '" << out_filename << "'\n\n"; + std::cout << "written " << total.m_written_pkts << " packets " << "to '" << out_filename << "'\n\n"; } std::cout << "\n"; @@ -395,8 +425,8 @@ SimStateless::get_limit_per_core(int core_index) { void SimStateless::run_dp_core(int core_index, const std::string &out_filename, - uint64_t &simulated_pkts, - uint64_t &written_pkts) { + std::vector &stats, + DPCoreStats &total) { CFlowGenListPerThread *lpt = m_fl.m_threads_info[core_index]; @@ -406,10 +436,17 @@ SimStateless::run_dp_core(int core_index, flush_dp_to_cp_messages_core(core_index); - simulated_pkts += lpt->m_node_gen.m_cnt; + /* core */ + stats[core_index].m_simulated_pkts = lpt->m_node_gen.m_cnt; + stats[core_index].m_non_active_pkts = lpt->m_node_gen.m_non_active; + + /* total */ + total.m_simulated_pkts += lpt->m_node_gen.m_cnt; + total.m_non_active_pkts += lpt->m_node_gen.m_non_active; if (should_capture_core(core_index)) { - written_pkts += lpt->m_node_gen.m_cnt; + stats[core_index].m_written_pkts = (lpt->m_node_gen.m_cnt - lpt->m_node_gen.m_non_active); + total.m_written_pkts += (lpt->m_node_gen.m_cnt - lpt->m_node_gen.m_non_active); } } diff --git a/src/stateless/cp/trex_stream.cpp b/src/stateless/cp/trex_stream.cpp index e3f0ba7c..6e4478d6 100644 --- a/src/stateless/cp/trex_stream.cpp +++ b/src/stateless/cp/trex_stream.cpp @@ -129,11 +129,13 @@ TrexStream::TrexStream(uint8_t type, uint8_t port_id, uint32_t stream_id) : m_port_id(port_id), m_stream_id(stream_id) , m_rate(*this) { /* default values */ - m_type = type; - m_isg_usec = 0; - m_next_stream_id = -1; - m_enabled = false; - m_self_start = false; + m_type = type; + m_isg_usec = 0; + m_next_stream_id = -1; + m_enabled = false; + m_self_start = false; + + m_delay_next_stream_sec = 0; m_pkt.binary = NULL; m_pkt.len = 0; @@ -148,6 +150,7 @@ TrexStream::TrexStream(uint8_t type, m_vm_dp = NULL; m_flags=0; m_action_count=0; + m_null_stream = false; } TrexStream::~TrexStream() { diff --git a/src/stateless/cp/trex_stream.h b/src/stateless/cp/trex_stream.h index ded6363e..e4ce914e 100644 --- a/src/stateless/cp/trex_stream.h +++ b/src/stateless/cp/trex_stream.h @@ -356,6 +356,14 @@ public: m_type = type; } + void set_null_stream(bool enable) { + m_null_stream = enable; + } + + bool get_null_stream() { + return m_null_stream; + } + uint8_t get_type(void) const { return ( m_type ); } @@ -413,8 +421,9 @@ public: dp->m_vm_dp = NULL; } - dp->m_isg_usec = m_isg_usec; - dp->m_next_stream_id = m_next_stream_id; + dp->m_isg_usec = m_isg_usec; + dp->m_delay_next_stream_sec = m_delay_next_stream_sec; + dp->m_next_stream_id = m_next_stream_id; dp->m_enabled = m_enabled; dp->m_self_start = m_self_start; @@ -448,6 +457,9 @@ public: return ( (m_burst_total_pkts / get_pps()) * 1000 * 1000); } + double get_ipg() { + return (1.0 / get_pps()); + } void Dump(FILE *fd); @@ -490,6 +502,7 @@ public: /* config fields */ + double m_delay_next_stream_sec; double m_isg_usec; int m_next_stream_id; @@ -497,6 +510,8 @@ public: bool m_enabled; bool m_self_start; + /* null stream (a dummy stream) */ + bool m_null_stream; /* VM CP and DP */ StreamVm m_vm; diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index d6971d68..da2b9c9b 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -379,6 +379,8 @@ TrexStreamsCompiler::compile(uint8_t port_id, double factor, std::string *fail_msg) { + assert(dp_core_count > 0); + try { return compile_internal(port_id,streams,objs,dp_core_count,factor,fail_msg); } catch (const TrexException &ex) { @@ -415,13 +417,88 @@ TrexStreamsCompiler::compile_internal(uint8_t por /* check if all are cont. streams */ bool all_continues = true; + int non_splitable_count = 0; for (const auto stream : streams) { if (stream->get_type() != TrexStream::stCONTINUOUS) { all_continues = false; - break; } + if (!stream->is_splitable(dp_core_count)) { + non_splitable_count++; + } + } + + /* if all streams are not splitable - all should go to core 0 */ + if (non_splitable_count == streams.size()) { + compile_on_single_core(port_id, + streams, + objs, + dp_core_count, + factor, + nodes, + all_continues); + } else { + compile_on_all_cores(port_id, + streams, + objs, + dp_core_count, + factor, + nodes, + all_continues); + } + + return true; + +} + +/** + * compile a list of streams on a single core (pinned to core 0) + * + */ +void +TrexStreamsCompiler::compile_on_single_core(uint8_t port_id, + const std::vector &streams, + std::vector &objs, + uint8_t dp_core_count, + double factor, + GraphNodeMap &nodes, + bool all_continues) { + + /* allocate object only for core 0 */ + TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id); + obj->m_all_continues = all_continues; + objs.push_back(obj); + + /* put NULL for the rest */ + for (uint8_t i = 1; i < dp_core_count; i++) { + objs.push_back(NULL); } + /* compile all the streams */ + for (auto stream : streams) { + + /* skip non-enabled streams */ + if (!stream->m_enabled) { + continue; + } + + /* compile a single stream to all cores */ + compile_stream(stream, factor, 1, objs, nodes); + } +} + +/** + * compile a list of streams on all DP cores + * + */ +void +TrexStreamsCompiler::compile_on_all_cores(uint8_t port_id, + const std::vector &streams, + std::vector &objs, + uint8_t dp_core_count, + double factor, + GraphNodeMap &nodes, + bool all_continues) { + /* allocate objects for all DP cores */ for (uint8_t i = 0; i < dp_core_count; i++) { TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id); @@ -441,15 +518,6 @@ TrexStreamsCompiler::compile_internal(uint8_t por compile_stream(stream, factor, dp_core_count, objs, nodes); } - /* some objects might be empty - no streams were assigned */ - for (uint8_t i = 0; i < dp_core_count; i++) { - if (objs[i]->is_empty()) { - delete objs[i]; - objs[i] = NULL; - } - } - - return true; } /** @@ -483,10 +551,11 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, get_stateless_obj()->m_rx_flow_stat.copy_state(fixed_rx_flow_stat_stream, stream); /* can this stream be split to many cores ? */ - if (!stream->is_splitable(dp_core_count)) { + if ( (dp_core_count == 1) || (!stream->is_splitable(dp_core_count)) ) { compile_stream_on_single_core(fixed_rx_flow_stat_stream, factor, - objs[0], + dp_core_count, + objs, new_id, new_next_id); } else { @@ -515,8 +584,8 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, std::vector core_streams(dp_core_count); - double per_core_factor = (factor / dp_core_count); int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count); + int burst_remainder = (stream->m_burst_total_pkts % dp_core_count); /* for each core - creates its own version of the stream */ for (uint8_t i = 0; i < dp_core_count; i++) { @@ -526,17 +595,33 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, dp_stream->fix_dp_stream_id(new_id, new_next_id); - /* adjust rate and packets count */ - dp_stream->update_rate_factor(per_core_factor); + /* each core gets a share of the packets */ dp_stream->m_burst_total_pkts = per_core_burst_total_pkts; + /* core 0 also gets the remainder */ + if (i == 0) { + dp_stream->m_burst_total_pkts += burst_remainder; + } + + /* for continous the rate is divided by the cores */ + if (stream->m_type == TrexStream::stCONTINUOUS) { + dp_stream->update_rate_factor(factor / dp_core_count); + } else { + /* rate is according to the share of the packetes the core got */ + dp_stream->update_rate_factor(factor * (dp_stream->m_burst_total_pkts / double(stream->m_burst_total_pkts))); + } + + + //dp_stream->m_pkt.binary[14 + 20] = 0; + //dp_stream->m_pkt.binary[14 + 21] = i; + + /* some phase */ + dp_stream->m_isg_usec += (stream->get_ipg() * i) * 1e6; + dp_stream->m_delay_next_stream_sec = stream->get_ipg() * (dp_core_count - 1 - i); + core_streams[i] = dp_stream; } - /* take care of remainder from a burst on core 0 */ - int burst_remainder = stream->m_burst_total_pkts - (per_core_burst_total_pkts * dp_core_count); - core_streams[0]->m_burst_total_pkts += burst_remainder; - /* handle VM (split if needed) */ TrexVmSplitter vm_splitter; vm_splitter.split( (TrexStream *)stream, core_streams); @@ -555,7 +640,8 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, void TrexStreamsCompiler::compile_stream_on_single_core(TrexStream *stream, double factor, - TrexStreamsCompiledObj *obj, + uint8_t dp_core_count, + std::vector &objs, int new_id, int new_next_id) { @@ -570,8 +656,24 @@ TrexStreamsCompiler::compile_stream_on_single_core(TrexStream *stream, dp_stream->m_vm_dp = stream->m_vm_dp->clone(); } - /* update the core */ - obj->add_compiled_stream(dp_stream); + //dp_stream->m_pkt.binary[14 + 20] = 0; + //dp_stream->m_pkt.binary[14 + 21] = 0; + + /* update core 0 with the real stream */ + objs[0]->add_compiled_stream(dp_stream); + + + /* create dummy streams for the other cores */ + for (uint8_t i = 1; i < dp_core_count; i++) { + TrexStream *null_dp_stream = stream->clone(); + + /* fix stream ID */ + null_dp_stream->fix_dp_stream_id(new_id, new_next_id); + + /* mark as null stream and add */ + null_dp_stream->set_null_stream(true); + objs[i]->add_compiled_stream(null_dp_stream); + } } /************************************** diff --git a/src/stateless/cp/trex_streams_compiler.h b/src/stateless/cp/trex_streams_compiler.h index b8b0be37..d910f216 100644 --- a/src/stateless/cp/trex_streams_compiler.h +++ b/src/stateless/cp/trex_streams_compiler.h @@ -123,6 +123,23 @@ private: void add_warning(const std::string &warning); void err(const std::string &err); + void compile_on_single_core(uint8_t port_id, + const std::vector &streams, + std::vector &objs, + uint8_t dp_core_count, + double factor, + GraphNodeMap &nodes, + bool all_continues); + + void compile_on_all_cores(uint8_t port_id, + const std::vector &streams, + std::vector &objs, + uint8_t dp_core_count, + double factor, + GraphNodeMap &nodes, + bool all_continues); + + void compile_stream(TrexStream *stream, double factor, uint8_t dp_core_count, @@ -131,7 +148,8 @@ private: void compile_stream_on_single_core(TrexStream *stream, double factor, - TrexStreamsCompiledObj *obj, + uint8_t dp_core_count, + std::vector &objs, int new_id, int new_next_id); diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index f125a46a..42ff9e24 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -627,6 +627,7 @@ TrexStatelessDpCore::add_stream(TrexStatelessDpPerPort * lp_port, node->m_pause =0; node->m_stream_type = stream->m_type; node->m_next_time_offset = 1.0 / stream->get_pps(); + node->m_null_stream = (stream->m_null_stream ? 1 : 0); /* stateless specific fields */ switch ( stream->m_type ) { diff --git a/src/stateless/dp/trex_stream_node.h b/src/stateless/dp/trex_stream_node.h index 104e4d3b..fa6fd8bd 100644 --- a/src/stateless/dp/trex_stream_node.h +++ b/src/stateless/dp/trex_stream_node.h @@ -84,7 +84,7 @@ private: double m_next_time_offset; /* in sec */ uint16_t m_action_counter; uint8_t m_stat_hw_id; // hw id used to count rx and tx stats - uint8_t m_pad11; + uint8_t m_null_stream; uint32_t m_pad12; stream_state_t m_state; @@ -170,6 +170,15 @@ public: } } + bool is_node_active() { + /* bitwise or - faster instead of two IFs */ + return ((m_pause | m_null_stream) == 0); + } + + bool is_null_stream() { + return (m_null_stream == 1); + } + inline uint8_t get_stream_type(){ return (m_stream_type); } @@ -199,7 +208,7 @@ public: inline void handle_continues(CFlowGenListPerThread *thread) { - if (unlikely (is_pause()==false)) { + if (likely (is_node_active())) { thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); } @@ -211,7 +220,9 @@ public: } inline void handle_multi_burst(CFlowGenListPerThread *thread) { - thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + if (likely (is_node_active())) { + thread->m_node_gen.m_v_if->send_node( (CGenNode *)this); + } m_single_burst--; if (m_single_burst > 0 ) { @@ -225,7 +236,8 @@ public: set_state(CGenNodeStateless::ss_INACTIVE); if ( thread->set_stateless_next_node(this,m_next_stream) ){ /* update the next stream time using isg */ - m_next_stream->update_refresh_time(m_time); + //m_next_stream->update_refresh_time(m_time + m_next_time_offset); + m_next_stream->update_refresh_time(m_time + m_ref_stream_info->m_delay_next_stream_sec); thread->m_node_gen.m_p_queue.push( (CGenNode *)m_next_stream); }else{ -- cgit 1.2.3-korg