From 1e93f5b1fc20d7e8fd4b01b4e3c0715095b42e14 Mon Sep 17 00:00:00 2001 From: Ido Barnea Date: Thu, 17 Mar 2016 16:10:11 +0200 Subject: Added CP to RX messages --- linux/ws_main.py | 1 + linux_dpdk/ws_main.py | 1 + src/flow_stat.cpp | 32 +++++- src/flow_stat.h | 8 +- src/main_dpdk.cpp | 8 +- src/msg_manager.cpp | 17 ++-- src/msg_manager.h | 26 +++-- src/stateless/cp/trex_stateless_port.cpp | 7 ++ src/stateless/cp/trex_stateless_port.h | 100 +++++++++--------- src/stateless/dp/trex_stateless_dp_core.cpp | 16 ++- .../messaging/trex_stateless_messaging.cpp | 42 +++++--- src/stateless/messaging/trex_stateless_messaging.h | 76 ++++++++++---- src/stateless/rx/trex_stateless_rx_core.cpp | 112 ++++++++++++++++----- src/stateless/rx/trex_stateless_rx_core.h | 18 +++- 14 files changed, 320 insertions(+), 144 deletions(-) diff --git a/linux/ws_main.py b/linux/ws_main.py index 9422a8ff..58f5b661 100755 --- a/linux/ws_main.py +++ b/linux/ws_main.py @@ -258,6 +258,7 @@ includes_path =''' ../src/pal/linux/ ../src/rpc-server/ ../src/stateless/cp/ ../src/stateless/dp/ + ../src/stateless/rx/ ../src/stateless/messaging/ ../external_libs/json/ ../external_libs/zmq/include/ diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py index faaca0de..2aa06e3b 100755 --- a/linux_dpdk/ws_main.py +++ b/linux_dpdk/ws_main.py @@ -424,6 +424,7 @@ includes_path =''' ../src/pal/linux_dpdk/ ../src/rpc-server/ ../src/stateless/cp/ ../src/stateless/dp/ + ../src/stateless/rx/ ../src/stateless/messaging/ ../external_libs/yaml-cpp/include/ diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index 01038292..d44a91da 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -25,6 +25,7 @@ #include #include "internal_api/trex_platform_api.h" #include "trex_stateless.h" +#include "trex_stateless_messaging.h" #include "trex_stream.h" #include "flow_stat_parser.h" #include "flow_stat.h" @@ -385,6 +386,8 @@ void CFlowStatHwIdMap::unmap(uint16_t hw_id) { CFlowStatRuleMgr::CFlowStatRuleMgr() { m_api = NULL; m_max_hw_id = -1; + m_num_started_streams = 0; + m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); } std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) { @@ -488,6 +491,12 @@ int CFlowStatRuleMgr::del_stream(const TrexStream * stream) { return 0; } + if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) { + std::cerr << "Error: Trying to delete flow statistics stream " << stream->m_rx_check.m_pg_id + << " which is not stopped." << std::endl; + return -1; + } + return m_user_id_map.del_stream(stream->m_rx_check.m_pg_id); } @@ -556,6 +565,10 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) { std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << ret_hw_id << std::endl; #endif + if (m_num_started_streams == 0) { + send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets; + } + m_num_started_streams++; return 0; } @@ -605,6 +618,11 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) { m_hw_id_map.unmap(hw_id); } } + m_num_started_streams--; + assert (m_num_started_streams >= 0); + if (m_num_started_streams == 0) { + send_start_stop_msg_to_rx(false); // No more transmittig streams. Rx core shoulde get into idle loop. + } return 0; } @@ -618,6 +636,18 @@ int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) { return 0; } +extern bool rx_should_stop; +void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) { + TrexStatelessCpToRxMsgBase *msg; + + if (is_start) { + msg = new TrexRxStartMsg(); + } else { + msg = new TrexRxStopMsg(); + } + m_ring_to_rx->Enqueue((CGenNode *)msg); +} + // return false if no counters changed since last run. true otherwise bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { rx_per_flow_t rx_stats[MAX_FLOW_STATS]; @@ -627,7 +657,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { root["name"] = "flow_stats"; root["type"] = 0; - + if (baseline) { root["baseline"] = true; } diff --git a/src/flow_stat.h b/src/flow_stat.h index 0fb4fede..83f076de 100644 --- a/src/flow_stat.h +++ b/src/flow_stat.h @@ -26,6 +26,7 @@ #include #include "trex_defs.h" #include "trex_stream.h" +#include "msg_manager.h" #include // range reserved for rx stat measurement is from IP_ID_RESERVE_BASE to 0xffff @@ -144,8 +145,8 @@ class CFlowStatUserIdInfo { tx_per_flow_t m_tx_counter_base[TREX_MAX_PORTS]; uint16_t m_hw_id; // Associated hw id. UINT16_MAX if no associated hw id. uint8_t m_proto; // protocol (UDP, TCP, other), associated with this user id. - uint8_t m_ref_count; // How many streams with this ref count exists - uint8_t m_trans_ref_count; // How many streams with this ref count currently transmit + uint8_t m_ref_count; // How many streams with this user id exists + uint8_t m_trans_ref_count; // How many streams with this user id currently transmit bool m_was_sent; // Did we send this info to clients once? }; @@ -208,6 +209,7 @@ class CFlowStatRuleMgr { private: int compile_stream(const TrexStream * stream, Cxl710Parser &parser); int add_hw_rule(uint16_t hw_id, uint8_t proto); + void send_start_stop_msg_to_rx(bool is_start); private: CFlowStatHwIdMap m_hw_id_map; // map hw ids to user ids @@ -215,6 +217,8 @@ class CFlowStatRuleMgr { uint8_t m_num_ports; // How many ports are being used const TrexPlatformApi *m_api; int m_max_hw_id; // max hw id we ever used + uint32_t m_num_started_streams; // How many started (transmitting) streams we have + CNodeRing *m_ring_to_rx; // handle for sending messages to Rx core }; #endif diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 4fc048ff..9e690951 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2873,14 +2873,14 @@ void CGlobalTRex::rx_sl_configure(void) { if ( get_vm_one_queue_enable() ) { #if 0 - ??? + /// what to do here ??? /* vm mode, indirect queues */ for (i=0; i < m_max_ports; i++) { CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp(); uint8_t thread_id = (i >> 1); - CNodeRing * r = rx_dp->getRingCpToDp(thread_id); ///??? should be rx to dp? - m_latency_vm_vports[i].Create((uint8_t)i,r,&m_mg); - rx_sl_cfg.m_ports[i] =&m_latency_vm_vports[i]; + CNodeRing * r = rx_dp->getRingCpToDp(thread_id); + m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg); + rx_sl_cfg.m_ports[i] = &m_latency_vm_vports[i]; } #endif } else { diff --git a/src/msg_manager.cpp b/src/msg_manager.cpp index 9ade1bfc..7e39391a 100755 --- a/src/msg_manager.cpp +++ b/src/msg_manager.cpp @@ -4,7 +4,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -65,12 +65,12 @@ void CMessagingManager::Delete(){ delete [] m_dp_to_cp; m_dp_to_cp = NULL; } - + if (m_cp_to_dp) { delete [] m_cp_to_dp; m_cp_to_dp = NULL; } - + } CNodeRing * CMessagingManager::getRingCpToDp(uint8_t thread_id){ @@ -84,7 +84,6 @@ CNodeRing * CMessagingManager::getRingDpToCp(uint8_t thread_id){ } - void CMsgIns::Free(){ if (m_ins) { m_ins->Delete(); @@ -107,6 +106,11 @@ bool CMsgIns::Create(uint8_t num_threads){ if (!res) { return (res); } + res = m_cp_rx.Create(1, "cp_rx"); + if (!res) { + return (res); + } + return (m_rx_dp.Create(num_threads,"rx_dp")); } @@ -114,9 +118,8 @@ bool CMsgIns::Create(uint8_t num_threads){ void CMsgIns::Delete(){ m_cp_dp.Delete(); m_rx_dp.Delete(); + m_cp_rx.Delete(); } -CMsgIns * CMsgIns::m_ins=0; - - +CMsgIns * CMsgIns::m_ins=0; diff --git a/src/msg_manager.h b/src/msg_manager.h index 0390ce10..de11edbd 100755 --- a/src/msg_manager.h +++ b/src/msg_manager.h @@ -6,7 +6,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -40,37 +40,37 @@ public: /* -e.g DP with 4 threads -will look like this +e.g DP with 4 threads +will look like this - cp_to_dp + cp_to_dp master :push dpx : pop - + - --> dp0 cp - --> dp1 - --> dp2 - --> dp3 - dp_to_cp + dp_to_cp cp : pop dpx : push - + <- -- dp0 cp <- -- dp1 <- -- dp2 <- -- dp3 -*/ +*/ class CGenNode ; typedef CTRingSp CNodeRing; -/* CP == latency thread +/* CP == latency thread DP == traffic pkt generator */ class CMessagingManager { public: @@ -83,6 +83,7 @@ public: void Delete(); CNodeRing * getRingCpToDp(uint8_t thread_id); CNodeRing * getRingDpToCp(uint8_t thread_id); + CNodeRing * getRingCpToRx(); uint8_t get_num_threads(){ return (m_num_dp_threads); } @@ -106,6 +107,9 @@ public: CMessagingManager * getCpDp(){ return (&m_cp_dp); } + CMessagingManager * getCpRx(){ + return (&m_cp_rx); + } uint8_t get_num_threads(){ return (m_rx_dp.get_num_threads()); @@ -114,11 +118,11 @@ public: private: CMessagingManager m_rx_dp; CMessagingManager m_cp_dp; - + CMessagingManager m_cp_rx; private: /* one instance */ - static CMsgIns * m_ins; + static CMsgIns * m_ins; }; #endif diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 5947aaf7..90589d7a 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -473,6 +473,13 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas ring->Enqueue((CGenNode *)msg); } +void +TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) { + + /* send the message to the core */ + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + ring->Enqueue((CGenNode *)msg); +} uint64_t TrexStatelessPort::get_port_speed_bps() const { diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index d3c4dcb9..7e1838d4 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -4,7 +4,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -21,20 +21,21 @@ limitations under the License. #ifndef __TREX_STATELESS_PORT_H__ #define __TREX_STATELESS_PORT_H__ -#include -#include -#include +#include "internal_api/trex_platform_api.h" +#include "trex_dp_port_events.h" +#include "trex_stream.h" class TrexStatelessCpToDpMsgBase; +class TrexStatelessCpToRxMsgBase; class TrexStreamsGraphObj; class TrexPortMultiplier; -/** +/** * TRex port owner can perform * write commands * while port is owned - others can * do read only commands - * + * */ class TrexPortOwner { public: @@ -92,7 +93,7 @@ private: /* handler genereated internally */ std::string m_handler; - + /* seed for generating random values */ unsigned int m_seed; @@ -106,7 +107,7 @@ class AsyncStopEvent; /** * describes a stateless port - * + * * @author imarom (31-Aug-15) */ class TrexStatelessPort { @@ -137,9 +138,9 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); - + ~TrexStatelessPort(); /** @@ -155,11 +156,11 @@ public: void release(void); /** - * validate the state of the port before start - * it will return a stream graph - * containing information about the streams - * configured on this port - * + * validate the state of the port before start + * it will return a stream graph + * containing information about the streams + * configured on this port + * * on error it throws TrexException */ const TrexStreamsGraphObj *validate(void); @@ -190,13 +191,13 @@ public: /** * update current traffic on port - * + * */ void update_traffic(const TrexPortMultiplier &mul, bool force); /** * get the port state - * + * */ port_state_e get_state() const { return m_port_state; @@ -204,23 +205,23 @@ public: /** * port state as string - * + * */ std::string get_state_as_string() const; /** * the the max stream id currently assigned - * + * */ int get_max_stream_id() const; /** * fill up properties of the port - * + * * @author imarom (16-Sep-15) - * - * @param driver - * @param speed + * + * @param driver + * @param speed */ void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed); @@ -237,7 +238,7 @@ public: /** * delegators - * + * */ void add_stream(TrexStream *stream); @@ -267,7 +268,7 @@ public: /** * returns the number of DP cores linked to this port - * + * */ uint8_t get_dp_core_count() { return m_cores_id_list.size(); @@ -275,7 +276,7 @@ public: /** * returns the traffic multiplier currently being used by the DP - * + * */ double get_multiplier() { return (m_factor); @@ -283,13 +284,13 @@ public: /** * get port speed in bits per second - * + * */ uint64_t get_port_speed_bps() const; /** * return RX caps - * + * */ int get_rx_caps() const { return m_rx_caps; @@ -300,12 +301,12 @@ public: } /** - * return true if port adds CRC to a packet (not occurs for - * VNICs) - * + * return true if port adds CRC to a packet (not occurs for + * VNICs) + * * @author imarom (24-Feb-16) - * - * @return bool + * + * @return bool */ bool has_crc_added() const { return m_api_info.has_crc; @@ -318,9 +319,9 @@ public: /** * get the port effective rate (on a started / paused port) - * + * * @author imarom (07-Jan-16) - * + * */ void get_port_effective_rate(double &pps, double &bps_L1, @@ -330,8 +331,8 @@ public: /** * set port promiscuous on/off - * - * @param enabled + * + * @param enabled */ void set_promiscuous(bool enabled); bool get_promiscuous(); @@ -357,40 +358,45 @@ private: /** * send message to all cores using duplicate - * + * */ void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg); /** * send message to specific DP core - * + * */ void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg); + /** + * send message to specific RX core + * + */ + void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg); /** * when a port stops, perform various actions - * + * */ void common_port_stop_actions(bool async); /** * calculate effective M per core - * + * */ double calculate_effective_factor(const TrexPortMultiplier &mul, bool force = false); double calculate_effective_factor_internal(const TrexPortMultiplier &mul); - + /** * generates a graph of streams graph - * + * */ void generate_streams_graph(); /** * dispose of it - * + * * @author imarom (26-Nov-15) */ void delete_streams_graph(); @@ -426,7 +432,7 @@ private: /** * port multiplier object - * + * */ class TrexPortMultiplier { public: @@ -443,8 +449,8 @@ public: }; /** - * multiplier can be absolute value - * increment value or subtract value + * multiplier can be absolute value + * increment value or subtract value */ enum mul_op_e { OP_ABS, diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index f8d6d828..ba25f61d 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -5,7 +5,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,14 +19,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include -#include -#include -#include -#include - -#include - +#include "bp_sim.h" +#include "trex_stateless_dp_core.h" +#include "trex_stateless_messaging.h" +#include "trex_stream.h" +#include "trex_stream_node.h" +#include "trex_streams_compiler.h" void CDpOneStream::Delete(CFlowGenListPerThread * core){ assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE); diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 333aec88..3468d622 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -5,7 +5,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -19,17 +19,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include -#include -#include -#include -#include - #include +#include "trex_stateless_messaging.h" +#include "trex_stateless_dp_core.h" +#include "trex_stateless_rx_core.h" +#include "trex_streams_compiler.h" +#include "trex_stateless.h" +#include "bp_sim.h" + /************************* start traffic message - ************************/ + ************************/ TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) { m_port_id = port_id; m_event_id = event_id; @@ -40,7 +41,7 @@ TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexSt /** * clone for DP start message - * + * */ TrexStatelessCpToDpMsgBase * TrexStatelessDpStart::clone() { @@ -69,7 +70,7 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) { /************************* stop traffic message - ************************/ + ************************/ bool TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) { @@ -114,7 +115,7 @@ bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){ /** * clone for DP stop message - * + * */ TrexStatelessCpToDpMsgBase * TrexStatelessDpStop::clone() { @@ -130,7 +131,7 @@ TrexStatelessDpStop::clone() { -TrexStatelessCpToDpMsgBase * +TrexStatelessCpToDpMsgBase * TrexStatelessDpQuit::clone(){ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit(); @@ -140,7 +141,7 @@ TrexStatelessDpQuit::clone(){ bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){ - + /* quit */ dp_core->quit_main_loop(); return (true); @@ -155,7 +156,7 @@ bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){ return (true); } -TrexStatelessCpToDpMsgBase * +TrexStatelessCpToDpMsgBase * TrexStatelessDpCanQuit::clone(){ TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit(); @@ -165,7 +166,7 @@ TrexStatelessDpCanQuit::clone(){ /************************* update traffic message - ************************/ + ************************/ bool TrexStatelessDpUpdate::handle(TrexStatelessDpCore *dp_core) { dp_core->update_traffic(m_port_id, m_factor); @@ -207,3 +208,14 @@ TrexDpPortEventMsg::handle() { return (true); } +/************************* messages from CP to RX **********************/ +bool TrexRxStartMsg::handle (CRxCoreStateless *rx_core) { + rx_core->work(); + return true; +} + +/************************* messages from CP to RX **********************/ +bool TrexRxStopMsg::handle (CRxCoreStateless *rx_core) { + rx_core->idle(); + return true; +} diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index dda086b7..b7e8fd3f 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -5,7 +5,7 @@ */ /* -Copyright (c) 2015-2015 Cisco Systems, Inc. +Copyright (c) 2015-2016 Cisco Systems, Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -22,16 +22,17 @@ limitations under the License. #ifndef __TREX_STATELESS_MESSAGING_H__ #define __TREX_STATELESS_MESSAGING_H__ -#include -#include +#include "msg_manager.h" +#include "trex_dp_port_events.h" class TrexStatelessDpCore; +class CRxCoreStateless; class TrexStreamsCompiledObj; class CFlowGenListPerThread; /** * defines the base class for CP to DP messages - * + * * @author imarom (27-Oct-15) */ class TrexStatelessCpToDpMsgBase { @@ -49,7 +50,7 @@ public: /** * clone the current message - * + * */ virtual TrexStatelessCpToDpMsgBase * clone() = 0; @@ -76,7 +77,7 @@ protected: /** * a message to start traffic - * + * * @author imarom (27-Oct-15) */ class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { @@ -137,7 +138,7 @@ private: /** * a message to stop traffic - * + * * @author imarom (27-Oct-15) */ class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase { @@ -191,9 +192,9 @@ private: }; /** - * a message to Quit the datapath traffic. support only stateless for now - * - * @author hhaim + * a message to Quit the datapath traffic. support only stateless for now + * + * @author hhaim */ class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase { public: @@ -209,9 +210,9 @@ public: }; /** - * a message to check if both port are idel and exit - * - * @author hhaim + * a message to check if both port are idel and exit + * + * @author hhaim */ class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase { public: @@ -247,7 +248,7 @@ private: /** * barrier message for DP core - * + * */ class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase { public: @@ -270,7 +271,7 @@ private: /** * defines the base class for CP to DP messages - * + * * @author imarom (27-Oct-15) */ class TrexStatelessDpToCpMsgBase { @@ -284,7 +285,7 @@ public: /** * virtual function to handle a message - * + * */ virtual bool handle() = 0; @@ -295,9 +296,9 @@ public: /** - * a message indicating an event has happened on a port at the - * DP - * + * a message indicating an event has happened on a port at the + * DP + * */ class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase { public: @@ -326,8 +327,41 @@ private: int m_thread_id; uint8_t m_port_id; int m_event_id; - + }; -#endif /* __TREX_STATELESS_MESSAGING_H__ */ +/************************* messages from CP to RX **********************/ + +/** + * defines the base class for CP to RX messages + * + */ +class TrexStatelessCpToRxMsgBase { +public: + TrexStatelessCpToRxMsgBase() { + } + + virtual ~TrexStatelessCpToRxMsgBase() { + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle (CRxCoreStateless *rx_core) = 0; + + /* no copy constructor */ + TrexStatelessCpToRxMsgBase(TrexStatelessCpToRxMsgBase &) = delete; + +}; + +class TrexRxStartMsg : public TrexStatelessCpToRxMsgBase { + bool handle (CRxCoreStateless *rx_core); +}; + +class TrexRxStopMsg : public TrexStatelessCpToRxMsgBase { + bool handle (CRxCoreStateless *rx_core); +}; + +#endif /* __TREX_STATELESS_MESSAGING_H__ */ diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index a108bef3..86711189 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -1,48 +1,110 @@ #include -#include "latency.h" +#include "bp_sim.h" #include "flow_stat_parser.h" -#include "stateless/rx/trex_stateless_rx_core.h" - +#include "latency.h" +#include "trex_stateless_messaging.h" +#include "trex_stateless_rx_core.h" void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_max_ports = cfg.m_max_ports; + CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx(); + + m_ring_from_cp = cp_rx->getRingCpToDp(0); + m_ring_to_cp = cp_rx->getRingDpToCp(0); + m_state = STATE_IDLE; + for (int i = 0; i < m_max_ports; i++) { CLatencyManagerPerPort * lp = &m_ports[i]; - // CCPortLatency * lpo = &m_ports[swap_port(i)].m_port; - lp->m_io = cfg.m_ports[i]; - /* lp->m_port.Create(this, - i, - m_pkt_gen.get_payload_offset(), - m_pkt_gen.get_l4_offset(), - m_pkt_gen.get_pkt_size(),lpo );???*/ } +} +void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { + msg->handle(this); + delete msg; } -void CRxCoreStateless::start() { - static int count = 0; - static int i = 0; - while (1) { - count += try_rx(); - i++; - if (i == 100000000) { - i = 0; - //??? remove - printf("counter:%d port0:[%u], port1:[%u]\n", count, m_ports[0].m_port.m_rx_pg_pkts[0], m_ports[1].m_port.m_rx_pg_pkts[1]); +bool CRxCoreStateless::periodic_check_for_cp_messages() { + /* fast path */ + if ( likely ( m_ring_from_cp->isEmpty() ) ) { + return false; + } + + while ( true ) { + CGenNode * node = NULL; + + if (m_ring_from_cp->Dequeue(node) != 0) { + break; + } + assert(node); + TrexStatelessCpToRxMsgBase * msg = (TrexStatelessCpToRxMsgBase *)node; + handle_cp_msg(msg); + } + + return true; + +} + +void CRxCoreStateless::idle_state_loop() { + const int SHORT_DELAY_MS = 2; + const int LONG_DELAY_MS = 50; + const int DEEP_SLEEP_LIMIT = 2000; + + int counter = 0; + + while (m_state == STATE_IDLE) { + bool had_msg = periodic_check_for_cp_messages(); + if (had_msg) { + counter = 0; + continue; + } + + /* enter deep sleep only if enough time had passed */ + if (counter < DEEP_SLEEP_LIMIT) { + delay(SHORT_DELAY_MS); + counter++; + } else { + delay(LONG_DELAY_MS); } } } -// ??? temp try +void CRxCoreStateless::start() { + static int count = 0; + static int i = 0; + + while (true) { + if (m_state == STATE_WORKING) { + count += try_rx(); + i++; + if (i == 100) { + i = 0; + // if no packets in 100 cycles, sleep for a while to spare the cpu + if (count == 0) { + delay(1); + } + count = 0; + periodic_check_for_cp_messages(); + } + } else { + idle_state_loop(); + } +#if 0 + ??? do we need this? + if ( m_core->is_terminated_by_master() ) { + break; + } +#endif + } +} + int CRxCoreStateless::try_rx() { rte_mbuf_t * rx_pkts[64]; int i, total_pkts = 0; for (i = 0; i < m_max_ports; i++) { CLatencyManagerPerPort * lp = &m_ports[i]; rte_mbuf_t * m; - //m_cpu_dp_u.start_work(); /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); total_pkts += cnt_p; @@ -63,10 +125,8 @@ int CRxCoreStateless::try_rx() { } rte_pktmbuf_free(m); } - /* commit only if there was work to do ! */ - //m_cpu_dp_u.commit(); //??? what's this? - }/* if work */ - }// all ports + }/* if work */ + }// all ports return total_pkts; } diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 942ddbd6..eecc8033 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -23,6 +23,8 @@ limitations under the License. #include #include "latency.h" +class TrexStatelessCpToRxMsgBase; + class CRxSlCfg { public: CRxSlCfg (){ @@ -37,19 +39,33 @@ class CRxSlCfg { }; class CRxCoreStateless { + enum state_e { + STATE_IDLE, + STATE_WORKING, + }; + public: void start(); void create(const CRxSlCfg &cfg); void reset_rx_stats(uint8_t port_id); int get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts , uint32_t *bytes, uint32_t *prev_bytes, int min, int max); + void work() {m_state = STATE_WORKING;} + void idle() {m_state = STATE_IDLE;} private: + void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); + bool periodic_check_for_cp_messages(); + void idle_state_loop(); int try_rx(); bool is_flow_stat_id(uint16_t id); uint16_t get_hw_id(uint16_t id); - + private: uint32_t m_max_ports; + bool m_has_streams; CLatencyManagerPerPort m_ports[TREX_MAX_PORTS]; + state_e m_state; /* state of all ports */ + CNodeRing *m_ring_from_cp; + CNodeRing *m_ring_to_cp; }; #endif -- cgit 1.2.3-korg