summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
authorDan Klein <danklein10@gmail.com>2015-11-03 09:37:58 +0200
committerDan Klein <danklein10@gmail.com>2015-11-03 09:37:58 +0200
commitc5078068c4659f5445d9c684c67b55ee2c7e10d6 (patch)
tree124c04254f8e79e3ab1792b256e4cb113f81e3a4 /scripts/automation/trex_control_plane/client
parent2636c09cfb74c7981c27d84bcc72d00929fdbbbb (diff)
parent0ceddc74c938a023c515be4ed2c37198fd66e87e (diff)
Merge branch 'rpc_intg1' into dan_stateless
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py184
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py32
2 files changed, 208 insertions, 8 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
new file mode 100644
index 00000000..49ef9506
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -0,0 +1,184 @@
+#!/router/bin/python
+
+try:
+ # support import for Python 2
+ import outer_packages
+except ImportError:
+ # support import for Python 3
+ import client.outer_packages
+from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+
+import json
+import threading
+import time
+import datetime
+import zmq
+import re
+
+from common.trex_stats import *
+from common.trex_streams import *
+
+# basic async stats class
+class TrexAsyncStats(object):
+ def __init__ (self):
+ self.ref_point = None
+ self.current = {}
+ self.last_update_ts = datetime.datetime.now()
+
+ def update (self, snapshot):
+
+ #update
+ self.last_update_ts = datetime.datetime.now()
+
+ self.current = snapshot
+
+ if self.ref_point == None:
+ self.ref_point = self.current
+
+
+ def get (self, field):
+
+ if not field in self.current:
+ return 0
+
+ return self.current[field]
+
+ def get_rel (self, field):
+ if not field in self.current:
+ return 0
+
+ return self.current[field] - self.ref_point[field]
+
+
+ # return true if new data has arrived in the past 2 seconds
+ def is_online (self):
+ delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000
+ return (delta_ms < 2000)
+
+# describes the general stats provided by TRex
+class TrexAsyncStatsGeneral(TrexAsyncStats):
+ def __init__ (self):
+ super(TrexAsyncStatsGeneral, self).__init__()
+
+
+# per port stats
+class TrexAsyncStatsPort(TrexAsyncStats):
+ def __init__ (self):
+ super(TrexAsyncStatsPort, self).__init__()
+
+
+# stats manager
+class TrexAsyncStatsManager():
+ def __init__ (self):
+
+ self.general_stats = TrexAsyncStatsGeneral()
+ self.port_stats = {}
+
+
+ def get_general_stats (self):
+ return self.general_stats
+
+ def get_port_stats (self, port_id):
+
+ if not port_id in self.port_stats:
+ return None
+
+ return self.port_stats[port_id]
+
+
+ def update (self, snapshot):
+
+ if snapshot['name'] == 'trex-global':
+ self.__handle_snapshot(snapshot['data'])
+ else:
+ # for now ignore the rest
+ return
+
+ def __handle_snapshot (self, snapshot):
+
+ general_stats = {}
+ port_stats = {}
+
+ # filter the values per port and general
+ for key, value in snapshot.iteritems():
+
+ # match a pattern of ports
+ m = re.search('.*\-([0-8])', key)
+ if m:
+ port_id = m.group(1)
+
+ if not port_id in port_stats:
+ port_stats[port_id] = {}
+
+ port_stats[port_id][key] = value
+
+ else:
+ # no port match - general stats
+ general_stats[key] = value
+
+ # update the general object with the snapshot
+ self.general_stats.update(general_stats)
+
+ # update all ports
+ for port_id, data in port_stats.iteritems():
+
+ if not port_id in self.port_stats:
+ self.port_stats[port_id] = TrexAsyncStatsPort()
+
+ self.port_stats[port_id].update(data)
+
+
+
+
+
+class CTRexAsyncClient():
+ def __init__ (self, port):
+
+ self.port = port
+
+ self.raw_snapshot = {}
+
+ self.stats = TrexAsyncStatsManager()
+
+
+ self.tr = "tcp://localhost:{0}".format(self.port)
+ print "\nConnecting To ZMQ Publisher At {0}".format(self.tr)
+
+ self.active = True
+ self.t = threading.Thread(target = self._run)
+
+ # kill this thread on exit and don't add it to the join list
+ self.t.setDaemon(True)
+ self.t.start()
+
+
+ def _run (self):
+
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+ self.socket.connect(self.tr)
+ self.socket.setsockopt(zmq.SUBSCRIBE, '')
+
+ while self.active:
+ msg = json.loads(self.socket.recv_string())
+
+ key = msg['name']
+ self.raw_snapshot[key] = msg['data']
+
+ self.stats.update(msg)
+
+
+ def get_stats (self):
+ return self.stats
+
+ def get_raw_snapshot (self):
+ #return str(self.stats.global_stats.get('m_total_tx_bytes')) + " / " + str(self.stats.global_stats.get_rel('m_total_tx_bytes'))
+ return self.raw_snapshot
+
+
+ def stop (self):
+ self.active = False
+ self.t.join()
+
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 9e1c7cf3..6b1b7b94 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -13,6 +13,8 @@ from common.trex_stats import *
from common.trex_streams import *
from collections import namedtuple
+from trex_async_client import CTRexAsyncClient
+
RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
@@ -27,10 +29,10 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- def __init__(self, username, server="localhost", port=5050, virtual=False):
+ def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
- self.comm_link = CTRexStatelessClient.CCommLink(server, port, virtual)
+ self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
self.verbose = False
self._conn_handler = {}
self._active_ports = set()
@@ -39,6 +41,9 @@ class CTRexStatelessClient(object):
self._server_version = None
self.__err_log = None
+ self._async_client = CTRexAsyncClient(async_port)
+
+
# ----- decorator methods ----- #
def force_status(owned=True, active_and_owned=False):
def wrapper(func):
@@ -100,6 +105,12 @@ class CTRexStatelessClient(object):
return rc, err
return self._init_sync()
+ def get_stats_async (self):
+ return self._async_client.get_stats()
+
+ def get_connection_port (self):
+ return self.comm_link.port
+
def disconnect(self):
return self.comm_link.disconnect()
@@ -125,6 +136,10 @@ class CTRexStatelessClient(object):
else:
return port_ids
+ def sync_user(self):
+ return self.transmit("sync_user")
+
+
def get_acquired_ports(self):
return self._conn_handler.keys()
@@ -264,7 +279,7 @@ class CTRexStatelessClient(object):
if isinstance(port_id, list) or isinstance(port_id, set):
# handle as batch mode
port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id, "mul": 1.0})
for p_id in port_ids]
rc, resp_list = self.transmit_batch(commands)
if rc:
@@ -272,7 +287,8 @@ class CTRexStatelessClient(object):
success_test=self.ack_success_test)
else:
params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
+ "port_id": port_id,
+ "mul": 1.0}
command = RpcCmdData("start_traffic", params)
return self._handle_start_traffic_response(command,
self.transmit(command.method, command.params),
@@ -299,10 +315,10 @@ class CTRexStatelessClient(object):
self.transmit(command.method, command.params),
self.ack_success_test)
- def get_global_stats(self):
- command = RpcCmdData("get_global_stats", {})
- return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
- # return self.transmit("get_global_stats")
+# def get_global_stats(self):
+# command = RpcCmdData("get_global_stats", {})
+# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
+# # return self.transmit("get_global_stats")
@force_status(owned=True, active_and_owned=True)
def get_port_stats(self, port_id=None):