diff options
Diffstat (limited to 'src/stateless/cp/trex_streams_compiler.cpp')
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 151 |
1 files changed, 125 insertions, 26 deletions
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index d6971d68..d1cffbb7 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ b/src/stateless/cp/trex_streams_compiler.cpp @@ -379,6 +379,8 @@ TrexStreamsCompiler::compile(uint8_t port_id, double factor, std::string *fail_msg) { + assert(dp_core_count > 0); + try { return compile_internal(port_id,streams,objs,dp_core_count,factor,fail_msg); } catch (const TrexException &ex) { @@ -415,12 +417,87 @@ TrexStreamsCompiler::compile_internal(uint8_t por /* check if all are cont. streams */ bool all_continues = true; + int non_splitable_count = 0; for (const auto stream : streams) { if (stream->get_type() != TrexStream::stCONTINUOUS) { all_continues = false; - break; } + if (!stream->is_splitable(dp_core_count)) { + non_splitable_count++; + } + } + + /* if all streams are not splitable - all should go to core 0 */ + if (non_splitable_count == streams.size()) { + compile_on_single_core(port_id, + streams, + objs, + dp_core_count, + factor, + nodes, + all_continues); + } else { + compile_on_all_cores(port_id, + streams, + objs, + dp_core_count, + factor, + nodes, + all_continues); + } + + return true; + +} + +/** + * compile a list of streams on a single core (pinned to core 0) + * + */ +void +TrexStreamsCompiler::compile_on_single_core(uint8_t port_id, + const std::vector<TrexStream *> &streams, + std::vector<TrexStreamsCompiledObj *> &objs, + uint8_t dp_core_count, + double factor, + GraphNodeMap &nodes, + bool all_continues) { + + /* allocate object only for core 0 */ + TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id); + obj->m_all_continues = all_continues; + objs.push_back(obj); + + /* put NULL for the rest */ + for (uint8_t i = 1; i < dp_core_count; i++) { + objs.push_back(NULL); + } + + /* compile all the streams */ + for (auto stream : streams) { + + /* skip non-enabled streams */ + if (!stream->m_enabled) { + continue; + } + + /* compile the stream for only one core */ + compile_stream(stream, factor, 1, objs, nodes); } +} + +/** + * compile a list of streams on all DP cores + * + */ +void +TrexStreamsCompiler::compile_on_all_cores(uint8_t port_id, + const std::vector<TrexStream *> &streams, + std::vector<TrexStreamsCompiledObj *> &objs, + uint8_t dp_core_count, + double factor, + GraphNodeMap &nodes, + bool all_continues) { /* allocate objects for all DP cores */ for (uint8_t i = 0; i < dp_core_count; i++) { @@ -441,15 +518,6 @@ TrexStreamsCompiler::compile_internal(uint8_t por compile_stream(stream, factor, dp_core_count, objs, nodes); } - /* some objects might be empty - no streams were assigned */ - for (uint8_t i = 0; i < dp_core_count; i++) { - if (objs[i]->is_empty()) { - delete objs[i]; - objs[i] = NULL; - } - } - - return true; } /** @@ -482,16 +550,17 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, // change the packet kept in the stream). We want the state to be saved in the original stream. get_stateless_obj()->m_rx_flow_stat.copy_state(fixed_rx_flow_stat_stream, stream); + fixed_rx_flow_stat_stream->update_rate_factor(factor); + /* can this stream be split to many cores ? */ - if (!stream->is_splitable(dp_core_count)) { + if ( (dp_core_count == 1) || (!stream->is_splitable(dp_core_count)) ) { compile_stream_on_single_core(fixed_rx_flow_stat_stream, - factor, - objs[0], + dp_core_count, + objs, new_id, new_next_id); } else { compile_stream_on_all_cores(fixed_rx_flow_stat_stream, - factor, dp_core_count, objs, new_id, @@ -507,7 +576,6 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, */ void TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, - double factor, uint8_t dp_core_count, std::vector<TrexStreamsCompiledObj *> &objs, int new_id, @@ -515,8 +583,13 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, std::vector<TrexStream *> core_streams(dp_core_count); - double per_core_factor = (factor / dp_core_count); int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count); + const int burst_remainder = (stream->m_burst_total_pkts % dp_core_count); + int remainder_left = burst_remainder; + + /* this is the stream base IPG (pre split) */ + double base_ipg_sec = stream->get_ipg_sec(); + /* for each core - creates its own version of the stream */ for (uint8_t i = 0; i < dp_core_count; i++) { @@ -525,18 +598,31 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, /* fix stream ID */ dp_stream->fix_dp_stream_id(new_id, new_next_id); + /* some phase is added to avoid all the cores TXing at once */ + dp_stream->m_mc_phase_pre_sec = base_ipg_sec * i; - /* adjust rate and packets count */ - dp_stream->update_rate_factor(per_core_factor); + + /* each core gets a share of the packets */ dp_stream->m_burst_total_pkts = per_core_burst_total_pkts; + + /* rate is slower * dp_core_count */ + dp_stream->update_rate_factor(1.0 / dp_core_count); + + + if (remainder_left > 0) { + dp_stream->m_burst_total_pkts++; + remainder_left--; + /* this core needs to wait to the rest of the cores that will participate in the last round */ + dp_stream->m_mc_phase_post_sec = base_ipg_sec * remainder_left; + } else { + /* this core did not participate in the last round so it will wait its current round's left + burst_remainder */ + dp_stream->m_mc_phase_post_sec = base_ipg_sec * (dp_core_count - 1 - i + burst_remainder); + } + core_streams[i] = dp_stream; } - /* take care of remainder from a burst on core 0 */ - int burst_remainder = stream->m_burst_total_pkts - (per_core_burst_total_pkts * dp_core_count); - core_streams[0]->m_burst_total_pkts += burst_remainder; - /* handle VM (split if needed) */ TrexVmSplitter vm_splitter; vm_splitter.split( (TrexStream *)stream, core_streams); @@ -554,8 +640,8 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream, */ void TrexStreamsCompiler::compile_stream_on_single_core(TrexStream *stream, - double factor, - TrexStreamsCompiledObj *obj, + uint8_t dp_core_count, + std::vector<TrexStreamsCompiledObj *> &objs, int new_id, int new_next_id) { @@ -570,8 +656,21 @@ TrexStreamsCompiler::compile_stream_on_single_core(TrexStream *stream, dp_stream->m_vm_dp = stream->m_vm_dp->clone(); } - /* update the core */ - obj->add_compiled_stream(dp_stream); + /* update core 0 with the real stream */ + objs[0]->add_compiled_stream(dp_stream); + + + /* create dummy streams for the other cores */ + for (uint8_t i = 1; i < dp_core_count; i++) { + TrexStream *null_dp_stream = stream->clone(); + + /* fix stream ID */ + null_dp_stream->fix_dp_stream_id(new_id, new_next_id); + + /* mark as null stream and add */ + null_dp_stream->set_null_stream(true); + objs[i]->add_compiled_stream(null_dp_stream); + } } /************************************** |