summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-11-02 16:14:02 +0200
committerimarom <imarom@cisco.com>2015-11-02 16:14:02 +0200
commit1586ab131f28c03ea65373d9e702e4051ffb9a56 (patch)
tree506444d4e0b0dad8325e8ac467583ee2024308ad /scripts/automation/trex_control_plane/client
parent7d7767e17b1a4e54a8934ded724f54dc5b6228ce (diff)
status is back online + ZMQ async stats
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py172
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py23
2 files changed, 189 insertions, 6 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
new file mode 100644
index 00000000..ea716057
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -0,0 +1,172 @@
+#!/router/bin/python
+
+try:
+ # support import for Python 2
+ import outer_packages
+except ImportError:
+ # support import for Python 3
+ import client.outer_packages
+from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+
+import json
+import threading
+import time
+import zmq
+import re
+
+from common.trex_stats import *
+from common.trex_streams import *
+
+# basic async stats class
+class TrexAsyncStats(object):
+ def __init__ (self):
+ self.ref_point = None
+
+ def update (self, snapshot):
+
+ self.current = snapshot
+
+ if self.ref_point == None:
+ self.ref_point = self.current
+
+
+ def get (self, field):
+
+ if not field in self.current:
+ return None
+
+ return self.current[field]
+
+ def get_rel (self, field):
+ if not field in self.current:
+ return None
+
+ return self.current[field] - self.ref_point[field]
+
+
+# describes the general stats provided by TRex
+class TrexAsyncStatsGeneral(TrexAsyncStats):
+ def __init__ (self):
+ super(TrexAsyncStatsGeneral, self).__init__()
+
+
+# per port stats
+class TrexAsyncStatsPort(TrexAsyncStats):
+ def __init__ (self):
+ super(TrexAsyncStatsPort, self).__init__()
+
+
+# stats manager
+class TrexAsyncStatsManager():
+ def __init__ (self, port_count):
+ self.port_count = port_count
+
+ self.general_stats = TrexAsyncStatsGeneral()
+ self.port_stats = {}
+
+ def get_general_stats (self):
+ return self.general_stats
+
+ def get_port_stats (self, port_id):
+
+ if not port_id in self.port_stats:
+ return None
+
+ return self.port_stats[port_id]
+
+
+ def update (self, snapshot):
+
+ if snapshot['name'] == 'trex-global':
+ self.__handle_snapshot(snapshot['data'])
+ else:
+ # for now ignore the rest
+ return
+
+ def __handle_snapshot (self, snapshot):
+
+ general_stats = {}
+ port_stats = {}
+
+ # filter the values per port and general
+ for key, value in snapshot.iteritems():
+
+ # match a pattern of ports
+ m = re.search('.*\-([0-8])', key)
+ if m:
+ port_id = m.group(1)
+
+ if not port_id in port_stats:
+ port_stats[port_id] = {}
+
+ port_stats[port_id][key] = value
+
+ else:
+ # no port match - general stats
+ general_stats[key] = value
+
+ # update the general object with the snapshot
+ self.general_stats.update(general_stats)
+
+ # update all ports
+ for port_id, data in port_stats.iteritems():
+
+ if not port_id in self.port_stats:
+ self.port_stats[port_id] = TrexAsyncStatsPort()
+
+ self.port_stats[port_id].update(data)
+
+
+
+
+
+class TrexAsyncClient():
+ def __init__ (self, port):
+
+ self.port = port
+
+ self.raw_snapshot = {}
+
+ self.stats = TrexAsyncStatsManager(1)
+
+ self.active = True
+ self.t = threading.Thread(target = self._run)
+
+ # kill this thread on exit and don't add it to the join list
+ self.t.setDaemon(True)
+ self.t.start()
+
+
+ def _run (self):
+
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+ self.c = "tcp://localhost:{0}".format(self.port)
+ print "Connecting To ZMQ Publisher At {0}".format(self.c)
+
+ self.socket.connect(self.c)
+ self.socket.setsockopt(zmq.SUBSCRIBE, '')
+
+ while self.active:
+ msg = json.loads(self.socket.recv_string())
+
+ key = msg['name']
+ self.raw_snapshot[key] = msg['data']
+
+ self.stats.update(msg)
+
+
+ def get_stats (self):
+ return self.stats
+
+ def get_raw_snapshot (self):
+ #return str(self.stats.global_stats.get('m_total_tx_bytes')) + " / " + str(self.stats.global_stats.get_rel('m_total_tx_bytes'))
+ return self.raw_snapshot
+
+
+ def stop (self):
+ self.active = False
+ self.t.join()
+
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 8231fe33..c180e0d1 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -13,6 +13,8 @@ from common.trex_stats import *
from common.trex_streams import *
from collections import namedtuple
+from trex_async_client import TrexAsyncClient
+
RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
@@ -27,10 +29,10 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- def __init__(self, username, server="localhost", port=5050, virtual=False):
+ def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
- self.comm_link = CTRexStatelessClient.CCommLink(server, port, virtual)
+ self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
self.verbose = False
self._conn_handler = {}
self._active_ports = set()
@@ -39,6 +41,9 @@ class CTRexStatelessClient(object):
self._server_version = None
self.__err_log = None
+ self._async_client = TrexAsyncClient(async_port)
+
+
# ----- decorator methods ----- #
def force_status(owned=True, active_and_owned=False):
def wrapper(func):
@@ -100,6 +105,12 @@ class CTRexStatelessClient(object):
return rc, err
return self._init_sync()
+ def get_stats_async (self):
+ return self._async_client.get_stats()
+
+ def get_connection_port (self):
+ return self.comm_link.port
+
def disconnect(self):
return self.comm_link.disconnect()
@@ -300,10 +311,10 @@ class CTRexStatelessClient(object):
self.transmit(command.method, command.params),
self.ack_success_test)
- def get_global_stats(self):
- command = RpcCmdData("get_global_stats", {})
- return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
- # return self.transmit("get_global_stats")
+# def get_global_stats(self):
+# command = RpcCmdData("get_global_stats", {})
+# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
+# # return self.transmit("get_global_stats")
@force_status(owned=True, active_and_owned=True)
def get_port_stats(self, port_id=None):