summaryrefslogtreecommitdiffstats
path: root/src/stateless/cp/trex_streams_compiler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless/cp/trex_streams_compiler.cpp')
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp151
1 files changed, 125 insertions, 26 deletions
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index d6971d68..d1cffbb7 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -379,6 +379,8 @@ TrexStreamsCompiler::compile(uint8_t port_id,
double factor,
std::string *fail_msg) {
+ assert(dp_core_count > 0);
+
try {
return compile_internal(port_id,streams,objs,dp_core_count,factor,fail_msg);
} catch (const TrexException &ex) {
@@ -415,12 +417,87 @@ TrexStreamsCompiler::compile_internal(uint8_t por
/* check if all are cont. streams */
bool all_continues = true;
+ int non_splitable_count = 0;
for (const auto stream : streams) {
if (stream->get_type() != TrexStream::stCONTINUOUS) {
all_continues = false;
- break;
}
+ if (!stream->is_splitable(dp_core_count)) {
+ non_splitable_count++;
+ }
+ }
+
+ /* if all streams are not splitable - all should go to core 0 */
+ if (non_splitable_count == streams.size()) {
+ compile_on_single_core(port_id,
+ streams,
+ objs,
+ dp_core_count,
+ factor,
+ nodes,
+ all_continues);
+ } else {
+ compile_on_all_cores(port_id,
+ streams,
+ objs,
+ dp_core_count,
+ factor,
+ nodes,
+ all_continues);
+ }
+
+ return true;
+
+}
+
+/**
+ * compile a list of streams on a single core (pinned to core 0)
+ *
+ */
+void
+TrexStreamsCompiler::compile_on_single_core(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count,
+ double factor,
+ GraphNodeMap &nodes,
+ bool all_continues) {
+
+ /* allocate object only for core 0 */
+ TrexStreamsCompiledObj *obj = new TrexStreamsCompiledObj(port_id);
+ obj->m_all_continues = all_continues;
+ objs.push_back(obj);
+
+ /* put NULL for the rest */
+ for (uint8_t i = 1; i < dp_core_count; i++) {
+ objs.push_back(NULL);
+ }
+
+ /* compile all the streams */
+ for (auto stream : streams) {
+
+ /* skip non-enabled streams */
+ if (!stream->m_enabled) {
+ continue;
+ }
+
+ /* compile the stream for only one core */
+ compile_stream(stream, factor, 1, objs, nodes);
}
+}
+
+/**
+ * compile a list of streams on all DP cores
+ *
+ */
+void
+TrexStreamsCompiler::compile_on_all_cores(uint8_t port_id,
+ const std::vector<TrexStream *> &streams,
+ std::vector<TrexStreamsCompiledObj *> &objs,
+ uint8_t dp_core_count,
+ double factor,
+ GraphNodeMap &nodes,
+ bool all_continues) {
/* allocate objects for all DP cores */
for (uint8_t i = 0; i < dp_core_count; i++) {
@@ -441,15 +518,6 @@ TrexStreamsCompiler::compile_internal(uint8_t por
compile_stream(stream, factor, dp_core_count, objs, nodes);
}
- /* some objects might be empty - no streams were assigned */
- for (uint8_t i = 0; i < dp_core_count; i++) {
- if (objs[i]->is_empty()) {
- delete objs[i];
- objs[i] = NULL;
- }
- }
-
- return true;
}
/**
@@ -482,16 +550,17 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream,
// change the packet kept in the stream). We want the state to be saved in the original stream.
get_stateless_obj()->m_rx_flow_stat.copy_state(fixed_rx_flow_stat_stream, stream);
+ fixed_rx_flow_stat_stream->update_rate_factor(factor);
+
/* can this stream be split to many cores ? */
- if (!stream->is_splitable(dp_core_count)) {
+ if ( (dp_core_count == 1) || (!stream->is_splitable(dp_core_count)) ) {
compile_stream_on_single_core(fixed_rx_flow_stat_stream,
- factor,
- objs[0],
+ dp_core_count,
+ objs,
new_id,
new_next_id);
} else {
compile_stream_on_all_cores(fixed_rx_flow_stat_stream,
- factor,
dp_core_count,
objs,
new_id,
@@ -507,7 +576,6 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream,
*/
void
TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream,
- double factor,
uint8_t dp_core_count,
std::vector<TrexStreamsCompiledObj *> &objs,
int new_id,
@@ -515,8 +583,13 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream,
std::vector<TrexStream *> core_streams(dp_core_count);
- double per_core_factor = (factor / dp_core_count);
int per_core_burst_total_pkts = (stream->m_burst_total_pkts / dp_core_count);
+ const int burst_remainder = (stream->m_burst_total_pkts % dp_core_count);
+ int remainder_left = burst_remainder;
+
+ /* this is the stream base IPG (pre split) */
+ double base_ipg_sec = stream->get_ipg_sec();
+
/* for each core - creates its own version of the stream */
for (uint8_t i = 0; i < dp_core_count; i++) {
@@ -525,18 +598,31 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream,
/* fix stream ID */
dp_stream->fix_dp_stream_id(new_id, new_next_id);
+ /* some phase is added to avoid all the cores TXing at once */
+ dp_stream->m_mc_phase_pre_sec = base_ipg_sec * i;
- /* adjust rate and packets count */
- dp_stream->update_rate_factor(per_core_factor);
+
+ /* each core gets a share of the packets */
dp_stream->m_burst_total_pkts = per_core_burst_total_pkts;
+
+ /* rate is slower * dp_core_count */
+ dp_stream->update_rate_factor(1.0 / dp_core_count);
+
+
+ if (remainder_left > 0) {
+ dp_stream->m_burst_total_pkts++;
+ remainder_left--;
+ /* this core needs to wait to the rest of the cores that will participate in the last round */
+ dp_stream->m_mc_phase_post_sec = base_ipg_sec * remainder_left;
+ } else {
+ /* this core did not participate in the last round so it will wait its current round's left + burst_remainder */
+ dp_stream->m_mc_phase_post_sec = base_ipg_sec * (dp_core_count - 1 - i + burst_remainder);
+ }
+
core_streams[i] = dp_stream;
}
- /* take care of remainder from a burst on core 0 */
- int burst_remainder = stream->m_burst_total_pkts - (per_core_burst_total_pkts * dp_core_count);
- core_streams[0]->m_burst_total_pkts += burst_remainder;
-
/* handle VM (split if needed) */
TrexVmSplitter vm_splitter;
vm_splitter.split( (TrexStream *)stream, core_streams);
@@ -554,8 +640,8 @@ TrexStreamsCompiler::compile_stream_on_all_cores(TrexStream *stream,
*/
void
TrexStreamsCompiler::compile_stream_on_single_core(TrexStream *stream,
- double factor,
- TrexStreamsCompiledObj *obj,
+ uint8_t dp_core_count,
+ std::vector<TrexStreamsCompiledObj *> &objs,
int new_id,
int new_next_id) {
@@ -570,8 +656,21 @@ TrexStreamsCompiler::compile_stream_on_single_core(TrexStream *stream,
dp_stream->m_vm_dp = stream->m_vm_dp->clone();
}
- /* update the core */
- obj->add_compiled_stream(dp_stream);
+ /* update core 0 with the real stream */
+ objs[0]->add_compiled_stream(dp_stream);
+
+
+ /* create dummy streams for the other cores */
+ for (uint8_t i = 1; i < dp_core_count; i++) {
+ TrexStream *null_dp_stream = stream->clone();
+
+ /* fix stream ID */
+ null_dp_stream->fix_dp_stream_id(new_id, new_next_id);
+
+ /* mark as null stream and add */
+ null_dp_stream->set_null_stream(true);
+ objs[i]->add_compiled_stream(null_dp_stream);
+ }
}
/**************************************