diff options
author | 2015-11-03 09:37:58 +0200 | |
---|---|---|
committer | 2015-11-03 09:37:58 +0200 | |
commit | c5078068c4659f5445d9c684c67b55ee2c7e10d6 (patch) | |
tree | 124c04254f8e79e3ab1792b256e4cb113f81e3a4 /scripts/automation/trex_control_plane/client | |
parent | 2636c09cfb74c7981c27d84bcc72d00929fdbbbb (diff) | |
parent | 0ceddc74c938a023c515be4ed2c37198fd66e87e (diff) |
Merge branch 'rpc_intg1' into dan_stateless
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rw-r--r-- | scripts/automation/trex_control_plane/client/trex_async_client.py | 184 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_stateless_client.py | 32 |
2 files changed, 208 insertions, 8 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py new file mode 100644 index 00000000..49ef9506 --- /dev/null +++ b/scripts/automation/trex_control_plane/client/trex_async_client.py @@ -0,0 +1,184 @@ +#!/router/bin/python + +try: + # support import for Python 2 + import outer_packages +except ImportError: + # support import for Python 3 + import client.outer_packages +from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage + +import json +import threading +import time +import datetime +import zmq +import re + +from common.trex_stats import * +from common.trex_streams import * + +# basic async stats class +class TrexAsyncStats(object): + def __init__ (self): + self.ref_point = None + self.current = {} + self.last_update_ts = datetime.datetime.now() + + def update (self, snapshot): + + #update + self.last_update_ts = datetime.datetime.now() + + self.current = snapshot + + if self.ref_point == None: + self.ref_point = self.current + + + def get (self, field): + + if not field in self.current: + return 0 + + return self.current[field] + + def get_rel (self, field): + if not field in self.current: + return 0 + + return self.current[field] - self.ref_point[field] + + + # return true if new data has arrived in the past 2 seconds + def is_online (self): + delta_ms = (datetime.datetime.now() - self.last_update_ts).total_seconds() * 1000 + return (delta_ms < 2000) + +# describes the general stats provided by TRex +class TrexAsyncStatsGeneral(TrexAsyncStats): + def __init__ (self): + super(TrexAsyncStatsGeneral, self).__init__() + + +# per port stats +class TrexAsyncStatsPort(TrexAsyncStats): + def __init__ (self): + super(TrexAsyncStatsPort, self).__init__() + + +# stats manager +class TrexAsyncStatsManager(): + def __init__ (self): + + self.general_stats = TrexAsyncStatsGeneral() + self.port_stats = {} + + + def get_general_stats (self): + return self.general_stats + + def get_port_stats (self, port_id): + + if not port_id in self.port_stats: + return None + + return self.port_stats[port_id] + + + def update (self, snapshot): + + if snapshot['name'] == 'trex-global': + self.__handle_snapshot(snapshot['data']) + else: + # for now ignore the rest + return + + def __handle_snapshot (self, snapshot): + + general_stats = {} + port_stats = {} + + # filter the values per port and general + for key, value in snapshot.iteritems(): + + # match a pattern of ports + m = re.search('.*\-([0-8])', key) + if m: + port_id = m.group(1) + + if not port_id in port_stats: + port_stats[port_id] = {} + + port_stats[port_id][key] = value + + else: + # no port match - general stats + general_stats[key] = value + + # update the general object with the snapshot + self.general_stats.update(general_stats) + + # update all ports + for port_id, data in port_stats.iteritems(): + + if not port_id in self.port_stats: + self.port_stats[port_id] = TrexAsyncStatsPort() + + self.port_stats[port_id].update(data) + + + + + +class CTRexAsyncClient(): + def __init__ (self, port): + + self.port = port + + self.raw_snapshot = {} + + self.stats = TrexAsyncStatsManager() + + + self.tr = "tcp://localhost:{0}".format(self.port) + print "\nConnecting To ZMQ Publisher At {0}".format(self.tr) + + self.active = True + self.t = threading.Thread(target = self._run) + + # kill this thread on exit and don't add it to the join list + self.t.setDaemon(True) + self.t.start() + + + def _run (self): + + # Socket to talk to server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + + self.socket.connect(self.tr) + self.socket.setsockopt(zmq.SUBSCRIBE, '') + + while self.active: + msg = json.loads(self.socket.recv_string()) + + key = msg['name'] + self.raw_snapshot[key] = msg['data'] + + self.stats.update(msg) + + + def get_stats (self): + return self.stats + + def get_raw_snapshot (self): + #return str(self.stats.global_stats.get('m_total_tx_bytes')) + " / " + str(self.stats.global_stats.get_rel('m_total_tx_bytes')) + return self.raw_snapshot + + + def stop (self): + self.active = False + self.t.join() + diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 9e1c7cf3..6b1b7b94 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -13,6 +13,8 @@ from common.trex_stats import * from common.trex_streams import * from collections import namedtuple +from trex_async_client import CTRexAsyncClient + RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): @@ -27,10 +29,10 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'] class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" - def __init__(self, username, server="localhost", port=5050, virtual=False): + def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False): super(CTRexStatelessClient, self).__init__() self.user = username - self.comm_link = CTRexStatelessClient.CCommLink(server, port, virtual) + self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) self.verbose = False self._conn_handler = {} self._active_ports = set() @@ -39,6 +41,9 @@ class CTRexStatelessClient(object): self._server_version = None self.__err_log = None + self._async_client = CTRexAsyncClient(async_port) + + # ----- decorator methods ----- # def force_status(owned=True, active_and_owned=False): def wrapper(func): @@ -100,6 +105,12 @@ class CTRexStatelessClient(object): return rc, err return self._init_sync() + def get_stats_async (self): + return self._async_client.get_stats() + + def get_connection_port (self): + return self.comm_link.port + def disconnect(self): return self.comm_link.disconnect() @@ -125,6 +136,10 @@ class CTRexStatelessClient(object): else: return port_ids + def sync_user(self): + return self.transmit("sync_user") + + def get_acquired_ports(self): return self._conn_handler.keys() @@ -264,7 +279,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id, "mul": 1.0}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -272,7 +287,8 @@ class CTRexStatelessClient(object): success_test=self.ack_success_test) else: params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} + "port_id": port_id, + "mul": 1.0} command = RpcCmdData("start_traffic", params) return self._handle_start_traffic_response(command, self.transmit(command.method, command.params), @@ -299,10 +315,10 @@ class CTRexStatelessClient(object): self.transmit(command.method, command.params), self.ack_success_test) - def get_global_stats(self): - command = RpcCmdData("get_global_stats", {}) - return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) - # return self.transmit("get_global_stats") +# def get_global_stats(self): +# command = RpcCmdData("get_global_stats", {}) +# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) +# # return self.transmit("get_global_stats") @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): |