summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_async_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_async_client.py')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py97
1 files changed, 70 insertions, 27 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
index 49ef9506..adb91d97 100644
--- a/scripts/automation/trex_control_plane/client/trex_async_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -8,6 +8,8 @@ except ImportError:
import client.outer_packages
from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+from common.text_opts import *
+
import json
import threading
import time
@@ -25,6 +27,15 @@ class TrexAsyncStats(object):
self.current = {}
self.last_update_ts = datetime.datetime.now()
+ def __format_num (self, size, suffix = ""):
+
+ for unit in ['','K','M','G','T','P']:
+ if abs(size) < 1000.0:
+ return "%3.2f %s%s" % (size, unit, suffix)
+ size /= 1000.0
+
+ return "NaN"
+
def update (self, snapshot):
#update
@@ -36,18 +47,25 @@ class TrexAsyncStats(object):
self.ref_point = self.current
- def get (self, field):
+ def get (self, field, format = False, suffix = ""):
if not field in self.current:
- return 0
+ return "N/A"
+
+ if not format:
+ return self.current[field]
+ else:
+ return self.__format_num(self.current[field], suffix)
- return self.current[field]
- def get_rel (self, field):
+ def get_rel (self, field, format = False, suffix = ""):
if not field in self.current:
- return 0
+ return "N/A"
- return self.current[field] - self.ref_point[field]
+ if not format:
+ return (self.current[field] - self.ref_point[field])
+ else:
+ return self.__format_num(self.current[field] - self.ref_point[field], suffix)
# return true if new data has arrived in the past 2 seconds
@@ -66,6 +84,8 @@ class TrexAsyncStatsPort(TrexAsyncStats):
def __init__ (self):
super(TrexAsyncStatsPort, self).__init__()
+ def get_stream_stats (self, stream_id):
+ return None
# stats manager
class TrexAsyncStatsManager():
@@ -80,19 +100,14 @@ class TrexAsyncStatsManager():
def get_port_stats (self, port_id):
- if not port_id in self.port_stats:
+ if not str(port_id) in self.port_stats:
return None
- return self.port_stats[port_id]
+ return self.port_stats[str(port_id)]
-
- def update (self, snapshot):
-
- if snapshot['name'] == 'trex-global':
- self.__handle_snapshot(snapshot['data'])
- else:
- # for now ignore the rest
- return
+
+ def update (self, data):
+ self.__handle_snapshot(data)
def __handle_snapshot (self, snapshot):
@@ -103,14 +118,16 @@ class TrexAsyncStatsManager():
for key, value in snapshot.iteritems():
# match a pattern of ports
- m = re.search('.*\-([0-8])', key)
+ m = re.search('(.*)\-([0-8])', key)
if m:
- port_id = m.group(1)
+
+ port_id = m.group(2)
+ field_name = m.group(1)
if not port_id in port_stats:
port_stats[port_id] = {}
- port_stats[port_id][key] = value
+ port_stats[port_id][field_name] = value
else:
# no port match - general stats
@@ -132,27 +149,30 @@ class TrexAsyncStatsManager():
class CTRexAsyncClient():
- def __init__ (self, port):
+ def __init__ (self, server, port, stateless_client):
self.port = port
+ self.server = server
+ self.stateless_client = stateless_client
self.raw_snapshot = {}
self.stats = TrexAsyncStatsManager()
- self.tr = "tcp://localhost:{0}".format(self.port)
+ self.tr = "tcp://{0}:{1}".format(self.server, self.port)
print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
self.active = True
- self.t = threading.Thread(target = self._run)
+ self.t = threading.Thread(target = self.run)
# kill this thread on exit and don't add it to the join list
self.t.setDaemon(True)
self.t.start()
- def _run (self):
+
+ def run (self):
# Socket to talk to server
self.context = zmq.Context()
@@ -162,12 +182,15 @@ class CTRexAsyncClient():
self.socket.setsockopt(zmq.SUBSCRIBE, '')
while self.active:
- msg = json.loads(self.socket.recv_string())
+ line = self.socket.recv_string();
+ msg = json.loads(line)
- key = msg['name']
- self.raw_snapshot[key] = msg['data']
+ name = msg['name']
+ data = msg['data']
+ type = msg['type']
+ self.raw_snapshot[name] = data
- self.stats.update(msg)
+ self.__dispatch(name, type, data)
def get_stats (self):
@@ -178,6 +201,26 @@ class CTRexAsyncClient():
return self.raw_snapshot
+ # dispatch the message to the right place
+ def __dispatch (self, name, type, data):
+ # stats
+ if name == "trex-global":
+ self.stats.update(data)
+ # events
+ elif name == "trex-event":
+ self.__handle_async_event(type, data)
+ else:
+ # ignore
+ pass
+
+ def __handle_async_event (self, type, data):
+ # DP stopped
+ if (type == 0):
+ port_id = int(data['port_id'])
+ print format_text("\n[Event] - Port {0} Stopped".format(port_id), 'bold')
+ # call the handler
+ self.stateless_client.async_event_port_stopped(port_id)
+
def stop (self):
self.active = False
self.t.join()