diff options
-rw-r--r-- | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py | 18 | ||||
-rw-r--r-- | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py | 10 | ||||
-rw-r--r-- | scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py | 12 | ||||
-rw-r--r-- | src/flow_stat.cpp | 12 | ||||
-rw-r--r-- | src/flow_stat.h | 2 | ||||
-rw-r--r-- | src/internal_api/trex_platform_api.h | 6 | ||||
-rw-r--r-- | src/main_dpdk.cpp | 20 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmd_general.cpp | 5 | ||||
-rw-r--r-- | src/rpc-server/commands/trex_rpc_cmds.h | 2 |
9 files changed, 45 insertions, 42 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py index 82891b68..ae6cb497 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py @@ -178,7 +178,8 @@ class CTRexAsyncClient(): self.connected = True - rc = self.barrier() + # sync all stats data as a baseline from the server + rc = self.barrier(baseline = True) if not rc: self.disconnect() return rc @@ -245,11 +246,11 @@ class CTRexAsyncClient(): name = msg['name'] data = msg['data'] type = msg['type'] - sync = msg.get('sync', False) + baseline = msg.get('baseline', False) self.raw_snapshot[name] = data - self.__dispatch(name, type, data, sync) + self.__dispatch(name, type, data, baseline) # closing of socket must be from the same thread @@ -270,10 +271,11 @@ class CTRexAsyncClient(): return self.raw_snapshot # dispatch the message to the right place - def __dispatch (self, name, type, data, sync): + def __dispatch (self, name, type, data, baseline): + # stats if name == "trex-global": - self.event_handler.handle_async_stats_update(data, sync) + self.event_handler.handle_async_stats_update(data, baseline) # events elif name == "trex-event": @@ -284,7 +286,7 @@ class CTRexAsyncClient(): self.handle_async_barrier(type, data) elif name == "flow_stats": - self.event_handler.handle_async_rx_stats_event(data, sync) + self.event_handler.handle_async_rx_stats_event(data, baseline) else: pass @@ -297,7 +299,7 @@ class CTRexAsyncClient(): # block on barrier for async channel - def barrier(self, timeout = 5): + def barrier(self, timeout = 5, baseline = False): # set a random key key = random.getrandbits(32) @@ -309,7 +311,7 @@ class CTRexAsyncClient(): while not self.async_barrier['ack']: # inject - rc = self.stateless_client._transmit("publish_now", params = {'key' : key}) + rc = self.stateless_client._transmit("publish_now", params = {'key' : key, 'baseline': baseline}) if not rc: return rc diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 1905d44f..5ecb1120 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -155,12 +155,12 @@ class AsyncEventHandler(object): pass - def handle_async_rx_stats_event (self, data, sync): - self.client.flow_stats.update(data, sync) + def handle_async_rx_stats_event (self, data, baseline): + self.client.flow_stats.update(data, baseline) # handles an async stats update from the subscriber - def handle_async_stats_update(self, dump_data, sync): + def handle_async_stats_update(self, dump_data, baseline): global_stats = {} port_stats = {} @@ -182,11 +182,11 @@ class AsyncEventHandler(object): global_stats[key] = value # update the general object with the snapshot - self.client.global_stats.update(global_stats, sync) + self.client.global_stats.update(global_stats, baseline) # update all ports for port_id, data in port_stats.iteritems(): - self.client.ports[port_id].port_stats.update(data, sync) + self.client.ports[port_id].port_stats.update(data, baseline) # dispatcher for server async events (port started, port stopped and etc.) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py index 60c8229d..cf394445 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py @@ -389,7 +389,7 @@ class CTRexStats(object): self.last_update_ts = time.time() self.history = deque(maxlen = 10) self.lock = threading.Lock() - self.is_synced = False + self.has_baseline = False ######## abstract methods ########## @@ -402,15 +402,15 @@ class CTRexStats(object): raise NotImplementedError() # called when a snapshot arrives - add more fields - def _update (self, snapshot, sync): + def _update (self, snapshot, baseline): raise NotImplementedError() ######## END abstract methods ########## - def update(self, snapshot, sync): + def update(self, snapshot, baseline): - if not self.is_synced and not sync: + if not self.has_baseline and not baseline: return rc = self._update(snapshot) @@ -418,9 +418,9 @@ class CTRexStats(object): return # sync one time - if not self.is_synced and sync: + if not self.has_baseline and baseline: self.reference_stats = copy.deepcopy(self.latest_stats) - self.is_synced = True + self.has_baseline = True # save history with self.lock: diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index b2714b1e..de081ffe 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -622,7 +622,7 @@ int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) { } // return false if no counters changed since last run. true otherwise -bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) { +bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) { uint64_t rx_stats[MAX_FLOW_STATS]; tx_per_flow_t tx_stats[MAX_FLOW_STATS]; Json::FastWriter writer; @@ -631,8 +631,8 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) { root["name"] = "flow_stats"; root["type"] = 0; - if (force_sync) { - root["sync"] = true; + if (baseline) { + root["baseline"] = true; } Json::Value &data_section = root["data"]; @@ -640,7 +640,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) { data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq()); if (m_user_id_map.is_empty()) { - if (force_sync) { + if (baseline) { json = writer.write(root); return true; } else @@ -692,11 +692,11 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) { } for (uint8_t port = 0; port < m_num_ports; port++) { std::string str_port = static_cast<std::ostringstream*>( &(std::ostringstream() << int(port) ) )->str(); - if (user_id_info->need_to_send_rx(port) || force_sync) { + if (user_id_info->need_to_send_rx(port) || baseline) { user_id_info->set_no_need_to_send_rx(port); data_section[str_user_id]["rx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_rx_counter(port)); } - if (user_id_info->need_to_send_tx(port) || force_sync) { + if (user_id_info->need_to_send_tx(port) || baseline) { user_id_info->set_no_need_to_send_tx(port); data_section[str_user_id]["tx_pkts"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_pkts()); data_section[str_user_id]["tx_bytes"][str_port] = Json::Value::UInt64(user_id_info->get_tx_counter(port).get_bytes()); diff --git a/src/flow_stat.h b/src/flow_stat.h index 6966b116..3e00a180 100644 --- a/src/flow_stat.h +++ b/src/flow_stat.h @@ -202,7 +202,7 @@ class CFlowStatRuleMgr { int start_stream(TrexStream * stream, uint16_t &ret_hw_id); int stop_stream(const TrexStream * stream); int get_active_pgids(flow_stat_active_t &result); - bool dump_json(std::string & json, bool force_sync); + bool dump_json(std::string & json, bool baseline); private: int compile_stream(const TrexStream * stream, Cxl710Parser &parser); diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index e9cf56d7..f8f76584 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -139,7 +139,7 @@ public: virtual void get_interface_info(uint8_t interface_id, intf_info_st &info) const = 0; - virtual void publish_async_data_now(uint32_t key) const = 0; + virtual void publish_async_data_now(uint32_t key, bool baseline) const = 0; virtual uint8_t get_dp_core_count() const = 0; virtual void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const =0; virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const = 0; @@ -168,7 +168,7 @@ public: void get_interface_info(uint8_t interface_id, intf_info_st &info) const; - void publish_async_data_now(uint32_t key) const; + void publish_async_data_now(uint32_t key, bool baseline) const; uint8_t get_dp_core_count() const; void get_interface_stat_info(uint8_t interface_id, uint16_t &num_counters, uint16_t &capabilities) const; int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const; @@ -222,7 +222,7 @@ public: } } - virtual void publish_async_data_now(uint32_t key) const { + virtual void publish_async_data_now(uint32_t key, bool baseline) const { } virtual int get_flow_stats(uint8_t port_id, uint64_t *stats, void *tx_stats, int min, int max, bool reset) const {return 0;}; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 2d087c8c..1b750bbd 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2307,7 +2307,7 @@ public: public: void Dump(FILE *fd,DumpFormat mode); void DumpAllPorts(FILE *fd); - void dump_json(std::string & json, bool force_sync); + void dump_json(std::string & json, bool baseline); private: std::string get_field(std::string name,float &f); std::string get_field(std::string name,uint64_t &f); @@ -2341,12 +2341,12 @@ std::string CGlobalStats::get_field_port(int port,std::string name,uint64_t &f){ } -void CGlobalStats::dump_json(std::string & json, bool force_sync){ +void CGlobalStats::dump_json(std::string & json, bool baseline){ /* refactor this to JSON */ json="{\"name\":\"trex-global\",\"type\":0,"; - if (force_sync) { - json += "\"sync\": true,"; + if (baseline) { + json += "\"baseline\": true,"; } json +="\"data\":{"; @@ -2638,7 +2638,7 @@ private: public: - void publish_async_data(bool sync_now); + void publish_async_data(bool sync_now, bool baseline = false); void publish_async_barrier(uint32_t key); void dump_stats(FILE *fd, @@ -3563,7 +3563,7 @@ void CGlobalTRex::dump_stats(FILE *fd, CGlobalStats::DumpFormat format){ } void -CGlobalTRex::publish_async_data(bool sync_now) { +CGlobalTRex::publish_async_data(bool sync_now, bool baseline) { std::string json; /* refactor to update, dump, and etc. */ @@ -3572,7 +3572,7 @@ CGlobalTRex::publish_async_data(bool sync_now) { get_stats(m_stats); } - m_stats.dump_json(json, sync_now); + m_stats.dump_json(json, baseline); m_zmq_publisher.publish_json(json); /* generator json , all cores are the same just sample the first one */ @@ -3599,7 +3599,7 @@ CGlobalTRex::publish_async_data(bool sync_now) { m_zmq_publisher.publish_json(json); if (get_is_stateless()) { - if (m_trex_stateless->m_rx_flow_stat.dump_json(json, sync_now)) + if (m_trex_stateless->m_rx_flow_stat.dump_json(json, baseline)) m_zmq_publisher.publish_json(json); } } @@ -5224,8 +5224,8 @@ TrexDpdkPlatformApi::get_interface_info(uint8_t interface_id, intf_info_st &info } void -TrexDpdkPlatformApi::publish_async_data_now(uint32_t key) const { - g_trex.publish_async_data(true); +TrexDpdkPlatformApi::publish_async_data_now(uint32_t key, bool baseline) const { + g_trex.publish_async_data(true, baseline); g_trex.publish_async_barrier(key); } diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index dcf74b50..f054c0ed 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -398,9 +398,10 @@ trex_rpc_cmd_rc_e TrexRpcPublishNow::_run(const Json::Value ¶ms, Json::Value &result) { TrexStateless *main = get_stateless_obj(); - uint32_t key = parse_uint32(params, "key", result); + uint32_t key = parse_uint32(params, "key", result); + bool baseline = parse_bool(params, "baseline", result); - main->get_platform_api()->publish_async_data_now(key); + main->get_platform_api()->publish_async_data_now(key, baseline); result["result"] = Json::objectValue; return (TREX_RPC_CMD_OK); diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 386ccc27..c4b01b85 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -57,7 +57,7 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdTestSub, "test_sub", 2, false); * general cmds */ TREX_RPC_CMD_DEFINE(TrexRpcCmdPing, "ping", 0, false); -TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 1, false); +TREX_RPC_CMD_DEFINE(TrexRpcPublishNow, "publish_now", 2, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetCmds, "get_supported_cmds", 0, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetVersion, "get_version", 0, false); TREX_RPC_CMD_DEFINE(TrexRpcCmdGetActivePGIds, "get_active_pgids",0, false); |