From 3c4a29e15f3663f6413fbee2562d7d0aa4e2f80d Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 1 Jun 2016 15:52:00 +0300 Subject: watchdog phase 2 --- src/bp_gtest.cpp | 2 +- src/bp_sim.cpp | 9 + src/bp_sim.h | 9 + src/latency.cpp | 21 ++- src/latency.h | 8 +- src/main.cpp | 7 +- src/main_dpdk.cpp | 40 ++++- src/rpc-server/trex_rpc_async_server.cpp | 2 +- src/rpc-server/trex_rpc_async_server.h | 2 +- src/rpc-server/trex_rpc_req_resp_server.cpp | 36 +++- src/rpc-server/trex_rpc_req_resp_server.h | 2 +- src/rpc-server/trex_rpc_server.cpp | 17 +- src/rpc-server/trex_rpc_server_api.h | 19 ++- src/sim/trex_sim_stateless.cpp | 2 +- src/stateless/cp/trex_stateless.cpp | 2 +- src/stateless/cp/trex_stateless.h | 3 +- src/stateless/dp/trex_stateless_dp_core.cpp | 1 + src/stateless/rx/trex_stateless_rx_core.cpp | 17 +- src/stateless/rx/trex_stateless_rx_core.h | 7 +- src/trex_watchdog.cpp | 248 ++++++++++++++++++++++++++-- src/trex_watchdog.h | 70 ++++++-- 21 files changed, 464 insertions(+), 60 deletions(-) (limited to 'src') diff --git a/src/bp_gtest.cpp b/src/bp_gtest.cpp index 86b7821b..b36ac6e1 100755 --- a/src/bp_gtest.cpp +++ b/src/bp_gtest.cpp @@ -897,7 +897,7 @@ TEST_F(basic, latency3) { EXPECT_EQ_UINT32(mg.is_active()?1:0, (uint32_t)0)<< "pass"; - mg.start(8); + mg.start(8, NULL); mg.stop(); mg.Dump(stdout); mg.DumpShort(stdout); diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 51023b90..c9171ae5 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -24,6 +24,8 @@ limitations under the License. #include "utl_json.h" #include "utl_yaml.h" #include "msg_manager.h" +#include "trex_watchdog.h" + #include #include @@ -3322,6 +3324,9 @@ bool CFlowGenListPerThread::Create(uint32_t thread_id, m_max_threads=max_threads; m_thread_id=thread_id; + m_watchdog = NULL; + m_watchdog_handle = -1; + m_cpu_cp_u.Create(&m_cpu_dp_u); uint32_t socket_id=rte_lcore_to_socket_id(m_core_id); @@ -3897,6 +3902,10 @@ void CNodeGenerator::handle_flow_pkt(CGenNode *node, CFlowGenListPerThread *thre } void CNodeGenerator::handle_flow_sync(CGenNode *node, CFlowGenListPerThread *thread, bool &exit_scheduler) { + + /* tickle the watchdog */ + thread->tickle(); + /* flow sync message is a sync point for time */ thread->m_cur_time_sec = node->m_time; diff --git a/src/bp_sim.h b/src/bp_sim.h index bb7dd928..d940080e 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -58,6 +58,7 @@ limitations under the License. #include #include "platform_cfg.h" #include "flow_stat.h" +#include "trex_watchdog.h" #include @@ -3637,6 +3638,12 @@ public: m_node_gen.m_v_if->flush_tx_queue(); } + void tickle() { + if (m_watchdog) { + m_watchdog->tickle(m_watchdog_handle); + } + } + /* return the dual port ID this thread is attached to in 4 ports configuration there are 2 dual-ports @@ -3759,6 +3766,8 @@ public: CTupleGeneratorSmart m_smart_gen; + TrexWatchDog *m_watchdog; + int m_watchdog_handle; public: CNodeGenerator m_node_gen; diff --git a/src/latency.cpp b/src/latency.cpp index a7652bed..fd2a5b5a 100644 --- a/src/latency.cpp +++ b/src/latency.cpp @@ -22,6 +22,8 @@ limitations under the License. #include "latency.h" #include "bp_sim.h" #include "utl_json.h" +#include "trex_watchdog.h" + #include const uint8_t sctp_pkt[]={ @@ -562,6 +564,10 @@ bool CLatencyManager::Create(CLatencyManagerCfg * cfg){ if ( CGlobalInfo::is_learn_mode() ){ m_nat_check_manager.Create(); } + + m_watchdog = NULL; + m_watchdog_handle = -1; + return (true); } @@ -711,7 +717,13 @@ void CLatencyManager::reset(){ } -void CLatencyManager::start(int iter) { +void CLatencyManager::tickle() { + if (m_watchdog) { + m_watchdog->tickle(m_watchdog_handle); + } +} + +void CLatencyManager::start(int iter, TrexWatchDog *watchdog) { m_do_stop =false; m_is_active =false; int cnt=0; @@ -728,6 +740,10 @@ void CLatencyManager::start(int iter) { m_p_queue.push(node); bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable()?true:false; + if (watchdog) { + m_watchdog = watchdog; + m_watchdog_handle = watchdog->register_monitor("STF RX CORE", 1); + } while ( !m_p_queue.empty() ) { node = m_p_queue.top(); @@ -748,6 +764,9 @@ void CLatencyManager::start(int iter) { switch (node->m_type) { case CGenNode::FLOW_SYNC: + + tickle(); + if ( CGlobalInfo::is_learn_mode() ) { m_nat_check_manager.handle_aging(); } diff --git a/src/latency.h b/src/latency.h index eef7146a..2b74f737 100644 --- a/src/latency.h +++ b/src/latency.h @@ -28,6 +28,8 @@ limitations under the License. #define L_PKT_SUBMODE_REPLY 2 #define L_PKT_SUBMODE_0_SEQ 3 +class TrexWatchDog; + class CLatencyPktInfo { public: void Create(class CLatencyPktMode *m_l_pkt_info); @@ -337,7 +339,7 @@ public: bool Create(CLatencyManagerCfg * cfg); void Delete(); void reset(); - void start(int iter); + void start(int iter, TrexWatchDog *watchdog); void stop(); bool is_active(); void set_ip(uint32_t client_ip, @@ -374,6 +376,7 @@ public: CLatencyPktMode *c_l_pkt_mode; private: + void tickle(); void send_pkt_all_ports(); void try_rx(); void try_rx_queues(); @@ -400,6 +403,9 @@ private: CNatRxManager m_nat_check_manager; CCpuUtlDp m_cpu_dp_u; CCpuUtlCp m_cpu_cp_u; + TrexWatchDog *m_watchdog; + int m_watchdog_handle; + volatile bool m_do_stop __rte_cache_aligned ; }; diff --git a/src/main.cpp b/src/main.cpp index 701a65d2..62eee880 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -83,6 +83,11 @@ static CSimpleOpt::SOption parser_options[] = }; static TrexStateless *m_sim_statelss_obj; +static char *g_exe_name; + +const char *get_exe_name() { + return g_exe_name; +} static int usage(){ @@ -261,8 +266,8 @@ void set_stateless_obj(TrexStateless *obj) { m_sim_statelss_obj = obj; } - int main(int argc , char * argv[]){ + g_exe_name = argv[0]; std::unordered_map params; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index bd3cac64..0de646b7 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -73,6 +73,7 @@ extern "C" { #include "debug.h" #include "internal_api/trex_platform_api.h" #include "main_dpdk.h" +#include "trex_watchdog.h" #define RX_CHECK_MIX_SAMPLE_RATE 8 #define RX_CHECK_MIX_SAMPLE_RATE_1G 2 @@ -2845,6 +2846,7 @@ private: std::mutex m_cp_lock; public: + TrexWatchDog m_watchdog; TrexStateless *m_trex_stateless; }; @@ -3272,14 +3274,16 @@ bool CGlobalTRex::Create(){ TrexStatelessCfg cfg; - TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, global_platform_cfg_info.m_zmq_rpc_port); + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_TCP, + global_platform_cfg_info.m_zmq_rpc_port, + &m_cp_lock, + &m_watchdog); cfg.m_port_count = CGlobalInfo::m_options.m_expected_portd; cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; cfg.m_rpc_server_verbose = false; cfg.m_platform_api = new TrexDpdkPlatformApi(); cfg.m_publisher = &m_zmq_publisher; - cfg.m_global_lock = &m_cp_lock; m_trex_stateless = new TrexStateless(cfg); } @@ -3975,6 +3979,9 @@ int CGlobalTRex::run_in_master() { const int FASTPATH_DELAY_MS = 10; const int SLOWPATH_DELAY_MS = 500; + int handle = m_watchdog.register_monitor("master", 2); + m_watchdog.start(); + while ( true ) { /* fast path */ @@ -3995,6 +4002,8 @@ int CGlobalTRex::run_in_master() { delay(FASTPATH_DELAY_MS); slow_path_counter += FASTPATH_DELAY_MS; cp_lock.lock(); + + m_watchdog.tickle(handle); } /* on exit release the lock */ @@ -4006,6 +4015,9 @@ int CGlobalTRex::run_in_master() { } m_mg.stop(); + + m_watchdog.stop(); + delay(1000); if ( was_stopped ){ /* we should stop latency and exit to stop agents */ @@ -4017,14 +4029,15 @@ int CGlobalTRex::run_in_master() { int CGlobalTRex::run_in_rx_core(void){ + if (get_is_stateless()) { m_sl_rx_running = true; - m_rx_sl.start(); + m_rx_sl.start(m_watchdog); m_sl_rx_running = false; } else { if ( CGlobalInfo::m_options.is_rx_enabled() ){ m_sl_rx_running = false; - m_mg.start(0); + m_mg.start(0, &m_watchdog); } } @@ -4032,6 +4045,8 @@ int CGlobalTRex::run_in_rx_core(void){ } int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){ + std::stringstream ss; + ss << "DP core " << int(virt_core_id); CPreviewMode *lp=&CGlobalInfo::m_options.preview; if ( lp->getSingleCore() && @@ -4045,8 +4060,14 @@ int CGlobalTRex::run_in_core(virtual_thread_id_t virt_core_id){ assert(m_fl_was_init); CFlowGenListPerThread * lpt; + lpt = m_fl.m_threads_info[virt_core_id-1]; + /* register a watchdog handle on current core */ + lpt->m_watchdog = &m_watchdog; + lpt->m_watchdog_handle = m_watchdog.register_monitor(ss.str(), 1); + + if (get_is_stateless()) { lpt->start_stateless_daemon(*lp); }else{ @@ -4422,9 +4443,14 @@ uint32_t get_cores_mask(uint32_t cores,int offset){ } +static char *g_exe_name; +const char *get_exe_name() { + return g_exe_name; +} int main(int argc , char * argv[]){ + g_exe_name = argv[0]; return ( main_test(argc , argv)); } @@ -4733,13 +4759,17 @@ int main_test(int argc , char * argv[]){ uint32_t pkts = CGlobalInfo::m_options.m_latency_prev * CGlobalInfo::m_options.m_latency_rate; printf("Starting pre latency check for %d sec\n",CGlobalInfo::m_options.m_latency_prev); - g_trex.m_mg.start(pkts); + g_trex.m_mg.start(pkts, NULL); delay(CGlobalInfo::m_options.m_latency_prev* 1000); printf("Finished \n"); g_trex.m_mg.reset(); g_trex.reset_counters(); } + /* this will give us all cores - master + tx + latency */ + g_trex.m_watchdog.mark_pending_monitor(g_trex.m_max_cores); + + g_trex.m_sl_rx_running = false; if ( get_is_stateless() ) { g_trex.start_master_stateless(); diff --git a/src/rpc-server/trex_rpc_async_server.cpp b/src/rpc-server/trex_rpc_async_server.cpp index aee92539..82c42458 100644 --- a/src/rpc-server/trex_rpc_async_server.cpp +++ b/src/rpc-server/trex_rpc_async_server.cpp @@ -36,7 +36,7 @@ limitations under the License. * ZMQ based publisher server * */ -TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "publisher", lock) { +TrexRpcServerAsync::TrexRpcServerAsync(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "publisher") { /* ZMQ is not thread safe - this should be outside */ m_context = zmq_ctx_new(); } diff --git a/src/rpc-server/trex_rpc_async_server.h b/src/rpc-server/trex_rpc_async_server.h index 80d92c2f..daefa174 100644 --- a/src/rpc-server/trex_rpc_async_server.h +++ b/src/rpc-server/trex_rpc_async_server.h @@ -33,7 +33,7 @@ limitations under the License. class TrexRpcServerAsync : public TrexRpcServerInterface { public: - TrexRpcServerAsync(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); + TrexRpcServerAsync(const TrexRpcServerConfig &cfg); protected: void _prepare(); diff --git a/src/rpc-server/trex_rpc_req_resp_server.cpp b/src/rpc-server/trex_rpc_req_resp_server.cpp index 5c587e0f..d36753d4 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.cpp +++ b/src/rpc-server/trex_rpc_req_resp_server.cpp @@ -32,11 +32,13 @@ limitations under the License. #include #include +#include "trex_watchdog.h" + /** * ZMQ based request-response server * */ -TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock) : TrexRpcServerInterface(cfg, "req resp", lock) { +TrexRpcServerReqRes::TrexRpcServerReqRes(const TrexRpcServerConfig &cfg) : TrexRpcServerInterface(cfg, "ZMQ sync request-response") { } @@ -52,11 +54,19 @@ void TrexRpcServerReqRes::_prepare() { */ void TrexRpcServerReqRes::_rpc_thread_cb() { std::stringstream ss; + int zmq_rc; + + m_watchdog_handle = m_watchdog->register_monitor(m_name, 1); /* create a socket based on the configuration */ m_socket = zmq_socket (m_context, ZMQ_REP); + /* to make sure the watchdog gets tickles form time to time we give a timeout of 500ms */ + int timeout = 500; + zmq_rc = zmq_setsockopt (m_socket, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert(zmq_rc == 0); + switch (m_cfg.get_protocol()) { case TrexRpcServerConfig::RPC_PROT_TCP: ss << "tcp://*:"; @@ -68,8 +78,8 @@ void TrexRpcServerReqRes::_rpc_thread_cb() { ss << m_cfg.get_port(); /* bind the scoket */ - int rc = zmq_bind (m_socket, ss.str().c_str()); - if (rc != 0) { + zmq_rc = zmq_bind (m_socket, ss.str().c_str()); + if (zmq_rc != 0) { throw TrexRpcException("Unable to start ZMQ server at: " + ss.str()); } @@ -101,10 +111,22 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { rc = zmq_msg_init(&zmq_msg); assert(rc == 0); - rc = zmq_msg_recv (&zmq_msg, m_socket, 0); + while (true) { + m_watchdog->tickle(m_watchdog_handle); + + rc = zmq_msg_recv (&zmq_msg, m_socket, 0); + if (rc != -1) { + break; + } + + /* timeout ? */ + if (errno == EAGAIN) { + continue; + } - if (rc == -1) { + /* error ! */ zmq_msg_close(&zmq_msg); + /* normal shutdown and zmq_term was called */ if (errno == ETERM) { return false; @@ -113,7 +135,9 @@ TrexRpcServerReqRes::fetch_one_request(std::string &msg) { } } - const char *data = (const char *)zmq_msg_data(&zmq_msg); + + + const char *data = (const char *)zmq_msg_data(&zmq_msg); size_t len = zmq_msg_size(&zmq_msg); msg.append(data, len); diff --git a/src/rpc-server/trex_rpc_req_resp_server.h b/src/rpc-server/trex_rpc_req_resp_server.h index 26b3248f..92d51a2a 100644 --- a/src/rpc-server/trex_rpc_req_resp_server.h +++ b/src/rpc-server/trex_rpc_req_resp_server.h @@ -32,7 +32,7 @@ limitations under the License. class TrexRpcServerReqRes : public TrexRpcServerInterface { public: - TrexRpcServerReqRes(const TrexRpcServerConfig &cfg, std::mutex *lock = NULL); + TrexRpcServerReqRes(const TrexRpcServerConfig &cfg); /* for test purposes - bypass the ZMQ and inject a message */ std::string test_inject_request(const std::string &req); diff --git a/src/rpc-server/trex_rpc_server.cpp b/src/rpc-server/trex_rpc_server.cpp index 7d2e31a5..e4ca95c3 100644 --- a/src/rpc-server/trex_rpc_server.cpp +++ b/src/rpc-server/trex_rpc_server.cpp @@ -28,11 +28,20 @@ limitations under the License. #include #include +#include "trex_watchdog.h" + /************** RPC server interface ***************/ -TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *lock) : m_cfg(cfg), m_name(name), m_lock(lock) { +TrexRpcServerInterface::TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name) : m_cfg(cfg) { + m_name = name; + + m_lock = cfg.m_lock; + m_watchdog = cfg.m_watchdog; + m_watchdog_handle = -1; + m_is_running = false; m_is_verbose = false; + if (m_lock == NULL) { m_lock = &m_dummy_lock; } @@ -69,6 +78,7 @@ void TrexRpcServerInterface::start() { /* prepare for run */ _prepare(); + m_watchdog->mark_pending_monitor(); m_thread = new std::thread(&TrexRpcServerInterface::_rpc_thread_cb, this); if (!m_thread) { throw TrexRpcException("unable to create RPC thread"); @@ -119,8 +129,7 @@ get_current_date_time() { const std::string TrexRpcServer::s_server_uptime = get_current_date_time(); -TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, - std::mutex *lock) { +TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg) { m_req_resp = NULL; @@ -130,7 +139,7 @@ TrexRpcServer::TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, if (req_resp_cfg->get_protocol() == TrexRpcServerConfig::RPC_PROT_MOCK) { m_req_resp = new TrexRpcServerReqResMock(*req_resp_cfg); } else { - m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg, lock); + m_req_resp = new TrexRpcServerReqRes(*req_resp_cfg); } m_servers.push_back(m_req_resp); diff --git a/src/rpc-server/trex_rpc_server_api.h b/src/rpc-server/trex_rpc_server_api.h index a02b2cc0..3d9837ef 100644 --- a/src/rpc-server/trex_rpc_server_api.h +++ b/src/rpc-server/trex_rpc_server_api.h @@ -33,6 +33,7 @@ limitations under the License. class TrexRpcServerInterface; class TrexRpcServerReqRes; +class TrexWatchDog; /** * defines a configuration of generic RPC server @@ -47,8 +48,11 @@ public: RPC_PROT_MOCK }; - TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port) : m_protocol(protocol), m_port(port) { - + TrexRpcServerConfig(rpc_prot_e protocol, uint16_t port, std::mutex *lock, TrexWatchDog *watchdog) { + m_protocol = protocol; + m_port = port; + m_lock = lock; + m_watchdog = watchdog; } uint16_t get_port() const { @@ -62,6 +66,10 @@ public: private: rpc_prot_e m_protocol; uint16_t m_port; + +public: + std::mutex *m_lock; + TrexWatchDog *m_watchdog; }; /** @@ -72,7 +80,7 @@ private: class TrexRpcServerInterface { public: - TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name, std::mutex *m_lock = NULL); + TrexRpcServerInterface(const TrexRpcServerConfig &cfg, const std::string &name); virtual ~TrexRpcServerInterface(); /** @@ -134,6 +142,8 @@ protected: std::string m_name; std::mutex *m_lock; std::mutex m_dummy_lock; + TrexWatchDog *m_watchdog; + int m_watchdog_handle; }; /** @@ -147,8 +157,7 @@ class TrexRpcServer { public: /* creates the collection of servers using configurations */ - TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg, - std::mutex *m_lock = NULL); + TrexRpcServer(const TrexRpcServerConfig *req_resp_cfg); ~TrexRpcServer(); diff --git a/src/sim/trex_sim_stateless.cpp b/src/sim/trex_sim_stateless.cpp index acbeef69..d3981e97 100644 --- a/src/sim/trex_sim_stateless.cpp +++ b/src/sim/trex_sim_stateless.cpp @@ -200,7 +200,7 @@ SimStateless::prepare_control_plane() { m_publisher = new SimPublisher(); - TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_MOCK, 0); + TrexRpcServerConfig rpc_req_resp_cfg(TrexRpcServerConfig::RPC_PROT_MOCK, 0, NULL, NULL); cfg.m_port_count = m_port_count; cfg.m_rpc_req_resp_cfg = &rpc_req_resp_cfg; diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 5bbe9faf..698ede90 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -40,7 +40,7 @@ TrexStateless::TrexStateless(const TrexStatelessCfg &cfg) { /* create RPC servers */ /* set both servers to mutex each other */ - m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg, cfg.m_global_lock); + m_rpc_server = new TrexRpcServer(cfg.m_rpc_req_resp_cfg); m_rpc_server->set_verbose(cfg.m_rpc_server_verbose); /* configure ports */ diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 033326ca..83ab6976 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -41,6 +41,7 @@ limitations under the License. #include "trex_api_class.h" class TrexStatelessPort; +class TrexWatchDog; /** * unified stats @@ -87,7 +88,6 @@ public: m_rpc_server_verbose = false; m_platform_api = NULL; m_publisher = NULL; - m_global_lock = NULL; } const TrexRpcServerConfig *m_rpc_req_resp_cfg; @@ -95,7 +95,6 @@ public: bool m_rpc_server_verbose; uint8_t m_port_count; TrexPublisher *m_publisher; - std::mutex *m_global_lock; }; diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index 21334363..fe78c5b2 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -656,6 +656,7 @@ TrexStatelessDpCore::idle_state_loop() { int counter = 0; while (m_state == STATE_IDLE) { + m_core->tickle(); m_core->m_node_gen.m_v_if->flush_dp_rx_queue(); bool had_msg = periodic_check_for_cp_messages(); if (had_msg) { diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f9150ff7..95fcc1b0 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -76,6 +76,9 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_ring_to_cp = cp_rx->getRingDpToCp(0); m_state = STATE_IDLE; + m_watchdog_handle = -1; + m_watchdog = NULL; + for (int i = 0; i < m_max_ports; i++) { CLatencyManagerPerPortStl * lp = &m_ports[i]; lp->m_io = cfg.m_ports[i]; @@ -93,7 +96,15 @@ void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { delete msg; } +void CRxCoreStateless::tickle() { + m_watchdog->tickle(m_watchdog_handle); +} + bool CRxCoreStateless::periodic_check_for_cp_messages() { + + /* tickle the watchdog */ + tickle(); + /* fast path */ if ( likely ( m_ring_from_cp->isEmpty() ) ) { return false; @@ -140,11 +151,15 @@ void CRxCoreStateless::idle_state_loop() { } } -void CRxCoreStateless::start() { +void CRxCoreStateless::start(TrexWatchDog &watchdog) { int count = 0; int i = 0; bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false; + /* register a watchdog handle on current core */ + m_watchdog = &watchdog; + m_watchdog_handle = watchdog.register_monitor("STL RX CORE", 1); + while (true) { if (m_state == STATE_WORKING) { i++; diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index d18356b6..1528a7e1 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -147,7 +147,7 @@ class CRxCoreStateless { }; public: - void start(); + void start(TrexWatchDog &watchdog); void create(const CRxSlCfg &cfg); void reset_rx_stats(uint8_t port_id); int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset @@ -165,6 +165,7 @@ class CRxCoreStateless { private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); bool periodic_check_for_cp_messages(); + void tickle(); void idle_state_loop(); void handle_rx_pkt(CLatencyManagerPerPortStl * lp, rte_mbuf_t * m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); @@ -176,6 +177,10 @@ class CRxCoreStateless { uint16_t get_hw_id(uint16_t id); private: + + TrexWatchDog *m_watchdog; + int m_watchdog_handle; + uint32_t m_max_ports; bool m_has_streams; CLatencyManagerPerPortStl m_ports[TREX_MAX_PORTS]; diff --git a/src/trex_watchdog.cpp b/src/trex_watchdog.cpp index b3a0733c..d38809fc 100644 --- a/src/trex_watchdog.cpp +++ b/src/trex_watchdog.cpp @@ -26,21 +26,187 @@ limitations under the License. #include #include -int WatchDog::register_monitor(const std::string &name, double timeout_sec) { +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static TrexWatchDog::monitor_st *global_monitor; + +const char *get_exe_name(); + +std::string exec(const char* cmd) { + char buffer[128]; + std::string result = ""; + std::shared_ptr pipe(popen(cmd, "r"), pclose); + if (!pipe) throw std::runtime_error("popen() failed!"); + while (!feof(pipe.get())) { + if (fgets(buffer, 128, pipe.get()) != NULL) { + result += buffer; + } + } + return result; +} + +// This function produces a stack backtrace with demangled function & method names. +__attribute__((noinline)) +std::string Backtrace(int skip = 1) +{ + void *callstack[128]; + const int nMaxFrames = sizeof(callstack) / sizeof(callstack[0]); + char buf[1024]; + int nFrames = backtrace(callstack, nMaxFrames); + char **symbols = backtrace_symbols(callstack, nFrames); + + std::ostringstream trace_buf; + for (int i = skip; i < nFrames; i++) { + + Dl_info info; + if (dladdr(callstack[i], &info) && info.dli_sname) { + char *demangled = NULL; + int status = -1; + if (info.dli_sname[0] == '_') + demangled = abi::__cxa_demangle(info.dli_sname, NULL, 0, &status); + snprintf(buf, sizeof(buf), "%-3d %*p %s + %zd\n", + i, int(2 + sizeof(void*) * 2), callstack[i], + status == 0 ? demangled : + info.dli_sname == 0 ? symbols[i] : info.dli_sname, + (char *)callstack[i] - (char *)info.dli_saddr); + free(demangled); + } else { + snprintf(buf, sizeof(buf), "%-3d %*p %s\n", + i, int(2 + sizeof(void*) * 2), callstack[i], symbols[i]); + } + trace_buf << buf; + } + free(symbols); + if (nFrames == nMaxFrames) + trace_buf << "[truncated]\n"; + + /* add the addr2line info */ + std::stringstream addr2line; + + addr2line << "/usr/bin/addr2line -e " << get_exe_name() << " "; + for (int i = skip; i < nFrames; i++) { + addr2line << callstack[i] << " "; + } + + trace_buf << "\n\n*** addr2line information follows ***\n\n"; + try { + trace_buf << exec(addr2line.str().c_str()); + } catch (std::runtime_error &e) { + trace_buf << "\n" << e.what(); + } + + return trace_buf.str(); +} + +__attribute__((noinline)) +static void _callstack_signal_handler(int signr, siginfo_t *info, void *secret) { + std::stringstream ss; + + double now = now_sec(); + + ss << "WATCHDOG: task '" << global_monitor->name << "' has not responded for more than " << (now - global_monitor->ts) << " seconds - timeout is " << global_monitor->timeout_sec << " seconds"; + + std::string backtrace = Backtrace(); + ss << "\n\n*** traceback follows ***\n\n" << backtrace << "\n"; + + throw std::runtime_error(ss.str()); +} + +void TrexWatchDog::mark_pending_monitor(int count) { + std::unique_lock lock(m_lock); + m_pending += count; + lock.unlock(); +} + +void TrexWatchDog::block_on_pending(int max_block_time_ms) { + + int timeout_msec = max_block_time_ms; + + std::unique_lock lock(m_lock); + + while (m_pending > 0) { + + lock.unlock(); + delay(1); + lock.lock(); + + timeout_msec -= 1; + if (timeout_msec == 0) { + throw TrexException("WATCHDOG: block on pending monitors timed out"); + } + } + + /* lock will be released */ +} + +/** + * register a monitor + * must be called from the relevant thread + * + * this function is thread safe + * + * @author imarom (01-Jun-16) + * + * @param name + * @param timeout_sec + * + * @return int + */ +int TrexWatchDog::register_monitor(const std::string &name, double timeout_sec) { monitor_st monitor; + /* cannot add monitors while active */ + assert(m_active == false); + + monitor.tid = pthread_self(); monitor.name = name; monitor.timeout_sec = timeout_sec; monitor.tickled = true; monitor.ts = 0; + /* critical section start */ + std::unique_lock lock(m_lock); + + /* make sure no double register */ + for (auto &m : m_monitors) { + if (m.tid == pthread_self()) { + std::stringstream ss; + ss << "WATCHDOG: double register detected\n\n" << Backtrace(); + throw TrexException(ss.str()); + } + } + monitor.handle = m_monitors.size(); m_monitors.push_back(monitor); + assert(m_pending > 0); + m_pending--; + + /* critical section end */ + lock.unlock(); + return monitor.handle; } -void WatchDog::tickle(int handle) { +/** + * thread safe function + * + */ +void TrexWatchDog::tickle(int handle) { + + /* ignore ticks if not active */ + if (!m_active) { + return; + } + assert(handle < m_monitors.size()); /* not nesscary but write gets cache invalidate for nothing */ @@ -51,24 +217,74 @@ void WatchDog::tickle(int handle) { m_monitors[handle].tickled = true; } -void WatchDog::start() { +void TrexWatchDog::register_signal() { + + /* do this once */ + if (g_signal_init) { + return; + } + + /* register a handler on SIG ALARM */ + struct sigaction sa; + memset (&sa, '\0', sizeof(sa)); + + sa.sa_flags = SA_SIGINFO; + sa.sa_sigaction = _callstack_signal_handler; + + int rc = sigaction(SIGALRM , &sa, NULL); + assert(rc == 0); + + g_signal_init = true; +} + +void TrexWatchDog::start() { + + block_on_pending(); + + /* no pending monitors */ + assert(m_pending == 0); + + /* under GDB - disable the watchdog */ + if (ptrace(PTRACE_TRACEME, 0, NULL, 0) == -1) { + printf("\n\n*** GDB detected - disabling watchdog... ***\n\n"); + return; + } + + register_signal(); + m_active = true; - m_thread = new std::thread(&WatchDog::_main, this); + m_thread = new std::thread(&TrexWatchDog::_main, this); if (!m_thread) { throw TrexException("unable to create watchdog thread"); } } -void WatchDog::stop() { - m_thread->join(); - delete m_thread; +void TrexWatchDog::stop() { + m_active = false; + + if (m_thread) { + m_thread->join(); + delete m_thread; + m_thread = NULL; + } + + m_monitors.clear(); } + + /** * main loop * */ -void WatchDog::_main() { +void TrexWatchDog::_main() { + + /* reset all the monitors */ + for (auto &monitor : m_monitors) { + monitor.tickled = true; + } + + /* start main loop */ while (m_active) { dsec_t now = now_sec(); @@ -83,15 +299,21 @@ void WatchDog::_main() { /* the bit is off - check the time first */ if ( (now - monitor.ts) > monitor.timeout_sec ) { - std::stringstream ss; - ss << "WATCHDOG: task '" << monitor.name << "' has not responded for more than " << (now - monitor.ts) << " seconds - timeout is " << monitor.timeout_sec << " seconds"; - throw TrexException(ss.str()); - assert(0); + global_monitor = &monitor; + + pthread_kill(monitor.tid, SIGALRM); + + /* nothing to do more... the other thread will terminate, but if not - we terminate */ + sleep(5); + printf("\n\n*** WATCHDOG violation detected on task '%s' which have failed to response to the signal ***\n\n", monitor.name.c_str()); + exit(1); } } - sleep(1); + /* the internal clock - 250 ms */ + delay(250); } } +bool TrexWatchDog::g_signal_init = false; diff --git a/src/trex_watchdog.h b/src/trex_watchdog.h index ecc90960..53a48217 100644 --- a/src/trex_watchdog.h +++ b/src/trex_watchdog.h @@ -25,16 +25,40 @@ limitations under the License. #include #include #include +#include +//#include "rte_memory.h" +#include "mbuf.h" #include "os_time.h" -class WatchDog { +class TrexWatchDog { public: - WatchDog() { - m_thread = NULL; - m_active = false; + TrexWatchDog() { + m_thread = NULL; + m_active = false; + m_pending = 0; + + register_signal(); } + /** + * registering a monitor happens from another thread + * this make sure that start will be able to block until + * all threads has registered + * + * @author imarom (01-Jun-16) + */ + void mark_pending_monitor(int count = 1); + + + /** + * blocks while monitors are pending registeration + * + * @author imarom (01-Jun-16) + */ + void block_on_pending(int max_block_time_ms = 200); + + /** * add a monitor to the watchdog * this thread will be monitored and if timeout @@ -49,6 +73,7 @@ public: */ int register_monitor(const std::string &name, double timeout_sec); + /** * should be called by each thread on it's handle * @@ -58,34 +83,51 @@ public: */ void tickle(int handle); + /** * start the watchdog * */ void start(); + /** * stop the watchdog * */ void stop(); -private: - void _main(); + /* should be cache aligned to avoid false sharing */ struct monitor_st { - int handle; - std::string name; - double timeout_sec; - bool tickled; - dsec_t ts; + /* write fields are first */ + volatile bool tickled; + dsec_t ts; + + int handle; + double timeout_sec; + pthread_t tid; + std::string name; + + /* for for a full cacheline */ + uint8_t pad[16]; }; - std::vector m_monitors; +private: + void register_signal(); + void _main(); + + std::vector m_monitors __rte_cache_aligned; + std::mutex m_lock; - volatile bool m_active; - std::thread *m_thread; + volatile bool m_active; + std::thread *m_thread; + volatile int m_pending; + + static bool g_signal_init; }; +static_assert(sizeof(TrexWatchDog::monitor_st) >= RTE_CACHE_LINE_SIZE, "sizeof(monitor_st) != RTE_CACHE_LINE_SIZE" ); + #endif /* __TREX_WATCHDOG_H__ */ -- cgit 1.2.3-korg