diff options
Diffstat (limited to 'src/stateless/cp/trex_stateless_port.cpp')
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 132 |
1 files changed, 95 insertions, 37 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 95bdca0b..0e45bf0b 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -115,7 +115,7 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration) /* on start - we can only provide absolute values */ assert(mul.m_op == TrexPortMultiplier::OP_ABS); - double per_core_mul = calculate_effective_mul(mul); + double factor = calculate_effective_factor(mul); /* fetch all the streams from the table */ vector<TrexStream *> streams; @@ -123,12 +123,18 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration) /* compiler it */ - TrexStreamsCompiler compiler; - TrexStreamsCompiledObj *compiled_obj = new TrexStreamsCompiledObj(m_port_id, per_core_mul); + std::vector<TrexStreamsCompiledObj *> compiled_objs; + std::string fail_msg; - bool rc = compiler.compile(streams, *compiled_obj); + TrexStreamsCompiler compiler; + bool rc = compiler.compile(m_port_id, + streams, + compiled_objs, + get_dp_core_count(), + factor, + &fail_msg); if (!rc) { - throw TrexRpcException("Failed to compile streams"); + throw TrexRpcException(fail_msg); } /* generate a message to all the relevant DP cores to start transmitting */ @@ -137,21 +143,29 @@ TrexStatelessPort::start_traffic(const TrexPortMultiplier &mul, double duration) /* mark that DP event of stoppped is possible */ m_dp_events.wait_for_event(TrexDpPortEvent::EVENT_STOP, event_id); - TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_obj, duration); - - m_last_all_streams_continues = compiled_obj->get_all_streams_continues(); - m_last_duration =duration; + /* update object status */ + m_factor = factor; + m_last_all_streams_continues = compiled_objs[0]->get_all_streams_continues(); + m_last_duration = duration; change_state(PORT_STATE_TX); - send_message_to_dp(start_msg); + + /* update the DP - messages will be freed by the DP */ + int index = 0; + for (auto core_id : m_cores_id_list) { + + TrexStatelessCpToDpMsgBase *start_msg = new TrexStatelessDpStart(m_port_id, event_id, compiled_objs[index], duration); + send_message_to_dp(core_id, start_msg); + + index++; + } + /* update subscribers */ Json::Value data; data["port_id"] = m_port_id; get_stateless_obj()->get_publisher()->publish_event(TrexPublisher::EVENT_PORT_STARTED, data); - - /* save the per core multiplier for update messages */ - m_current_per_core_m = per_core_mul; + } @@ -179,7 +193,7 @@ TrexStatelessPort::stop_traffic(void) { /* generate a message to all the relevant DP cores to start transmitting */ TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpStop(m_port_id); - send_message_to_dp(stop_msg); + send_message_to_all_dp(stop_msg); change_state(PORT_STATE_STREAMS); @@ -202,9 +216,9 @@ TrexStatelessPort::pause_traffic(void) { throw TrexRpcException(" pause is supported when duration is not enable is start command "); } - TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpPause(m_port_id); + TrexStatelessCpToDpMsgBase *pause_msg = new TrexStatelessDpPause(m_port_id); - send_message_to_dp(stop_msg); + send_message_to_all_dp(pause_msg); change_state(PORT_STATE_PAUSE); } @@ -215,9 +229,9 @@ TrexStatelessPort::resume_traffic(void) { verify_state(PORT_STATE_PAUSE); /* generate a message to all the relevant DP cores to start transmitting */ - TrexStatelessCpToDpMsgBase *stop_msg = new TrexStatelessDpResume(m_port_id); + TrexStatelessCpToDpMsgBase *resume_msg = new TrexStatelessDpResume(m_port_id); - send_message_to_dp(stop_msg); + send_message_to_all_dp(resume_msg); change_state(PORT_STATE_TX); } @@ -230,19 +244,19 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) { verify_state(PORT_STATE_TX | PORT_STATE_PAUSE); /* generate a message to all the relevant DP cores to start transmitting */ - double new_per_core_m = calculate_effective_mul(mul); + double new_factor = calculate_effective_factor(mul); switch (mul.m_op) { case TrexPortMultiplier::OP_ABS: - factor = new_per_core_m / m_current_per_core_m; + factor = new_factor / m_factor; break; case TrexPortMultiplier::OP_ADD: - factor = (m_current_per_core_m + new_per_core_m) / m_current_per_core_m; + factor = (m_factor + new_factor) / m_factor; break; case TrexPortMultiplier::OP_SUB: - factor = (m_current_per_core_m - new_per_core_m) / m_current_per_core_m; + factor = (m_factor - new_factor) / m_factor; if (factor <= 0) { throw TrexRpcException("Update request will lower traffic to less than zero"); } @@ -255,9 +269,9 @@ TrexStatelessPort::update_traffic(const TrexPortMultiplier &mul) { TrexStatelessCpToDpMsgBase *update_msg = new TrexStatelessDpUpdate(m_port_id, factor); - send_message_to_dp(update_msg); + send_message_to_all_dp(update_msg); - m_current_per_core_m *= factor; + m_factor *= factor; } @@ -356,15 +370,22 @@ TrexStatelessPort::encode_stats(Json::Value &port) { } void -TrexStatelessPort::send_message_to_dp(TrexStatelessCpToDpMsgBase *msg) { +TrexStatelessPort::send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg) { for (auto core_id : m_cores_id_list) { - - /* send the message to the core */ - CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id); - ring->Enqueue((CGenNode *)msg->clone()); + send_message_to_dp(core_id, msg->clone()); } + /* original was not sent - delete it */ + delete msg; +} + +void +TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg) { + + /* send the message to the core */ + CNodeRing *ring = CMsgIns::Ins()->getCpDp()->getRingCpToDp(core_id); + ring->Enqueue((CGenNode *)msg); } /** @@ -393,7 +414,7 @@ TrexStatelessPort::on_dp_event_occured(TrexDpPortEvent::event_e event_type) { } uint64_t -TrexStatelessPort::get_port_speed_bps() { +TrexStatelessPort::get_port_speed_bps() const { switch (m_speed) { case TrexPlatformApi::SPEED_1G: return (1LLU * 1000 * 1000 * 1000); @@ -410,11 +431,11 @@ TrexStatelessPort::get_port_speed_bps() { } double -TrexStatelessPort::calculate_effective_mul(const TrexPortMultiplier &mul) { +TrexStatelessPort::calculate_effective_factor(const TrexPortMultiplier &mul) { - /* for a simple factor request - calculate the multiplier per core */ + /* for a simple factor request */ if (mul.m_type == TrexPortMultiplier::MUL_FACTOR) { - return (mul.m_value / m_cores_id_list.size()); + return (mul.m_value); } /* we now need the graph - generate it if we don't have it (happens once) */ @@ -424,19 +445,19 @@ TrexStatelessPort::calculate_effective_mul(const TrexPortMultiplier &mul) { switch (mul.m_type) { case TrexPortMultiplier::MUL_BPS: - return ( (mul.m_value / m_graph_obj->get_max_bps()) / m_cores_id_list.size()); + return (mul.m_value / m_graph_obj->get_max_bps()); case TrexPortMultiplier::MUL_PPS: - return ( (mul.m_value / m_graph_obj->get_max_pps()) / m_cores_id_list.size()); + return (mul.m_value / m_graph_obj->get_max_pps()); case TrexPortMultiplier::MUL_PERCENTAGE: /* if abs percentage is from the line speed - otherwise its from the current speed */ if (mul.m_op == TrexPortMultiplier::OP_ABS) { double required = (mul.m_value / 100.0) * get_port_speed_bps(); - return ( (required / m_graph_obj->get_max_bps()) / m_cores_id_list.size()); + return (required / m_graph_obj->get_max_bps()); } else { - return (m_current_per_core_m * (mul.m_value / 100.0)); + return (m_factor * (mul.m_value / 100.0)); } default: @@ -518,3 +539,40 @@ TrexPortMultiplier(const std::string &type_str, const std::string &op_str, doubl } +const TrexStreamsGraphObj * +TrexStatelessPort::validate(void) { + + /* first compile the graph */ + + vector<TrexStream *> streams; + get_object_list(streams); + + if (streams.size() == 0) { + throw TrexException("no streams attached to port"); + } + + TrexStreamsCompiler compiler; + std::vector<TrexStreamsCompiledObj *> compiled_objs; + + std::string fail_msg; + bool rc = compiler.compile(m_port_id, + streams, + compiled_objs, + get_dp_core_count(), + 1.0, + &fail_msg); + if (!rc) { + throw TrexException(fail_msg); + } + + for (auto obj : compiled_objs) { + delete obj; + } + + /* now create a stream graph */ + if (!m_graph_obj) { + generate_streams_graph(); + } + + return m_graph_obj; +} |