diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/common/basic_utils.cpp | 6 | ||||
-rwxr-xr-x | src/common/basic_utils.h | 2 | ||||
-rw-r--r-- | src/flow_stat.cpp | 20 | ||||
-rw-r--r-- | src/main_dpdk.cpp | 2 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmd_general.cpp | 36 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 30 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 2 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 17 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 40 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 16 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 25 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.h | 19 |
12 files changed, 138 insertions, 77 deletions
diff --git a/src/common/basic_utils.cpp b/src/common/basic_utils.cpp index dfd3b183..fded49ec 100755 --- a/src/common/basic_utils.cpp +++ b/src/common/basic_utils.cpp @@ -273,18 +273,18 @@ void utl_set_coredump_size(long size, bool map_huge_pages) { fclose(fp); } -uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) { +bool utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num) { uint32_t tmp; int rc = my_inet_pton4(ipv4_str, (unsigned char *)&tmp); if (!rc) { - return (0); + return false; } ipv4_num = PAL_NTOHL(tmp); - return (1); + return true; } std::string utl_uint32_to_ipv4(uint32_t ipv4_addr) { diff --git a/src/common/basic_utils.h b/src/common/basic_utils.h index c30457d7..36f9db85 100755 --- a/src/common/basic_utils.h +++ b/src/common/basic_utils.h @@ -101,7 +101,7 @@ std::string utl_generate_random_str(unsigned int &seed, int len); */ void utl_set_coredump_size(long size, bool map_huge_pages = false); -uint32_t utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num); +bool utl_ipv4_to_uint32(const char *ipv4_str, uint32_t &ipv4_num); std::string utl_uint32_to_ipv4(uint32_t ipv4_addr); #endif diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index dae29795..ba125df2 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -802,11 +802,15 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) { #endif if (m_num_started_streams == 0) { + send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets; + assert(m_rx_core->is_working()); + //also good time to zero global counters memset(m_rx_cant_count_err, 0, sizeof(m_rx_cant_count_err)); memset(m_tx_cant_count_err, 0, sizeof(m_tx_cant_count_err)); + #if 0 // wait to make sure that message is acknowledged. RX core might be in deep sleep mode, and we want to // start transmitting packets only after it is working, otherwise, packets will get lost. if (m_rx_core) { // in simulation, m_rx_core will be NULL @@ -819,6 +823,8 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream) { } } } + #endif + } else { // make sure rx core is working. If not, we got really confused somehow. if (m_rx_core) @@ -966,13 +972,21 @@ int CFlowStatRuleMgr::set_mode(enum flow_stat_mode_e mode) { extern bool rx_should_stop; void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) { TrexStatelessCpToRxMsgBase *msg; - + if (is_start) { - msg = new TrexStatelessRxEnableLatency(); + static MsgReply<bool> reply; + reply.reset(); + + msg = new TrexStatelessRxEnableLatency(reply); + m_ring_to_rx->Enqueue((CGenNode *)msg); + + /* hold until message was ack'ed - otherwise we might lose packets */ + reply.wait_for_reply(); + } else { msg = new TrexStatelessRxDisableLatency(); + m_ring_to_rx->Enqueue((CGenNode *)msg); } - m_ring_to_rx->Enqueue((CGenNode *)msg); } // return false if no counters changed since last run. true otherwise diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 2783182a..c02d6760 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -5744,7 +5744,7 @@ CFlowStatParser *CTRexExtendedDriverBase::get_flow_stat_parser() { // in 1G we need to wait if links became ready to soon void CTRexExtendedDriverBase1G::wait_after_link_up(){ - //wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic); + wait_x_sec(6 + CGlobalInfo::m_options.m_wait_before_traffic); } int CTRexExtendedDriverBase1G::wait_for_stable_link(){ diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index cb7d5149..107b7b4b 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -776,31 +776,31 @@ TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessP void TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { - bool enabled = parse_bool(msg, "enabled", result); + bool enabled = parse_bool(msg, "enabled", result); - if (enabled) { + if (enabled) { - uint64_t size = parse_uint32(msg, "size", result); + uint64_t size = parse_uint32(msg, "size", result); - if (size == 0) { - generate_parse_err(result, "queue size cannot be zero"); - } + if (size == 0) { + generate_parse_err(result, "queue size cannot be zero"); + } - try { - port->start_rx_queue(size); - } catch (const TrexException &ex) { - generate_execute_err(result, ex.what()); - } + try { + port->start_rx_queue(size); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } - } else { + } else { - try { - port->stop_rx_queue(); - } catch (const TrexException &ex) { - generate_execute_err(result, ex.what()); - } + try { + port->stop_rx_queue(); + } catch (const TrexException &ex) { + generate_execute_err(result, ex.what()); + } - } + } } diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index f2829b8a..74ab17f1 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -377,21 +377,21 @@ public: */ void stop_rx_capture(); - /** - * start RX queueing of packets - * - * @author imarom (11/7/2016) - * - * @param limit - */ - void start_rx_queue(uint64_t limit); - - /** - * stop RX queueing - * - * @author imarom (11/7/2016) - */ - void stop_rx_queue(); + /** + * start RX queueing of packets + * + * @author imarom (11/7/2016) + * + * @param limit + */ + void start_rx_queue(uint64_t limit); + + /** + * stop RX queueing + * + * @author imarom (11/7/2016) + */ + void stop_rx_queue(); /** * fetch the RX queue packets from the queue diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 53d5a87e..17acb21e 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -243,6 +243,8 @@ TrexDpPortEventMsg::handle() { /************************* messages from CP to RX **********************/ bool TrexStatelessRxEnableLatency::handle (CRxCoreStateless *rx_core) { rx_core->enable_latency(); + m_reply.set_reply(true); + return true; } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 303548aa..79a6bf08 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -63,7 +63,7 @@ public: m_pending = false; } - T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) { + T wait_for_reply(int timeout_ms = 500, int backoff_ms = 1) { int guard = timeout_ms; while (is_pending()) { @@ -461,7 +461,14 @@ public: class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxEnableLatency(MsgReply<bool> &reply) : m_reply(reply) { + } + bool handle (CRxCoreStateless *rx_core); + +private: + MsgReply<bool> &m_reply; }; class TrexStatelessRxDisableLatency : public TrexStatelessCpToRxMsgBase { @@ -505,7 +512,7 @@ public: virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; + uint8_t m_port_id; }; @@ -515,8 +522,8 @@ public: uint64_t size, MsgReply<bool> &reply) : m_reply(reply) { - m_port_id = port_id; - m_size = size; + m_port_id = port_id; + m_size = size; } virtual bool handle(CRxCoreStateless *rx_core); @@ -537,7 +544,7 @@ public: virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; + uint8_t m_port_id; }; diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f518fcd3..dc637163 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -121,6 +121,7 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() { handle_cp_msg(msg); } + /* a message might result in a change of state */ recalculate_next_state(); return true; @@ -218,9 +219,7 @@ void CRxCoreStateless::start() { switch (m_state) { case STATE_IDLE: - set_working_msg_ack(false); idle_state_loop(); - set_working_msg_ack(true); break; case STATE_WORKING: @@ -311,23 +310,11 @@ void CRxCoreStateless::reset_rx_stats(uint8_t port_id) { int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max , bool reset, TrexPlatformApi::driver_stat_cap_e type) { - RXLatency &latency = m_rx_port_mngr[port_id].get_latency(); - - for (int hw_id = min; hw_id <= max; hw_id++) { - if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { - rx_stats[hw_id - min] = latency.m_rx_pg_stat_payload[hw_id]; - } else { - rx_stats[hw_id - min] = latency.m_rx_pg_stat[hw_id]; - } - if (reset) { - if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { - latency.m_rx_pg_stat_payload[hw_id].clear(); - } else { - latency.m_rx_pg_stat[hw_id].clear(); - } - } - } - return 0; + /* for now only latency stats */ + m_rx_port_mngr[port_id].get_latency_stats(rx_stats, min, max, reset, type); + + return (0); + } int CRxCoreStateless::get_rfc2544_info(rfc2544_info_t *rfc2544_info, int min, int max, bool reset) { @@ -354,13 +341,6 @@ int CRxCoreStateless::get_rx_err_cntrs(CRxCoreErrCntrs *rx_err) { return 0; } -void CRxCoreStateless::set_working_msg_ack(bool val) { - sanb_smp_memory_barrier(); - m_ack_start_work_msg = val; - sanb_smp_memory_barrier(); -} - - void CRxCoreStateless::update_cpu_util(){ m_cpu_cp_u.Update(); } @@ -373,21 +353,25 @@ double CRxCoreStateless::get_cpu_util() { void CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) { m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit); + recalculate_next_state(); } void CRxCoreStateless::stop_recorder(uint8_t port_id) { m_rx_port_mngr[port_id].stop_recorder(); + recalculate_next_state(); } void CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) { m_rx_port_mngr[port_id].start_queue(size); + recalculate_next_state(); } void CRxCoreStateless::stop_queue(uint8_t port_id) { m_rx_port_mngr[port_id].stop_queue(); + recalculate_next_state(); } void @@ -395,6 +379,8 @@ CRxCoreStateless::enable_latency() { for (int i = 0; i < m_max_ports; i++) { m_rx_port_mngr[i].enable_latency(); } + + recalculate_next_state(); } void @@ -402,6 +388,8 @@ CRxCoreStateless::disable_latency() { for (int i = 0; i < m_max_ports; i++) { m_rx_port_mngr[i].disable_latency(); } + + recalculate_next_state(); } const RXPortManager & diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index b27a7ca5..7481ae2f 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -29,7 +29,10 @@ class TrexStatelessCpToRxMsgBase; - +/** + * RFC 2544 implementation + * + */ class CRFC2544Info { public: void create(); @@ -88,7 +91,15 @@ class CRxCoreErrCntrs { uint64_t m_old_flow; }; +/** + * stateless RX core + * + */ class CRxCoreStateless { + + /** + * core states + */ enum state_e { STATE_IDLE, STATE_WORKING, @@ -106,8 +117,7 @@ class CRxCoreStateless { void quit() {m_state = STATE_QUIT;} - bool is_working() const {return (m_ack_start_work_msg == true);} - void set_working_msg_ack(bool val); + bool is_working() const {return (m_state == STATE_WORKING);} double get_cpu_util(); void update_cpu_util(); diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index 00032e8b..bd8650a4 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -156,6 +156,31 @@ RXLatency::reset_stats() { } } + +void +RXLatency::get_stats(rx_per_flow_t *rx_stats, + int min, + int max, + bool reset, + TrexPlatformApi::driver_stat_cap_e type) { + + for (int hw_id = min; hw_id <= max; hw_id++) { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + rx_stats[hw_id - min] = m_rx_pg_stat_payload[hw_id]; + } else { + rx_stats[hw_id - min] = m_rx_pg_stat[hw_id]; + } + if (reset) { + if (type == TrexPlatformApi::IF_STAT_PAYLOAD) { + m_rx_pg_stat_payload[hw_id].clear(); + } else { + m_rx_pg_stat[hw_id].clear(); + } + } + } +} + + Json::Value RXLatency::to_json() const { return Json::objectValue; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index bc34b5aa..6af90f8b 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -43,12 +43,18 @@ public: void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs); - void reset_stats(); - void handle_pkt(const rte_mbuf_t *m); Json::Value to_json() const; + void get_stats(rx_per_flow_t *rx_stats, + int min, + int max, + bool reset, + TrexPlatformApi::driver_stat_cap_e type); + + void reset_stats(); + private: bool is_flow_stat_id(uint32_t id) { if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true; @@ -276,6 +282,15 @@ public: m_latency.reset_stats(); } + void get_latency_stats(rx_per_flow_t *rx_stats, + int min, + int max, + bool reset, + TrexPlatformApi::driver_stat_cap_e type) { + + return m_latency.get_stats(rx_stats, min, max, reset, type); + } + RXLatency & get_latency() { return m_latency; } |