summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py10
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py11
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py279
-rw-r--r--src/flow_stat.cpp6
-rw-r--r--src/main_dpdk.cpp15
5 files changed, 214 insertions, 107 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
index 36103cae..82891b68 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py
@@ -245,9 +245,11 @@ class CTRexAsyncClient():
name = msg['name']
data = msg['data']
type = msg['type']
+ sync = msg.get('sync', False)
+
self.raw_snapshot[name] = data
- self.__dispatch(name, type, data)
+ self.__dispatch(name, type, data, sync)
# closing of socket must be from the same thread
@@ -268,10 +270,10 @@ class CTRexAsyncClient():
return self.raw_snapshot
# dispatch the message to the right place
- def __dispatch (self, name, type, data):
+ def __dispatch (self, name, type, data, sync):
# stats
if name == "trex-global":
- self.event_handler.handle_async_stats_update(data)
+ self.event_handler.handle_async_stats_update(data, sync)
# events
elif name == "trex-event":
@@ -282,7 +284,7 @@ class CTRexAsyncClient():
self.handle_async_barrier(type, data)
elif name == "flow_stats":
- self.event_handler.handle_async_rx_stats_event(data)
+ self.event_handler.handle_async_rx_stats_event(data, sync)
else:
pass
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 7dc7ff32..1905d44f 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -155,12 +155,12 @@ class AsyncEventHandler(object):
pass
- def handle_async_rx_stats_event (self, data):
- self.client.flow_stats.update(data)
+ def handle_async_rx_stats_event (self, data, sync):
+ self.client.flow_stats.update(data, sync)
# handles an async stats update from the subscriber
- def handle_async_stats_update(self, dump_data):
+ def handle_async_stats_update(self, dump_data, sync):
global_stats = {}
port_stats = {}
@@ -182,11 +182,11 @@ class AsyncEventHandler(object):
global_stats[key] = value
# update the general object with the snapshot
- self.client.global_stats.update(global_stats)
+ self.client.global_stats.update(global_stats, sync)
# update all ports
for port_id, data in port_stats.iteritems():
- self.client.ports[port_id].port_stats.update(data)
+ self.client.ports[port_id].port_stats.update(data, sync)
# dispatcher for server async events (port started, port stopped and etc.)
@@ -808,6 +808,7 @@ class STLClient(object):
self.ports[port_id].invalidate_stats()
self.global_stats.invalidate()
+ self.flow_stats.invalidate()
return RC_OK()
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
index 418affb7..60c8229d 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py
@@ -13,6 +13,7 @@ import re
import math
import copy
import threading
+import pprint
GLOBAL_STATS = 'g'
PORT_STATS = 'p'
@@ -137,7 +138,7 @@ class CTRexInfoGenerator(object):
stats_table = text_tables.TRexTextTable()
stats_table.set_cols_align(["l"] + ["r"] * stream_count)
- stats_table.set_cols_width([20] + [17] * stream_count)
+ stats_table.set_cols_width([10] + [17] * stream_count)
stats_table.set_cols_dtype(['t'] + ['t'] * stream_count)
stats_table.add_rows([[k] + v
@@ -388,7 +389,7 @@ class CTRexStats(object):
self.last_update_ts = time.time()
self.history = deque(maxlen = 10)
self.lock = threading.Lock()
-
+ self.is_synced = False
######## abstract methods ##########
@@ -401,37 +402,29 @@ class CTRexStats(object):
raise NotImplementedError()
# called when a snapshot arrives - add more fields
- def preprocess_snapshot (self, snapshot):
+ def _update (self, snapshot, sync):
raise NotImplementedError()
- ######## END abstract methods ##########
- def __update_ref (self):
- deep_merge_dicts(self.reference_stats, self.latest_stats)
+ ######## END abstract methods ##########
+ def update(self, snapshot, sync):
- def update(self, snapshot):
+ if not self.is_synced and not sync:
+ return
- # some extended generated values (from base values)
- self.preprocess_snapshot(snapshot)
+ rc = self._update(snapshot)
+ if not rc:
+ return
- # update
- self.latest_stats = snapshot
+ # sync one time
+ if not self.is_synced and sync:
+ self.reference_stats = copy.deepcopy(self.latest_stats)
+ self.is_synced = True
+ # save history
with self.lock:
- self.history.append(snapshot)
-
- diff_time = time.time() - self.last_update_ts
-
- # handle the reference (base)
- self.__update_ref()
-
- # 3 seconds its a timeout
- if diff_time > 3:
- self.clear_stats()
-
-
- self.last_update_ts = time.time()
+ self.history.append(self.latest_stats)
def clear_stats(self):
@@ -467,14 +460,13 @@ class CTRexStats(object):
def get_rel(self, field, format=False, suffix=""):
- ref_value = self._get(self.reference_stats, field)
- if ref_value == None:
- return "N/A"
-
- # if the latest does not have the value - its like the ref
+
+ ref_value = self._get(self.reference_stats, field, default = 0)
latest_value = self._get(self.latest_stats, field)
+
+ # latest value is an aggregation - must contain the value
if latest_value == None:
- latest_value = ref_value
+ return "N/A"
value = latest_value - ref_value
@@ -564,7 +556,7 @@ class CGlobalStats(CTRexStats):
return stats
- def preprocess_snapshot (self, snapshot):
+ def pre_update (self, snapshot):
# L1 bps
bps = snapshot.get("m_tx_bps")
pps = snapshot.get("m_tx_pps")
@@ -578,6 +570,16 @@ class CGlobalStats(CTRexStats):
snapshot['m_tx_bps_L1'] = bps_L1
+ def _update(self, snapshot):
+
+ self.pre_update(snapshot)
+
+ # simple...
+ self.latest_stats = snapshot
+
+ return True
+
+
def generate_stats(self):
return OrderedDict([("connection", "{host}, Port {port}".format(host=self.connection_info.get("server"),
port=self.connection_info.get("sync_port"))),
@@ -674,7 +676,7 @@ class CPortStats(CTRexStats):
return stats
- def preprocess_snapshot (self, snapshot):
+ def pre_update (self, snapshot):
# L1 bps
bps = snapshot.get("m_total_tx_bps")
pps = snapshot.get("m_total_tx_pps")
@@ -689,6 +691,16 @@ class CPortStats(CTRexStats):
snapshot['m_percentage'] = (bps_L1 / self._port_obj.get_speed_bps()) * 100
+ def _update(self, snapshot):
+
+ self.pre_update(snapshot)
+
+ # simple...
+ self.latest_stats = snapshot
+
+ return True
+
+
def generate_stats(self):
state = self._port_obj.get_port_state_name() if self._port_obj else ""
@@ -749,6 +761,9 @@ class CPortStats(CTRexStats):
}
+
+
+# RX stats objects - COMPLEX :-(
class CRxStats(CTRexStats):
def __init__(self):
super(CRxStats, self).__init__()
@@ -760,98 +775,177 @@ class CRxStats(CTRexStats):
factor = bps / (pps * 8.0)
return bps * ( 1 + (20 / factor) )
+ def calculate_diff_sec (self, current, prev):
+ if not 'ts' in current:
+ raise ValueError("INTERNAL ERROR: RX stats snapshot MUST contain 'ts' field")
- def preprocess_snapshot (self, snapshot):
- # heavy pre-processing here...
- new_snapshot = {}
+ if prev:
+ prev_ts = prev['ts']
+ now_ts = current['ts']
+ diff_sec = (now_ts['value'] - prev_ts['value']) / float(now_ts['freq'])
+ else:
+ diff_sec = 0.0
- if not 'ts' in snapshot:
- raise ValueError("INTERNAL ERROR: RX stats snapshot MUST contain 'ts' field")
+ return diff_sec
+
+
+ def process_single_pg (self, current_pg, prev_pg):
+
+ # start with the previous PG
+ output = copy.deepcopy(prev_pg)
+
+ for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']:
+ if not field in output:
+ output[field] = {}
+
+ if field in current_pg:
+ for port, pv in current_pg[field].iteritems():
+ if not self.is_intable(port):
+ continue
- new_snapshot['ts'] = snapshot['ts']
+ output[field][port] = pv
- for key, value in snapshot.iteritems():
- # skip non int values (simply copy)
- if key == 'ts':
- new_snapshot[key] = value
+ # sum up
+ total = 0
+ for port, pv in output[field].iteritems():
+ if not self.is_intable(port):
+ continue
+ total += pv
+
+ output[field]['total'] = total
+
+ return output
+
+
+ def is_intable (self, value):
+ try:
+ int(value)
+ return True
+ except ValueError:
+ return False
+
+ def process_snapshot (self, current, prev):
+
+ # timestamp
+ diff_sec = self.calculate_diff_sec(current, prev)
+
+ # final output
+ output = {}
+
+ # copy timestamp field
+ output['ts'] = current['ts']
+
+ pg_ids = set(prev.keys() + current.keys())
+
+ for pg_id in pg_ids:
+ if not self.is_intable(pg_id):
continue
- # all the rest MUST be ints
- try:
- pg_id = int(key)
- except ValueError:
- assert(0)
+ current_pg = current.get(pg_id, {})
+ prev_pg = prev.get(pg_id, {})
+
+ if current_pg.get('first_time'):
+ # new value - ignore history
+ output[pg_id] = self.process_single_pg(current_pg, {})
+ self.reference_stats[pg_id] = {}
+ self.calculate_bw_for_pg(output[pg_id], prev_pg, 0)
+ else:
+ # aggregate
+ output[pg_id] = self.process_single_pg(current_pg, prev_pg)
- # handle PG ID
- new_snapshot[pg_id] = {}
- for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']:
- new_snapshot[pg_id][field] = {'total': 0}
- if field in value:
- for port, pv in value[field].iteritems():
- new_snapshot[pg_id][field][int(port)] = pv
- new_snapshot[pg_id][field]['total'] += pv
+ self.calculate_bw_for_pg(output[pg_id], prev_pg, diff_sec)
- # add B/W calcs for a PG id
- self.calculate_bw(new_snapshot, pg_id)
- snapshot.clear()
- snapshot.update(new_snapshot)
+ return output
- def calculate_bw (self, snapshot, pg_id):
- if not self.latest_stats:
- snapshot[pg_id]['tx_pps'] = 0.0
- snapshot[pg_id]['tx_bps'] = 0.0
- snapshot[pg_id]['rx_pps'] = 0.0
- snapshot[pg_id]['rx_bps'] = 0.0
+
+ def calculate_bw_for_pg (self, pg_current, pg_prev, diff_sec):
+
+ if diff_sec == 0:
+ pg_current['tx_pps'] = 0.0
+ pg_current['tx_bps'] = 0.0
+ pg_current['tx_bps_L1'] = 0.0
+ pg_current['rx_pps'] = 0.0
+ pg_current['rx_bps'] = 0.0
return
+
# prev
- prev_ts = self._get(self.latest_stats, 'ts')
- prev_tx_pkts = self._get(self.latest_stats, [pg_id, 'tx_pkts', 'total'], default = 0.0)
- prev_tx_bytes = self._get(self.latest_stats, [pg_id, 'tx_bytes', 'total'], default = 0.0)
- prev_tx_pps = self._get(self.latest_stats, [pg_id, 'tx_pps'], default = 0.0)
- prev_tx_bps = self._get(self.latest_stats, [pg_id, 'tx_bps'], default = 0.0)
+ prev_tx_pkts = pg_prev['tx_pkts']['total']
+ prev_tx_bytes = pg_prev['tx_bytes']['total']
+ prev_tx_pps = pg_prev['tx_pps']
+ prev_tx_bps = pg_prev['tx_bps']
# now
- now_ts = snapshot['ts']
- now_tx_pkts = snapshot[pg_id]['tx_pkts']['total']
- now_tx_bytes = snapshot[pg_id]['tx_bytes']['total']
+ now_tx_pkts = pg_current['tx_pkts']['total']
+ now_tx_bytes = pg_current['tx_bytes']['total']
- # diff seconds
- diff_sec = (now_ts['value'] - prev_ts['value']) / float(now_ts['freq'])
-
- # calculate fields
- snapshot[pg_id]['tx_pps'] = (0.5 * prev_tx_pps) + (0.5 * ( (now_tx_pkts - prev_tx_pkts) / diff_sec) )
- snapshot[pg_id]['tx_bps'] = (0.5 * prev_tx_bps) + (0.5 * ( (now_tx_bytes - prev_tx_bytes) * 8 / diff_sec) )
+ if not (now_tx_pkts >= prev_tx_pkts):
+ print "CURRENT:\n"
+ pprint.pprint(pg_current)
+ print "PREV:\n"
+ pprint.pprint(pg_prev)
+ assert(now_tx_pkts > prev_tx_pkts)
+
+ pg_current['tx_pps'] = (0.5 * prev_tx_pps) + (0.5 * ( (now_tx_pkts - prev_tx_pkts) / diff_sec) )
+ pg_current['tx_bps'] = (0.5 * prev_tx_bps) + (0.5 * ( (now_tx_bytes - prev_tx_bytes) * 8 / diff_sec) )
+
+ pg_current['rx_pps'] = 0.0
+ pg_current['rx_bps'] = 0.0
+
+ pg_current['tx_bps_L1'] = self.bps_L1(pg_current['tx_bps'], pg_current['tx_pps'])
- snapshot[pg_id]['rx_pps'] = 0.0
- snapshot[pg_id]['rx_bps'] = 0.0
- snapshot[pg_id]['tx_bps_L1'] = self.bps_L1(snapshot[pg_id]['tx_bps'], snapshot[pg_id]['tx_pps'])
+ def _update (self, snapshot):
+
+ # generate a new snapshot
+ new_snapshot = self.process_snapshot(snapshot, self.latest_stats)
+
+ #print new_snapshot
+ # advance
+ self.latest_stats = new_snapshot
+
+
+ return True
+
+
+
+ # for API
def get_stats (self):
stats = {}
- for pg_id in self.get_streams_keys():
- stats[pg_id] = {}
+ for pg_id, value in self.latest_stats.iteritems():
+ # skip non ints
+ try:
+ int(pg_id)
+ except ValueError:
+ continue
+
+ stats[int(pg_id)] = {}
for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']:
- stats[pg_id][field] = {'total': self.get_rel([pg_id, field, 'total'])}
+ stats[int(pg_id)][field] = {'total': self.get_rel([pg_id, field, 'total'])}
- for port, pv in self.reference_stats[pg_id][field].iteritems():
- stats[pg_id][field][port] = self.get_rel([pg_id, field, port])
+ for port, pv in value[field].iteritems():
+ try:
+ int(port)
+ except ValueError:
+ continue
+ stats[int(pg_id)][field][int(port)] = self.get_rel([pg_id, field, port])
return stats
-
+
def get_streams_keys (self):
keys = []
- for user_id, user_id_data in self.reference_stats.iteritems():
+ for key in self.latest_stats.keys():
# ignore non user ID keys
try:
- keys.append(int(user_id))
+ int(key)
+ keys.append(key)
except ValueError:
continue
@@ -860,11 +954,9 @@ class CRxStats(CTRexStats):
def generate_stats (self):
- stats = self.get_stats()
- pg_ids = stats.keys()[:4]
+ pg_ids = self.get_streams_keys()
cnt = len(pg_ids)
-
formatted_stats = OrderedDict([ ('Tx pps', []),
('Tx bps L2', []),
('Tx bps L1', []),
@@ -911,3 +1003,4 @@ class CRxStats(CTRexStats):
if __name__ == "__main__":
pass
+
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index 4e05de8a..b2714b1e 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -630,8 +630,12 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool force_sync) {
root["name"] = "flow_stats";
root["type"] = 0;
- Json::Value &data_section = root["data"];
+ if (force_sync) {
+ root["sync"] = true;
+ }
+
+ Json::Value &data_section = root["data"];
data_section["ts"]["value"] = Json::Value::UInt64(os_get_hr_tick_64());
data_section["ts"]["freq"] = Json::Value::UInt64(os_get_hr_freq());
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 3f53f83c..2d087c8c 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -2307,7 +2307,7 @@ public:
public:
void Dump(FILE *fd,DumpFormat mode);
void DumpAllPorts(FILE *fd);
- void dump_json(std::string & json);
+ void dump_json(std::string & json, bool force_sync);
private:
std::string get_field(std::string name,float &f);
std::string get_field(std::string name,uint64_t &f);
@@ -2341,8 +2341,15 @@ std::string CGlobalStats::get_field_port(int port,std::string name,uint64_t &f){
}
-void CGlobalStats::dump_json(std::string & json){
- json="{\"name\":\"trex-global\",\"type\":0,\"data\":{";
+void CGlobalStats::dump_json(std::string & json, bool force_sync){
+ /* refactor this to JSON */
+
+ json="{\"name\":\"trex-global\",\"type\":0,";
+ if (force_sync) {
+ json += "\"sync\": true,";
+ }
+
+ json +="\"data\":{";
#define GET_FIELD(f) get_field(std::string(#f),f)
#define GET_FIELD_PORT(p,f) get_field_port(p,std::string(#f),lp->f)
@@ -3565,7 +3572,7 @@ CGlobalTRex::publish_async_data(bool sync_now) {
get_stats(m_stats);
}
- m_stats.dump_json(json);
+ m_stats.dump_json(json, sync_now);
m_zmq_publisher.publish_json(json);
/* generator json , all cores are the same just sample the first one */