From 1586ab131f28c03ea65373d9e702e4051ffb9a56 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 2 Nov 2015 16:14:02 +0200 Subject: status is back online + ZMQ async stats --- .../trex_control_plane/client/trex_async_client.py | 172 +++++++++++++++++++++ 1 file changed, 172 insertions(+) create mode 100644 scripts/automation/trex_control_plane/client/trex_async_client.py (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py new file mode 100644 index 00000000..ea716057 --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -0,0 +1,172 @@ +#!/router/bin/python + +try: + # support import for Python 2 + import outer_packages +except ImportError: + # support import for Python 3 + import client.outer_packages +from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage + +import json +import threading +import time +import zmq +import re + +from common.trex_stats import * +from common.trex_streams import * + +# basic async stats class +class TrexAsyncStats(object): + def __init__ (self): + self.ref_point = None + + def update (self, snapshot): + + self.current = snapshot + + if self.ref_point == None: + self.ref_point = self.current + + + def get (self, field): + + if not field in self.current: + return None + + return self.current[field] + + def get_rel (self, field): + if not field in self.current: + return None + + return self.current[field] - self.ref_point[field] + + +# describes the general stats provided by TRex +class TrexAsyncStatsGeneral(TrexAsyncStats): + def __init__ (self): + super(TrexAsyncStatsGeneral, self).__init__() + + +# per port stats +class TrexAsyncStatsPort(TrexAsyncStats): + def __init__ (self): + super(TrexAsyncStatsPort, self).__init__() + + +# stats manager +class TrexAsyncStatsManager(): + def __init__ (self, port_count): + self.port_count = port_count + + self.general_stats = TrexAsyncStatsGeneral() + self.port_stats = {} + + def get_general_stats (self): + return self.general_stats + + def get_port_stats (self, port_id): + + if not port_id in self.port_stats: + return None + + return self.port_stats[port_id] + + + def update (self, snapshot): + + if snapshot['name'] == 'trex-global': + self.__handle_snapshot(snapshot['data']) + else: + # for now ignore the rest + return + + def __handle_snapshot (self, snapshot): + + general_stats = {} + port_stats = {} + + # filter the values per port and general + for key, value in snapshot.iteritems(): + + # match a pattern of ports + m = re.search('.*\-([0-8])', key) + if m: + port_id = m.group(1) + + if not port_id in port_stats: + port_stats[port_id] = {} + + port_stats[port_id][key] = value + + else: + # no port match - general stats + general_stats[key] = value + + # update the general object with the snapshot + self.general_stats.update(general_stats) + + # update all ports + for port_id, data in port_stats.iteritems(): + + if not port_id in self.port_stats: + self.port_stats[port_id] = TrexAsyncStatsPort() + + self.port_stats[port_id].update(data) + + + + + +class TrexAsyncClient(): + def __init__ (self, port): + + self.port = port + + self.raw_snapshot = {} + + self.stats = TrexAsyncStatsManager(1) + + self.active = True + self.t = threading.Thread(target = self._run) + + # kill this thread on exit and don't add it to the join list + self.t.setDaemon(True) + self.t.start() + + + def _run (self): + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + self.c = "tcp://localhost:{0}".format(self.port) + print "Connecting To ZMQ Publisher At {0}".format(self.c) + + self.socket.connect(self.c) + self.socket.setsockopt(zmq.SUBSCRIBE, '') + + while self.active: + msg = json.loads(self.socket.recv_string()) + + key = msg['name'] + self.raw_snapshot[key] = msg['data'] + + self.stats.update(msg) + + + def get_stats (self): + return self.stats + + def get_raw_snapshot (self): + #return str(self.stats.global_stats.get('m_total_tx_bytes')) + " / " + str(self.stats.global_stats.get_rel('m_total_tx_bytes')) + return self.raw_snapshot + + + def stop (self): + self.active = False + self.t.join() + -- cgit From 3fb4e4c130da10e58af07e1f783f093515e90f96 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 2 Nov 2015 16:40:34 +0200 Subject: few bug fixes for last commit --- .../trex_control_plane/client/trex_async_client.py | 30 +++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index ea716057..1ce10288 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -11,6 +11,7 @@ from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage import json import threading import time +import datetime import zmq import re @@ -21,9 +22,14 @@ from common.trex_streams import * class TrexAsyncStats(object): def __init__ (self): self.ref_point = None + self.current = {} + self.last_update_ts = datetime.datetime.now() def update (self, snapshot): + #update + self.last_update_ts = datetime.datetime.now() + self.current = snapshot if self.ref_point == None: @@ -33,17 +39,22 @@ class TrexAsyncStats(object): def get (self, field): if not field in self.current: - return None + return 0 return self.current[field] def get_rel (self, field): if not field in self.current: - return None + return 0 return self.current[field] - self.ref_point[field] + # return true if new data has arrived in the past 2 seconds + def is_online (self): + delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000 + return (delta_ms < 2000) + # describes the general stats provided by TRex class TrexAsyncStatsGeneral(TrexAsyncStats): def __init__ (self): @@ -58,12 +69,12 @@ class TrexAsyncStatsPort(TrexAsyncStats): # stats manager class TrexAsyncStatsManager(): - def __init__ (self, port_count): - self.port_count = port_count + def __init__ (self): self.general_stats = TrexAsyncStatsGeneral() self.port_stats = {} + def get_general_stats (self): return self.general_stats @@ -127,7 +138,11 @@ class TrexAsyncClient(): self.raw_snapshot = {} - self.stats = TrexAsyncStatsManager(1) + self.stats = TrexAsyncStatsManager() + + + self.tr = "tcp://localhost:{0}".format(self.port) + print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) self.active = True self.t = threading.Thread(target = self._run) @@ -143,10 +158,7 @@ class TrexAsyncClient(): self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) - self.c = "tcp://localhost:{0}".format(self.port) - print "Connecting To ZMQ Publisher At {0}".format(self.c) - - self.socket.connect(self.c) + self.socket.connect(self.tr) self.socket.setsockopt(zmq.SUBSCRIBE, '') while self.active: -- cgit From 0ceddc74c938a023c515be4ed2c37198fd66e87e Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Tue, 3 Nov 2015 09:37:42 +0200 Subject: first commit for advnaced options --- scripts/automation/trex_control_plane/client/trex_async_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 1ce10288..49ef9506 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -131,7 +131,7 @@ class TrexAsyncStatsManager(): -class TrexAsyncClient(): +class CTRexAsyncClient(): def __init__ (self, port): self.port = port -- cgit From c2928039f199d3cef6d130201cf825d5b6b67937 Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 3 Nov 2015 10:10:52 +0200 Subject: fields for specific ports on the status screen --- .../trex_control_plane/client/trex_async_client.py | 40 ++++++++++++++++------ 1 file changed, 29 insertions(+), 11 deletions(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 1ce10288..4c17603d 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -25,6 +25,15 @@ class TrexAsyncStats(object): self.current = {} self.last_update_ts = datetime.datetime.now() + def __format_num (self, size, suffix = ""): + + for unit in ['','K','M','G','T','P']: + if abs(size) < 1000.0: + return "%3.2f %s%s" % (size, unit, suffix) + size /= 1000.0 + + return "NaN" + def update (self, snapshot): #update @@ -36,18 +45,25 @@ class TrexAsyncStats(object): self.ref_point = self.current - def get (self, field): + def get (self, field, format = False, suffix = ""): if not field in self.current: - return 0 + return "N/A" - return self.current[field] + if not format: + return self.current[field] + else: + return self.__format_num(self.current[field], suffix) - def get_rel (self, field): + + def get_rel (self, field, format = False, suffix = ""): if not field in self.current: - return 0 + return "N/A" - return self.current[field] - self.ref_point[field] + if not format: + return (self.current[field] - self.ref_point[field]) + else: + return self.__format_num(self.current[field] - self.ref_point[field], suffix) # return true if new data has arrived in the past 2 seconds @@ -80,10 +96,10 @@ class TrexAsyncStatsManager(): def get_port_stats (self, port_id): - if not port_id in self.port_stats: + if not str(port_id) in self.port_stats: return None - return self.port_stats[port_id] + return self.port_stats[str(port_id)] def update (self, snapshot): @@ -103,14 +119,16 @@ class TrexAsyncStatsManager(): for key, value in snapshot.iteritems(): # match a pattern of ports - m = re.search('.*\-([0-8])', key) + m = re.search('(.*)\-([0-8])', key) if m: - port_id = m.group(1) + + port_id = m.group(2) + field_name = m.group(1) if not port_id in port_stats: port_stats[port_id] = {} - port_stats[port_id][key] = value + port_stats[port_id][field_name] = value else: # no port match - general stats -- cgit From e92507617ed8069b674fa5729b1e6a0c5d4b2662 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 5 Nov 2015 09:55:03 +0200 Subject: bug fixes in status window --- scripts/automation/trex_control_plane/client/trex_async_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py index 419448bb..72cce5aa 100644 --- a/scripts/automation/trex_control_plane/client/trex_async_client.py +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -82,6 +82,8 @@ class TrexAsyncStatsPort(TrexAsyncStats): def __init__ (self): super(TrexAsyncStatsPort, self).__init__() + def get_stream_stats (self, stream_id): + return None # stats manager class TrexAsyncStatsManager(): @@ -101,7 +103,6 @@ class TrexAsyncStatsManager(): return self.port_stats[str(port_id)] - def update (self, snapshot): if snapshot['name'] == 'trex-global': -- cgit