diff options
author | 2016-03-09 18:04:31 +0200 | |
---|---|---|
committer | 2016-03-10 17:16:37 +0200 | |
commit | 8b0bb76f7987e33ff1b13b5bdf360a9e15f96c68 (patch) | |
tree | ce46b170f7b168726c33db7b1f5dbf1f491b15b8 /scripts/automation/trex_control_plane/stl/trex_stl_lib | |
parent | 094411ef99a485017d300e632e14aee10c8234c5 (diff) |
RX STATS !
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/trex_stl_lib')
3 files changed, 198 insertions, 102 deletions
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py index 36103cae..82891b68 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_async_client.py @@ -245,9 +245,11 @@ class CTRexAsyncClient(): name = msg['name'] data = msg['data'] type = msg['type'] + sync = msg.get('sync', False) + self.raw_snapshot[name] = data - self.__dispatch(name, type, data) + self.__dispatch(name, type, data, sync) # closing of socket must be from the same thread @@ -268,10 +270,10 @@ class CTRexAsyncClient(): return self.raw_snapshot # dispatch the message to the right place - def __dispatch (self, name, type, data): + def __dispatch (self, name, type, data, sync): # stats if name == "trex-global": - self.event_handler.handle_async_stats_update(data) + self.event_handler.handle_async_stats_update(data, sync) # events elif name == "trex-event": @@ -282,7 +284,7 @@ class CTRexAsyncClient(): self.handle_async_barrier(type, data) elif name == "flow_stats": - self.event_handler.handle_async_rx_stats_event(data) + self.event_handler.handle_async_rx_stats_event(data, sync) else: pass diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 7dc7ff32..1905d44f 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -155,12 +155,12 @@ class AsyncEventHandler(object): pass - def handle_async_rx_stats_event (self, data): - self.client.flow_stats.update(data) + def handle_async_rx_stats_event (self, data, sync): + self.client.flow_stats.update(data, sync) # handles an async stats update from the subscriber - def handle_async_stats_update(self, dump_data): + def handle_async_stats_update(self, dump_data, sync): global_stats = {} port_stats = {} @@ -182,11 +182,11 @@ class AsyncEventHandler(object): global_stats[key] = value # update the general object with the snapshot - self.client.global_stats.update(global_stats) + self.client.global_stats.update(global_stats, sync) # update all ports for port_id, data in port_stats.iteritems(): - self.client.ports[port_id].port_stats.update(data) + self.client.ports[port_id].port_stats.update(data, sync) # dispatcher for server async events (port started, port stopped and etc.) @@ -808,6 +808,7 @@ class STLClient(object): self.ports[port_id].invalidate_stats() self.global_stats.invalidate() + self.flow_stats.invalidate() return RC_OK() diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py index 418affb7..60c8229d 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py @@ -13,6 +13,7 @@ import re import math import copy import threading +import pprint GLOBAL_STATS = 'g' PORT_STATS = 'p' @@ -137,7 +138,7 @@ class CTRexInfoGenerator(object): stats_table = text_tables.TRexTextTable() stats_table.set_cols_align(["l"] + ["r"] * stream_count) - stats_table.set_cols_width([20] + [17] * stream_count) + stats_table.set_cols_width([10] + [17] * stream_count) stats_table.set_cols_dtype(['t'] + ['t'] * stream_count) stats_table.add_rows([[k] + v @@ -388,7 +389,7 @@ class CTRexStats(object): self.last_update_ts = time.time() self.history = deque(maxlen = 10) self.lock = threading.Lock() - + self.is_synced = False ######## abstract methods ########## @@ -401,37 +402,29 @@ class CTRexStats(object): raise NotImplementedError() # called when a snapshot arrives - add more fields - def preprocess_snapshot (self, snapshot): + def _update (self, snapshot, sync): raise NotImplementedError() - ######## END abstract methods ########## - def __update_ref (self): - deep_merge_dicts(self.reference_stats, self.latest_stats) + ######## END abstract methods ########## + def update(self, snapshot, sync): - def update(self, snapshot): + if not self.is_synced and not sync: + return - # some extended generated values (from base values) - self.preprocess_snapshot(snapshot) + rc = self._update(snapshot) + if not rc: + return - # update - self.latest_stats = snapshot + # sync one time + if not self.is_synced and sync: + self.reference_stats = copy.deepcopy(self.latest_stats) + self.is_synced = True + # save history with self.lock: - self.history.append(snapshot) - - diff_time = time.time() - self.last_update_ts - - # handle the reference (base) - self.__update_ref() - - # 3 seconds its a timeout - if diff_time > 3: - self.clear_stats() - - - self.last_update_ts = time.time() + self.history.append(self.latest_stats) def clear_stats(self): @@ -467,14 +460,13 @@ class CTRexStats(object): def get_rel(self, field, format=False, suffix=""): - ref_value = self._get(self.reference_stats, field) - if ref_value == None: - return "N/A" - - # if the latest does not have the value - its like the ref + + ref_value = self._get(self.reference_stats, field, default = 0) latest_value = self._get(self.latest_stats, field) + + # latest value is an aggregation - must contain the value if latest_value == None: - latest_value = ref_value + return "N/A" value = latest_value - ref_value @@ -564,7 +556,7 @@ class CGlobalStats(CTRexStats): return stats - def preprocess_snapshot (self, snapshot): + def pre_update (self, snapshot): # L1 bps bps = snapshot.get("m_tx_bps") pps = snapshot.get("m_tx_pps") @@ -578,6 +570,16 @@ class CGlobalStats(CTRexStats): snapshot['m_tx_bps_L1'] = bps_L1 + def _update(self, snapshot): + + self.pre_update(snapshot) + + # simple... + self.latest_stats = snapshot + + return True + + def generate_stats(self): return OrderedDict([("connection", "{host}, Port {port}".format(host=self.connection_info.get("server"), port=self.connection_info.get("sync_port"))), @@ -674,7 +676,7 @@ class CPortStats(CTRexStats): return stats - def preprocess_snapshot (self, snapshot): + def pre_update (self, snapshot): # L1 bps bps = snapshot.get("m_total_tx_bps") pps = snapshot.get("m_total_tx_pps") @@ -689,6 +691,16 @@ class CPortStats(CTRexStats): snapshot['m_percentage'] = (bps_L1 / self._port_obj.get_speed_bps()) * 100 + def _update(self, snapshot): + + self.pre_update(snapshot) + + # simple... + self.latest_stats = snapshot + + return True + + def generate_stats(self): state = self._port_obj.get_port_state_name() if self._port_obj else "" @@ -749,6 +761,9 @@ class CPortStats(CTRexStats): } + + +# RX stats objects - COMPLEX :-( class CRxStats(CTRexStats): def __init__(self): super(CRxStats, self).__init__() @@ -760,98 +775,177 @@ class CRxStats(CTRexStats): factor = bps / (pps * 8.0) return bps * ( 1 + (20 / factor) ) + def calculate_diff_sec (self, current, prev): + if not 'ts' in current: + raise ValueError("INTERNAL ERROR: RX stats snapshot MUST contain 'ts' field") - def preprocess_snapshot (self, snapshot): - # heavy pre-processing here... - new_snapshot = {} + if prev: + prev_ts = prev['ts'] + now_ts = current['ts'] + diff_sec = (now_ts['value'] - prev_ts['value']) / float(now_ts['freq']) + else: + diff_sec = 0.0 - if not 'ts' in snapshot: - raise ValueError("INTERNAL ERROR: RX stats snapshot MUST contain 'ts' field") + return diff_sec + + + def process_single_pg (self, current_pg, prev_pg): + + # start with the previous PG + output = copy.deepcopy(prev_pg) + + for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']: + if not field in output: + output[field] = {} + + if field in current_pg: + for port, pv in current_pg[field].iteritems(): + if not self.is_intable(port): + continue - new_snapshot['ts'] = snapshot['ts'] + output[field][port] = pv - for key, value in snapshot.iteritems(): - # skip non int values (simply copy) - if key == 'ts': - new_snapshot[key] = value + # sum up + total = 0 + for port, pv in output[field].iteritems(): + if not self.is_intable(port): + continue + total += pv + + output[field]['total'] = total + + return output + + + def is_intable (self, value): + try: + int(value) + return True + except ValueError: + return False + + def process_snapshot (self, current, prev): + + # timestamp + diff_sec = self.calculate_diff_sec(current, prev) + + # final output + output = {} + + # copy timestamp field + output['ts'] = current['ts'] + + pg_ids = set(prev.keys() + current.keys()) + + for pg_id in pg_ids: + if not self.is_intable(pg_id): continue - # all the rest MUST be ints - try: - pg_id = int(key) - except ValueError: - assert(0) + current_pg = current.get(pg_id, {}) + prev_pg = prev.get(pg_id, {}) + + if current_pg.get('first_time'): + # new value - ignore history + output[pg_id] = self.process_single_pg(current_pg, {}) + self.reference_stats[pg_id] = {} + self.calculate_bw_for_pg(output[pg_id], prev_pg, 0) + else: + # aggregate + output[pg_id] = self.process_single_pg(current_pg, prev_pg) - # handle PG ID - new_snapshot[pg_id] = {} - for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']: - new_snapshot[pg_id][field] = {'total': 0} - if field in value: - for port, pv in value[field].iteritems(): - new_snapshot[pg_id][field][int(port)] = pv - new_snapshot[pg_id][field]['total'] += pv + self.calculate_bw_for_pg(output[pg_id], prev_pg, diff_sec) - # add B/W calcs for a PG id - self.calculate_bw(new_snapshot, pg_id) - snapshot.clear() - snapshot.update(new_snapshot) + return output - def calculate_bw (self, snapshot, pg_id): - if not self.latest_stats: - snapshot[pg_id]['tx_pps'] = 0.0 - snapshot[pg_id]['tx_bps'] = 0.0 - snapshot[pg_id]['rx_pps'] = 0.0 - snapshot[pg_id]['rx_bps'] = 0.0 + + def calculate_bw_for_pg (self, pg_current, pg_prev, diff_sec): + + if diff_sec == 0: + pg_current['tx_pps'] = 0.0 + pg_current['tx_bps'] = 0.0 + pg_current['tx_bps_L1'] = 0.0 + pg_current['rx_pps'] = 0.0 + pg_current['rx_bps'] = 0.0 return + # prev - prev_ts = self._get(self.latest_stats, 'ts') - prev_tx_pkts = self._get(self.latest_stats, [pg_id, 'tx_pkts', 'total'], default = 0.0) - prev_tx_bytes = self._get(self.latest_stats, [pg_id, 'tx_bytes', 'total'], default = 0.0) - prev_tx_pps = self._get(self.latest_stats, [pg_id, 'tx_pps'], default = 0.0) - prev_tx_bps = self._get(self.latest_stats, [pg_id, 'tx_bps'], default = 0.0) + prev_tx_pkts = pg_prev['tx_pkts']['total'] + prev_tx_bytes = pg_prev['tx_bytes']['total'] + prev_tx_pps = pg_prev['tx_pps'] + prev_tx_bps = pg_prev['tx_bps'] # now - now_ts = snapshot['ts'] - now_tx_pkts = snapshot[pg_id]['tx_pkts']['total'] - now_tx_bytes = snapshot[pg_id]['tx_bytes']['total'] + now_tx_pkts = pg_current['tx_pkts']['total'] + now_tx_bytes = pg_current['tx_bytes']['total'] - # diff seconds - diff_sec = (now_ts['value'] - prev_ts['value']) / float(now_ts['freq']) - - # calculate fields - snapshot[pg_id]['tx_pps'] = (0.5 * prev_tx_pps) + (0.5 * ( (now_tx_pkts - prev_tx_pkts) / diff_sec) ) - snapshot[pg_id]['tx_bps'] = (0.5 * prev_tx_bps) + (0.5 * ( (now_tx_bytes - prev_tx_bytes) * 8 / diff_sec) ) + if not (now_tx_pkts >= prev_tx_pkts): + print "CURRENT:\n" + pprint.pprint(pg_current) + print "PREV:\n" + pprint.pprint(pg_prev) + assert(now_tx_pkts > prev_tx_pkts) + + pg_current['tx_pps'] = (0.5 * prev_tx_pps) + (0.5 * ( (now_tx_pkts - prev_tx_pkts) / diff_sec) ) + pg_current['tx_bps'] = (0.5 * prev_tx_bps) + (0.5 * ( (now_tx_bytes - prev_tx_bytes) * 8 / diff_sec) ) + + pg_current['rx_pps'] = 0.0 + pg_current['rx_bps'] = 0.0 + + pg_current['tx_bps_L1'] = self.bps_L1(pg_current['tx_bps'], pg_current['tx_pps']) - snapshot[pg_id]['rx_pps'] = 0.0 - snapshot[pg_id]['rx_bps'] = 0.0 - snapshot[pg_id]['tx_bps_L1'] = self.bps_L1(snapshot[pg_id]['tx_bps'], snapshot[pg_id]['tx_pps']) + def _update (self, snapshot): + + # generate a new snapshot + new_snapshot = self.process_snapshot(snapshot, self.latest_stats) + + #print new_snapshot + # advance + self.latest_stats = new_snapshot + + + return True + + + + # for API def get_stats (self): stats = {} - for pg_id in self.get_streams_keys(): - stats[pg_id] = {} + for pg_id, value in self.latest_stats.iteritems(): + # skip non ints + try: + int(pg_id) + except ValueError: + continue + + stats[int(pg_id)] = {} for field in ['tx_pkts', 'tx_bytes', 'rx_pkts']: - stats[pg_id][field] = {'total': self.get_rel([pg_id, field, 'total'])} + stats[int(pg_id)][field] = {'total': self.get_rel([pg_id, field, 'total'])} - for port, pv in self.reference_stats[pg_id][field].iteritems(): - stats[pg_id][field][port] = self.get_rel([pg_id, field, port]) + for port, pv in value[field].iteritems(): + try: + int(port) + except ValueError: + continue + stats[int(pg_id)][field][int(port)] = self.get_rel([pg_id, field, port]) return stats - + def get_streams_keys (self): keys = [] - for user_id, user_id_data in self.reference_stats.iteritems(): + for key in self.latest_stats.keys(): # ignore non user ID keys try: - keys.append(int(user_id)) + int(key) + keys.append(key) except ValueError: continue @@ -860,11 +954,9 @@ class CRxStats(CTRexStats): def generate_stats (self): - stats = self.get_stats() - pg_ids = stats.keys()[:4] + pg_ids = self.get_streams_keys() cnt = len(pg_ids) - formatted_stats = OrderedDict([ ('Tx pps', []), ('Tx bps L2', []), ('Tx bps L1', []), @@ -911,3 +1003,4 @@ class CRxStats(CTRexStats): if __name__ == "__main__": pass + |