summaryrefslogtreecommitdiffstats
path: root/src/stateless/cp/trex_stateless_port.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless/cp/trex_stateless_port.cpp')
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp132
1 files changed, 95 insertions, 37 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 95bdca0b..0e45bf0b 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -115,7 +115,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration)
/* on start - we can only provide absolute values */
assert(mul.m_op == TrexPortMultiplier::OP_ABS);
- double per_core_mul = calculate_effective_mul(mul);
+ double factor = calculate_effective_factor(mul);
/* fetch all the streams from the table */
vector<TrexStream *> streams;
@@ -123,12 +123,18 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration)
/* compiler it */
- TrexStreamsCompiler compiler;
- TrexStreamsCompiledObj *compiled_obj = new TrexStreamsCompiledObj(m_port_id, per_core_mul);
+ std::vector<TrexStreamsCompiledObj *> compiled_objs;
+ std::string fail_msg;
- bool rc = compiler.compile(streams, *compiled_obj);
+ TrexStreamsCompiler compiler;
+ bool rc = compiler.compile(m_port_id,
+ streams,
+ compiled_objs,
+ get_dp_core_count(),
+ factor,
+ &fail_msg);
if (!rc) {
- throw TrexRpcException("Failed to compile streams");
+ throw TrexRpcException(fail_msg);
}
/* generate a message to all the relevant DP cores to start transmitting */
@@ -137,21 +143,29 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration)
/* mark that DP event of stoppped is possible */
m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id);
- TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration);
-
- m_last_all_streams_continues = compiled_obj->get_all_streams_continues();
- m_last_duration =duration;
+ /* update object status */
+ m_factor = factor;
+ m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues();
+ m_last_duration = duration;
change_state(PORT_STATE_TX);
- send_message_to_dp(start_msg);
+
+ /* update the DP - messages will be freed by the DP */
+ int index = 0;
+ for (auto core_id : m_cores_id_list) {
+
+ TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration);
+ send_message_to_dp(core_id, start_msg);
+
+ index++;
+ }
+ /* update subscribers */
Json::Value data;
data["port_id"] = m_port_id;
get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data);
-
- /* save the per core multiplier for update messages */
- m_current_per_core_m = per_core_mul;
+
}
@@ -179,7 +193,7 @@ TrexStatelessPort::stop_traffic(void) {
/* generate a message to all the relevant DP cores to start transmitting */
TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(stop_msg);
change_state(PORT_STATE_STREAMS);
@@ -202,9 +216,9 @@ TrexStatelessPort::pause_traffic(void) {
throw TrexRpcException(" pause is supported when duration is not enable is start command ");
}
- TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id);
+ TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(pause_msg);
change_state(PORT_STATE_PAUSE);
}
@@ -215,9 +229,9 @@ TrexStatelessPort::resume_traffic(void) {
verify_state(PORT_STATE_PAUSE);
/* generate a message to all the relevant DP cores to start transmitting */
- TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id);
+ TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id);
- send_message_to_dp(stop_msg);
+ send_message_to_all_dp(resume_msg);
change_state(PORT_STATE_TX);
}
@@ -230,19 +244,19 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) {
verify_state(PORT_STATE_TX | PORT_STATE_PAUSE);
/* generate a message to all the relevant DP cores to start transmitting */
- double new_per_core_m = calculate_effective_mul(mul);
+ double new_factor = calculate_effective_factor(mul);
switch (mul.m_op) {
case TrexPortMultiplier::OP_ABS:
- factor = new_per_core_m / m_current_per_core_m;
+ factor = new_factor / m_factor;
break;
case TrexPortMultiplier::OP_ADD:
- factor = (m_current_per_core_m + new_per_core_m) / m_current_per_core_m;
+ factor = (m_factor + new_factor) / m_factor;
break;
case TrexPortMultiplier::OP_SUB:
- factor = (m_current_per_core_m - new_per_core_m) / m_current_per_core_m;
+ factor = (m_factor - new_factor) / m_factor;
if (factor <= 0) {
throw TrexRpcException("Update request will lower traffic to less than zero");
}
@@ -255,9 +269,9 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) {
TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor);
- send_message_to_dp(update_msg);
+ send_message_to_all_dp(update_msg);
- m_current_per_core_m *= factor;
+ m_factor *= factor;
}
@@ -356,15 +370,22 @@ TrexStatelessPort::encode_stats(Json::Value &port) {
}
void
-TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) {
+TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg) {
for (auto core_id : m_cores_id_list) {
-
- /* send the message to the core */
- CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
- ring->Enqueue((CGenNode *)msg->clone());
+ send_message_to_dp(core_id, msg->clone());
}
+ /* original was not sent - delete it */
+ delete msg;
+}
+
+void
+TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) {
+
+ /* send the message to the core */
+ CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id);
+ ring->Enqueue((CGenNode *)msg);
}
/**
@@ -393,7 +414,7 @@ TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) {
}
uint64_t
-TrexStatelessPort::get_port_speed_bps() {
+TrexStatelessPort::get_port_speed_bps() const {
switch (m_speed) {
case TrexPlatformApi::SPEED_1G:
return (1LLU * 1000 * 1000 * 1000);
@@ -410,11 +431,11 @@ TrexStatelessPort::get_port_speed_bps() {
}
double
-TrexStatelessPort::calculate_effective_mul(const TrexPortMultiplier &mul) {
+TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul) {
- /* for a simple factor request - calculate the multiplier per core */
+ /* for a simple factor request */
if (mul.m_type == TrexPortMultiplier::MUL_FACTOR) {
- return (mul.m_value / m_cores_id_list.size());
+ return (mul.m_value);
}
/* we now need the graph - generate it if we don't have it (happens once) */
@@ -424,19 +445,19 @@ TrexStatelessPort::calculate_effective_mul(const TrexPortMultiplier &mul) {
switch (mul.m_type) {
case TrexPortMultiplier::MUL_BPS:
- return ( (mul.m_value / m_graph_obj->get_max_bps()) / m_cores_id_list.size());
+ return (mul.m_value / m_graph_obj->get_max_bps());
case TrexPortMultiplier::MUL_PPS:
- return ( (mul.m_value / m_graph_obj->get_max_pps()) / m_cores_id_list.size());
+ return (mul.m_value / m_graph_obj->get_max_pps());
case TrexPortMultiplier::MUL_PERCENTAGE:
/* if abs percentage is from the line speed - otherwise its from the current speed */
if (mul.m_op == TrexPortMultiplier::OP_ABS) {
double required = (mul.m_value / 100.0) * get_port_speed_bps();
- return ( (required / m_graph_obj->get_max_bps()) / m_cores_id_list.size());
+ return (required / m_graph_obj->get_max_bps());
} else {
- return (m_current_per_core_m * (mul.m_value / 100.0));
+ return (m_factor * (mul.m_value / 100.0));
}
default:
@@ -518,3 +539,40 @@ TrexPortMultiplier(const std::string &type_str, const std::string &op_str, doubl
}
+const TrexStreamsGraphObj *
+TrexStatelessPort::validate(void) {
+
+ /* first compile the graph */
+
+ vector<TrexStream *> streams;
+ get_object_list(streams);
+
+ if (streams.size() == 0) {
+ throw TrexException("no streams attached to port");
+ }
+
+ TrexStreamsCompiler compiler;
+ std::vector<TrexStreamsCompiledObj *> compiled_objs;
+
+ std::string fail_msg;
+ bool rc = compiler.compile(m_port_id,
+ streams,
+ compiled_objs,
+ get_dp_core_count(),
+ 1.0,
+ &fail_msg);
+ if (!rc) {
+ throw TrexException(fail_msg);
+ }
+
+ for (auto obj : compiled_objs) {
+ delete obj;
+ }
+
+ /* now create a stream graph */
+ if (!m_graph_obj) {
+ generate_streams_graph();
+ }
+
+ return m_graph_obj;
+}