summaryrefslogtreecommitdiffstats
path: root/scripts
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-11-02 16:14:02 +0200
committerimarom <imarom@cisco.com>2015-11-02 16:14:02 +0200
commit1586ab131f28c03ea65373d9e702e4051ffb9a56 (patch)
tree506444d4e0b0dad8325e8ac467583ee2024308ad /scripts
parent7d7767e17b1a4e54a8934ded724f54dc5b6228ce (diff)
status is back online + ZMQ async stats
Diffstat (limited to 'scripts')
-rw-r--r--scripts/automation/trex_control_plane/client/trex_async_client.py172
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py23
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py11
-rw-r--r--scripts/automation/trex_control_plane/console/trex_status.py284
4 files changed, 335 insertions, 155 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_async_client.py b/scripts/automation/trex_control_plane/client/trex_async_client.py
new file mode 100644
index 00000000..ea716057
--- /dev/null
+++ b/scripts/automation/trex_control_plane/client/trex_async_client.py
@@ -0,0 +1,172 @@
+#!/router/bin/python
+
+try:
+ # support import for Python 2
+ import outer_packages
+except ImportError:
+ # support import for Python 3
+ import client.outer_packages
+from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+
+import json
+import threading
+import time
+import zmq
+import re
+
+from common.trex_stats import *
+from common.trex_streams import *
+
+# basic async stats class
+class TrexAsyncStats(object):
+ def __init__ (self):
+ self.ref_point = None
+
+ def update (self, snapshot):
+
+ self.current = snapshot
+
+ if self.ref_point == None:
+ self.ref_point = self.current
+
+
+ def get (self, field):
+
+ if not field in self.current:
+ return None
+
+ return self.current[field]
+
+ def get_rel (self, field):
+ if not field in self.current:
+ return None
+
+ return self.current[field] - self.ref_point[field]
+
+
+# describes the general stats provided by TRex
+class TrexAsyncStatsGeneral(TrexAsyncStats):
+ def __init__ (self):
+ super(TrexAsyncStatsGeneral, self).__init__()
+
+
+# per port stats
+class TrexAsyncStatsPort(TrexAsyncStats):
+ def __init__ (self):
+ super(TrexAsyncStatsPort, self).__init__()
+
+
+# stats manager
+class TrexAsyncStatsManager():
+ def __init__ (self, port_count):
+ self.port_count = port_count
+
+ self.general_stats = TrexAsyncStatsGeneral()
+ self.port_stats = {}
+
+ def get_general_stats (self):
+ return self.general_stats
+
+ def get_port_stats (self, port_id):
+
+ if not port_id in self.port_stats:
+ return None
+
+ return self.port_stats[port_id]
+
+
+ def update (self, snapshot):
+
+ if snapshot['name'] == 'trex-global':
+ self.__handle_snapshot(snapshot['data'])
+ else:
+ # for now ignore the rest
+ return
+
+ def __handle_snapshot (self, snapshot):
+
+ general_stats = {}
+ port_stats = {}
+
+ # filter the values per port and general
+ for key, value in snapshot.iteritems():
+
+ # match a pattern of ports
+ m = re.search('.*\-([0-8])', key)
+ if m:
+ port_id = m.group(1)
+
+ if not port_id in port_stats:
+ port_stats[port_id] = {}
+
+ port_stats[port_id][key] = value
+
+ else:
+ # no port match - general stats
+ general_stats[key] = value
+
+ # update the general object with the snapshot
+ self.general_stats.update(general_stats)
+
+ # update all ports
+ for port_id, data in port_stats.iteritems():
+
+ if not port_id in self.port_stats:
+ self.port_stats[port_id] = TrexAsyncStatsPort()
+
+ self.port_stats[port_id].update(data)
+
+
+
+
+
+class TrexAsyncClient():
+ def __init__ (self, port):
+
+ self.port = port
+
+ self.raw_snapshot = {}
+
+ self.stats = TrexAsyncStatsManager(1)
+
+ self.active = True
+ self.t = threading.Thread(target = self._run)
+
+ # kill this thread on exit and don't add it to the join list
+ self.t.setDaemon(True)
+ self.t.start()
+
+
+ def _run (self):
+
+ # Socket to talk to server
+ self.context = zmq.Context()
+ self.socket = self.context.socket(zmq.SUB)
+
+ self.c = "tcp://localhost:{0}".format(self.port)
+ print "Connecting To ZMQ Publisher At {0}".format(self.c)
+
+ self.socket.connect(self.c)
+ self.socket.setsockopt(zmq.SUBSCRIBE, '')
+
+ while self.active:
+ msg = json.loads(self.socket.recv_string())
+
+ key = msg['name']
+ self.raw_snapshot[key] = msg['data']
+
+ self.stats.update(msg)
+
+
+ def get_stats (self):
+ return self.stats
+
+ def get_raw_snapshot (self):
+ #return str(self.stats.global_stats.get('m_total_tx_bytes')) + " / " + str(self.stats.global_stats.get_rel('m_total_tx_bytes'))
+ return self.raw_snapshot
+
+
+ def stop (self):
+ self.active = False
+ self.t.join()
+
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 8231fe33..c180e0d1 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -13,6 +13,8 @@ from common.trex_stats import *
from common.trex_streams import *
from collections import namedtuple
+from trex_async_client import TrexAsyncClient
+
RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])):
@@ -27,10 +29,10 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- def __init__(self, username, server="localhost", port=5050, virtual=False):
+ def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
- self.comm_link = CTRexStatelessClient.CCommLink(server, port, virtual)
+ self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
self.verbose = False
self._conn_handler = {}
self._active_ports = set()
@@ -39,6 +41,9 @@ class CTRexStatelessClient(object):
self._server_version = None
self.__err_log = None
+ self._async_client = TrexAsyncClient(async_port)
+
+
# ----- decorator methods ----- #
def force_status(owned=True, active_and_owned=False):
def wrapper(func):
@@ -100,6 +105,12 @@ class CTRexStatelessClient(object):
return rc, err
return self._init_sync()
+ def get_stats_async (self):
+ return self._async_client.get_stats()
+
+ def get_connection_port (self):
+ return self.comm_link.port
+
def disconnect(self):
return self.comm_link.disconnect()
@@ -300,10 +311,10 @@ class CTRexStatelessClient(object):
self.transmit(command.method, command.params),
self.ack_success_test)
- def get_global_stats(self):
- command = RpcCmdData("get_global_stats", {})
- return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
- # return self.transmit("get_global_stats")
+# def get_global_stats(self):
+# command = RpcCmdData("get_global_stats", {})
+# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
+# # return self.transmit("get_global_stats")
@force_status(owned=True, active_and_owned=True)
def get_port_stats(self, port_id=None):
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index ec23eb0c..bd79cb42 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -34,7 +34,6 @@ from common.text_opts import *
from client_utils.general_utils import user_input
-from client_utils.jsonrpc_client import TrexStatelessClient
import trex_status
from collections import namedtuple
@@ -177,6 +176,7 @@ class TRexConsole(cmd.Cmd):
dotext = 'do_'+text
return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)]
+
# set verbose on / off
def do_verbose (self, line):
'''Shows or set verbose mode\n'''
@@ -770,6 +770,10 @@ def setParserOptions():
default = 5050,
type = int)
+ parser.add_argument("-z", "--pub", help = "TRex Async Publisher Port [default is 4500]\n",
+ default = 4500,
+ type = int)
+
parser.add_argument("-u", "--user", help = "User Name [default is random generated]\n",
default = 'user_' + ''.join(random.choice(string.digits) for _ in range(5)),
type = str)
@@ -782,11 +786,10 @@ def setParserOptions():
def main():
parser = setParserOptions()
- options = parser.parse_args()#sys.argv[1:])
+ options = parser.parse_args()
# Stateless client connection
- # stateless_client = TrexStatelessClient(options.server, options.port, options.user)
- stateless_client = CTRexStatelessClient(options.user, options.server, options.port)
+ stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub)
# console
try:
diff --git a/scripts/automation/trex_control_plane/console/trex_status.py b/scripts/automation/trex_control_plane/console/trex_status.py
index 2c5a648f..4cd07358 100644
--- a/scripts/automation/trex_control_plane/console/trex_status.py
+++ b/scripts/automation/trex_control_plane/console/trex_status.py
@@ -11,6 +11,8 @@ import datetime
g_curses_active = False
+################### utils #################
+
# simple percetange show
def percentage (a, total):
x = int ((float(a) / total) * 100)
@@ -18,15 +20,25 @@ def percentage (a, total):
# simple float to human readable
def float_to_human_readable (size, suffix = "bps"):
- for unit in ['','K','M','G']:
- if abs(size) < 1024.0:
- return "%3.1f %s%s" % (size, unit, suffix)
- size /= 1024.0
+ for unit in ['','K','M','G','T']:
+ if abs(size) < 1000.0:
+ return "%3.2f %s%s" % (size, unit, suffix)
+ size /= 1000.0
return "NaN"
+
+################### panels #################
+
# panel object
class TrexStatusPanel(object):
- def __init__ (self, h, l, y, x, headline):
+ def __init__ (self, h, l, y, x, headline, status_obj):
+
+ self.status_obj = status_obj
+
+ self.log = status_obj.log
+ self.stateless_client = status_obj.stateless_client
+ self.general_stats = status_obj.general_stats
+
self.h = h
self.l = l
self.y = y
@@ -53,64 +65,26 @@ class TrexStatusPanel(object):
return self.win
-# total stats (ports + global)
-class Stats():
- def __init__ (self, rpc_client, port_list, interval = 100):
-
- self.rpc_client = rpc_client
-
- self.port_list = port_list
- self.port_stats = {}
-
- self.interval = interval
- self.delay_count = 0
-
- def get_port_stats (self, port_id):
- if self.port_stats.get(port_id):
- return self.port_stats[port_id]
- else:
- return None
-
- def query_sync (self):
- self.delay_count += 1
- if self.delay_count < self.interval:
- return
-
- self.delay_count = 0
-
- # query global stats
-
- # query port stats
-
- rc, resp_list = self.rpc_client.get_port_stats(self.port_list)
- if not rc:
- return
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_stats[self.port_list[i]] = rc[1]
-
-
# various kinds of panels
# Server Info Panel
class ServerInfoPanel(TrexStatusPanel):
def __init__ (self, h, l, y, x, status_obj):
- super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:")
-
- self.status_obj = status_obj
+ super(ServerInfoPanel, self).__init__(h, l, y ,x ,"Server Info:", status_obj)
def draw (self):
- if self.status_obj.server_version == None:
+ if not self.status_obj.server_version :
return
- self.clear()
+ if not self.status_obj.server_sys_info:
+ return
- connection_details = self.status_obj.rpc_client.get_connection_details()
- self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(connection_details['port'])))
+ self.clear()
+
+ self.getwin().addstr(3, 2, "{:<30} {:30}".format("Server:",self.status_obj.server_sys_info["hostname"] + ":" + str(self.stateless_client.get_connection_port())))
self.getwin().addstr(4, 2, "{:<30} {:30}".format("Version:", self.status_obj.server_version["version"]))
self.getwin().addstr(5, 2, "{:<30} {:30}".format("Build:",
self.status_obj.server_version["build_date"] + " @ " +
@@ -123,7 +97,7 @@ class ServerInfoPanel(TrexStatusPanel):
self.getwin().addstr(9, 2, "{:<30} {:<30}".format("Ports Count:", self.status_obj.server_sys_info["port_count"]))
- ports_owned = " ".join(str(x) for x in self.status_obj.rpc_client.get_owned_ports())
+ ports_owned = " ".join(str(x) for x in self.status_obj.owned_ports)
if not ports_owned:
ports_owned = "None"
@@ -134,26 +108,39 @@ class ServerInfoPanel(TrexStatusPanel):
class GeneralInfoPanel(TrexStatusPanel):
def __init__ (self, h, l, y, x, status_obj):
- super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:")
-
- self.status_obj = status_obj
+ super(GeneralInfoPanel, self).__init__(h, l, y ,x ,"General Info:", status_obj)
def draw (self):
- pass
+ self.clear()
+
+ self.getwin().addstr(3, 2, "{:<30} {:0.2f} %".format("CPU util.:", self.general_stats.get("m_cpu_util")))
+
+ self.getwin().addstr(5, 2, "{:<30} {:} / {:}".format("Total Tx. rate:",
+ float_to_human_readable(self.general_stats.get("m_tx_bps")),
+ float_to_human_readable(self.general_stats.get("m_tx_pps"), suffix = "pps")))
+
+ # missing RX field
+ #self.getwin().addstr(5, 2, "{:<30} {:} / {:}".format("Total Rx. rate:",
+ # float_to_human_readable(self.general_stats.get("m_rx_bps")),
+ # float_to_human_readable(self.general_stats.get("m_rx_pps"), suffix = "pps")))
+
+ self.getwin().addstr(7, 2, "{:<30} {:} / {:}".format("Total Tx:",
+ float_to_human_readable(self.general_stats.get_rel("m_total_tx_bytes"), suffix = "B"),
+ float_to_human_readable(self.general_stats.get_rel("m_total_tx_pkts"), suffix = "pkts")))
# all ports stats
class PortsStatsPanel(TrexStatusPanel):
def __init__ (self, h, l, y, x, status_obj):
- super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:")
+ super(PortsStatsPanel, self).__init__(h, l, y ,x ,"Trex Ports:", status_obj)
- self.status_obj = status_obj
def draw (self):
self.clear()
+ return
- owned_ports = self.status_obj.rpc_client.get_owned_ports()
+ owned_ports = self.status_obj.owned_ports
if not owned_ports:
self.getwin().addstr(3, 2, "No Owned Ports - Please Acquire One Or More Ports")
return
@@ -193,33 +180,23 @@ class PortsStatsPanel(TrexStatusPanel):
class ControlPanel(TrexStatusPanel):
def __init__ (self, h, l, y, x, status_obj):
- super(ControlPanel, self).__init__(h, l, y, x, "")
+ super(ControlPanel, self).__init__(h, l, y, x, "", status_obj)
- self.status_obj = status_obj
def draw (self):
self.clear()
self.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit"
- .format(self.status_obj.rpc_client.get_port_count() - 1))
+ .format(self.status_obj.stateless_client.get_port_count() - 1))
- index = 3
-
- cut = len(self.status_obj.log) - 4
- if cut < 0:
- cut = 0
-
- for l in self.status_obj.log[cut:]:
- self.getwin().addstr(index, 2, l)
- index += 1
+ self.log.draw(self.getwin(), 2, 3)
# specific ports panels
class SinglePortPanel(TrexStatusPanel):
def __init__ (self, h, l, y, x, status_obj, port_id):
- super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id))
+ super(SinglePortPanel, self).__init__(h, l, y, x, "Port {0}".format(port_id), status_obj)
- self.status_obj = status_obj
self.port_id = port_id
def draw (self):
@@ -227,7 +204,7 @@ class SinglePortPanel(TrexStatusPanel):
self.clear()
- if not self.port_id in self.status_obj.rpc_client.get_owned_ports():
+ if not self.port_id in self.status_obj.stateless_client.get_owned_ports():
self.getwin().addstr(y, 2, "Port {0} is not owned by you, please acquire the port for more info".format(self.port_id))
return
@@ -292,96 +269,119 @@ class SinglePortPanel(TrexStatusPanel):
y += 2
-# status object
-class TrexStatus():
- def __init__ (self, stdscr, rpc_client):
- self.stdscr = stdscr
+################### main objects #################
+
+# status log
+class TrexStatusLog():
+ def __init__ (self):
self.log = []
- self.rpc_client = rpc_client
- self.snapshot = self.rpc_client.snapshot()
+ def add_event (self, msg):
+ self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg))
- # fetch server info
- self.get_server_info()
+ def draw (self, window, x, y, max_lines = 4):
+ index = y
+
+ cut = len(self.log) - max_lines
+ if cut < 0:
+ cut = 0
+
+ for msg in self.log[cut:]:
+ window.addstr(index, x, msg)
+ index += 1
+
+# status commands
+class TrexStatusCommands():
+ def __init__ (self, status_object):
+
+ self.status_object = status_object
- # create stats objects
- self.stats = Stats(rpc_client, self.rpc_client.get_owned_ports())
+ self.stateless_client = status_object.stateless_client
+ self.log = self.status_object.log
- # register actions
self.actions = {}
- self.actions[ord('q')] = self.action_quit
- self.actions[ord('p')] = self.action_ping
- self.actions[ord('f')] = self.action_freeze
+ self.actions[ord('q')] = self._quit
+ self.actions[ord('p')] = self._ping
+ self.actions[ord('f')] = self._freeze
- self.actions[ord('g')] = self.action_show_ports_stats
+ self.actions[ord('g')] = self._show_ports_stats
- for port_id in xrange(0, self.rpc_client.get_port_count()):
- self.actions[ord('0') + port_id] = self.action_show_port_generator(port_id)
+ # register all the available ports shortcuts
+ for port_id in xrange(0, self.stateless_client.get_port_count()):
+ self.actions[ord('0') + port_id] = self._show_port_generator(port_id)
+
+
+ # handle a key pressed
+ def handle (self, ch):
+ if ch in self.actions:
+ return self.actions[ch]()
+ else:
+ self.log.add_event("Unknown key pressed, please see legend")
+ return True
+
+ # show all ports
+ def _show_ports_stats (self):
+ self.log.add_event("Switching to all ports view")
+ self.status_object.stats_panel = self.status_object.ports_stats_panel
-
- # all ports stats
- def action_show_ports_stats (self):
- self.add_log_event("Switching to all ports view")
- self.stats_panel = self.ports_stats_panel
-
return True
- # function generator for different ports requests
- def action_show_port_generator (self, port_id):
- def action_show_port():
- self.add_log_event("Switching panel to port {0}".format(port_id))
- self.stats_panel = self.ports_panels[port_id]
+
+ # function generator for different ports requests
+ def _show_port_generator (self, port_id):
+ def _show_port():
+ self.log.add_event("Switching panel to port {0}".format(port_id))
+ self.status_object.stats_panel = self.status_object.ports_panels[port_id]
return True
- return action_show_port
+ return _show_port
- def action_freeze (self):
- self.update_active = not self.update_active
- self.add_log_event("Update continued" if self.update_active else "Update stopped")
+ def _freeze (self):
+ self.status_object.update_active = not self.status_object.update_active
+ self.log.add_event("Update continued" if self.status_object.update_active else "Update stopped")
return True
- def action_quit(self):
+ def _quit(self):
return False
- def action_ping (self):
- self.add_log_event("Pinging RPC server")
+ def _ping (self):
+ self.log.add_event("Pinging RPC server")
- rc, msg = self.rpc_client.ping_rpc_server()
+ rc, msg = self.stateless_client.ping()
if rc:
- self.add_log_event("Server replied: '{0}'".format(msg))
+ self.log.add_event("Server replied: '{0}'".format(msg))
else:
- self.add_log_event("Failed to get reply")
+ self.log.add_event("Failed to get reply")
return True
- def get_server_info (self):
-
- self.server_version = self.rpc_client.get_rpc_server_version()
- self.server_sys_info = self.rpc_client.get_system_info()
-
-
- def add_log_event (self, msg):
- self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg))
-
- # control panel
- def update_control (self):
- self.control_panel.clear()
+# status object
+#
+#
+#
+class TrexStatus():
+ def __init__ (self, stdscr, stateless_client):
+ self.stdscr = stdscr
- self.control_panel.getwin().addstr(1, 2, "'g' - general, '0-{0}' - specific port, 'f' - freeze, 'c' - clear stats, 'p' - ping server, 'q' - quit"
- .format(self.rpc_client.get_port_count() - 1))
+ self.stateless_client = stateless_client
+ self.general_stats = stateless_client.get_stats_async().get_general_stats()
- index = 3
+ # fetch server info
+ rc, self.server_sys_info = self.stateless_client.get_system_info()
+ if not rc:
+ return
- cut = len(self.log) - 4
- if cut < 0:
- cut = 0
+ rc, self.server_version = self.stateless_client.get_version()
+ if not rc:
+ return
- for l in self.log[cut:]:
- self.control_panel.getwin().addstr(index, 2, l)
- index += 1
+ self.owned_ports = self.stateless_client.get_acquired_ports()
+ self.log = TrexStatusLog()
+ self.cmds = TrexStatusCommands(self)
+
def generate_layout (self):
self.max_y = self.stdscr.getmaxyx()[0]
self.max_x = self.stdscr.getmaxyx()[1]
@@ -394,7 +394,7 @@ class TrexStatus():
self.ports_stats_panel = PortsStatsPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self)
self.ports_panels = {}
- for i in xrange(0, self.rpc_client.get_port_count()):
+ for i in xrange(0, self.stateless_client.get_port_count()):
self.ports_panels[i] = SinglePortPanel(int(self.max_y * 0.8), self.max_x / 2, 0, 0, self, i)
# at start time we point to the main one
@@ -411,14 +411,8 @@ class TrexStatus():
# no key , continue
if ch == curses.ERR:
return True
-
- # check for registered function
- if ch in self.actions:
- return self.actions[ch]()
- else:
- self.add_log_event("Unknown key pressed, please see legend")
-
- return True
+
+ return self.cmds.handle(ch)
# main run entry point
def run (self):
@@ -454,14 +448,14 @@ class TrexStatus():
sleep(0.01)
-def show_trex_status_internal (stdscr, rpc_client):
- trex_status = TrexStatus(stdscr, rpc_client)
+def show_trex_status_internal (stdscr, stateless_client):
+ trex_status = TrexStatus(stdscr, stateless_client)
trex_status.run()
-def show_trex_status (rpc_client):
+def show_trex_status (stateless_client):
try:
- curses.wrapper(show_trex_status_internal, rpc_client)
+ curses.wrapper(show_trex_status_internal, stateless_client)
except KeyboardInterrupt:
curses.endwin()