summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_stateless_client.py
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-01-20 11:09:25 -0500
committerimarom <imarom@cisco.com>2016-01-21 10:11:56 -0500
commitb726b5682fca2b1e032380401457d1afb47e1713 (patch)
treeb56576cd385cc662453095917d8c6d87ff4e62c8 /scripts/automation/trex_control_plane/client/trex_stateless_client.py
parentc93acc26bf2517c872da716198e76bcf566b836a (diff)
draft #3
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_stateless_client.py')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py409
1 files changed, 278 insertions, 131 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 28e55088..69cc9838 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -51,6 +51,12 @@ class STLStateError(STLError):
self.msg = "Operation '{0}' is not valid while '{1}'".format(op, state)
+# port state error
+class STLPortStateError(STLError):
+ def __init__ (self, port, op, state):
+ self.msg = "Operation '{0}' on port '{1}' is not valid for state '{2}'".format(op, port, state)
+
+
# raised when argument is not valid for operation
class STLArgumentError(STLError):
def __init__ (self, name, got, valid_values = None, extended = None):
@@ -62,6 +68,12 @@ class STLArgumentError(STLError):
self.msg += "\n{0}".format(extended)
+class STLTimeoutError(STLError):
+ def __init__ (self, timeout):
+ self.msg = "Timeout: operation took more than '{0}' seconds".format(timeout)
+
+
+
############################ logger #############################
############################ #############################
############################ #############################
@@ -109,9 +121,38 @@ class LoggerApi(object):
def async_log (self, msg, level = VERBOSE_REGULAR, newline = True):
self.log(msg, level, newline)
- # annotates an action with a RC - writes to log the result
- def annotate (self, rc, desc = None, show_status = True):
- rc.annotate(self.log, desc, show_status)
+
+ def pre_cmd (self, desc):
+ self.log(format_text('\n{:<60}'.format(desc), 'bold'), newline = False)
+ self.flush()
+
+ def post_cmd (self, rc):
+ if rc:
+ self.log(format_text("[SUCCESS]\n", 'green', 'bold'))
+ else:
+ self.log(format_text("[FAILED]\n", 'red', 'bold'))
+
+
+ def log_cmd (self, desc):
+ self.pre_cmd(desc)
+ self.post_cmd(True)
+
+
+ # supress object getter
+ def supress (self):
+ class Supress(object):
+ def __init__ (self, logger):
+ self.logger = logger
+
+ def __enter__ (self):
+ self.saved_level = self.logger.get_verbose()
+ self.logger.set_verbose(LoggerApi.VERBOSE_QUIET)
+
+ def __exit__ (self, type, value, traceback):
+ self.logger.set_verbose(self.saved_level)
+
+ return Supress(self)
+
# default logger - to stdout
@@ -288,7 +329,7 @@ class AsyncEventHandler(object):
def __async_event_port_forced_acquired (self, port_id):
self.client.ports[port_id].async_event_forced_acquired()
- self.client.read_only = True
+
def __async_event_server_stopped (self):
self.client.connected = False
@@ -386,7 +427,7 @@ class CTRexStatelessClient(object):
server = "localhost",
sync_port = 4501,
async_port = 4500,
- verbose_level = LoggerApi.VERBOSE_REGULAR,
+ verbose_level = LoggerApi.VERBOSE_QUIET,
logger = None,
virtual = False):
@@ -398,7 +439,6 @@ class CTRexStatelessClient(object):
self.server_version = {}
self.system_info = {}
self.session_id = random.getrandbits(32)
- self.read_only = False
self.connected = False
# logger
@@ -634,8 +674,7 @@ class CTRexStatelessClient(object):
# connect to server
- # mode can be RW - read / write, RWF - read write with force , RO - read only
- def __connect(self, mode = "RW"):
+ def __connect(self):
# first disconnect if already connected
if self.is_connected():
@@ -645,12 +684,18 @@ class CTRexStatelessClient(object):
self.connected = False
# connect sync channel
+ self.logger.pre_cmd("connecting to RPC server on {0}:{1}".format(self.connection_info['server'], self.connection_info['sync_port']))
rc = self.comm_link.connect()
+ self.logger.post_cmd(rc)
+
if not rc:
return rc
# connect async channel
+ self.logger.pre_cmd("connecting to publisher server on {0}:{1}".format(self.connection_info['server'], self.connection_info['async_port']))
rc = self.async_client.connect()
+ self.logger.post_cmd(rc)
+
if not rc:
return rc
@@ -694,32 +739,6 @@ class CTRexStatelessClient(object):
if not rc:
return rc
- # acquire all ports
- if mode == "RW":
- rc = self.__acquire(force = False)
-
- # fallback to read only if failed
- if not rc:
- self.logger.annotate(rc, show_status = False)
- self.logger.log(format_text("Switching to read only mode - only few commands will be available", 'bold'))
-
- self.__release(self.get_acquired_ports())
- self.read_only = True
- else:
- self.read_only = False
-
- elif mode == "RWF":
- rc = self.__acquire(force = True)
- if not rc:
- return rc
- self.read_only = False
-
- elif mode == "RO":
- # no acquire on read only
- rc = RC_OK()
- self.read_only = True
-
-
self.connected = True
return RC_OK()
@@ -760,27 +779,36 @@ class CTRexStatelessClient(object):
return rc
+ self.logger.pre_cmd("Removing all streams from port(s) {0}:".format(port_id_list))
rc = self.__remove_all_streams(port_id_list)
- self.logger.annotate(rc, "Removing all streams from port(s) {0}:".format(port_id_list))
- if rc.bad():
+ self.logger.post_cmd(rc)
+
+ if not rc:
return rc
+ self.logger.pre_cmd("Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list))
rc = self.__add_stream_pack(stream_list, port_id_list)
- self.logger.annotate(rc, "Attaching {0} streams to port(s) {1}:".format(len(stream_list.compiled), port_id_list))
- if rc.bad():
+ self.logger.post_cmd(rc)
+
+ if not rc:
return rc
# when not on dry - start the traffic , otherwise validate only
if not dry:
+
+ self.logger.pre_cmd("Starting traffic on port(s) {0}:".format(port_id_list))
rc = self.__start_traffic(mult, duration, port_id_list)
- self.logger.annotate(rc,"Starting traffic on port(s) {0}:".format(port_id_list))
-
+ self.logger.post_cmd(rc)
+
return rc
else:
+
+ self.logger.pre_cmd("Validating traffic profile on port(s) {0}:".format(port_id_list))
rc = self.__validate(port_id_list)
- self.logger.annotate(rc,"Validating traffic profile on port(s) {0}:".format(port_id_list))
-
+ self.logger.post_cmd(rc)
+
+
if rc.bad():
return rc
@@ -801,8 +829,11 @@ class CTRexStatelessClient(object):
self.logger.log(format_text(msg, 'bold'))
return RC_WARN(msg)
+
+ self.logger.pre_cmd("Stopping traffic on port(s) {0}:".format(port_id_list))
rc = self.__stop_traffic(active_ports)
- self.logger.annotate(rc, "Stopping traffic on port(s) {0}:".format(port_id_list))
+ self.logger.post_cmd(rc)
+
if not rc:
return rc
@@ -819,8 +850,9 @@ class CTRexStatelessClient(object):
self.logger.log(format_text(msg, 'bold'))
return RC_WARN(msg)
+ self.logger.pre_cmd("Updating traffic on port(s) {0}:".format(port_id_list))
rc = self.__update_traffic(mult, active_ports)
- self.logger.annotate(rc, "Updating traffic on port(s) {0}:".format(port_id_list))
+ self.logger.post_cmd(rc)
return rc
@@ -836,8 +868,10 @@ class CTRexStatelessClient(object):
self.logger.log(format_text(msg, 'bold'))
return RC_WARN(msg)
+ self.logger.pre_cmd("Pausing traffic on port(s) {0}:".format(port_id_list))
rc = self.__pause_traffic(active_ports)
- self.logger.annotate(rc, "Pausing traffic on port(s) {0}:".format(port_id_list))
+ self.logger.post_cmd(rc)
+
return rc
@@ -852,24 +886,51 @@ class CTRexStatelessClient(object):
self.logger.log(format_text(msg, 'bold'))
return RC_WARN(msg)
+ self.logger.pre_cmd("Resume traffic on port(s) {0}:".format(port_id_list))
rc = self.__resume_traffic(active_ports)
- self.logger.annotate(rc, "Resume traffic on port(s) {0}:".format(port_id_list))
+ self.logger.post_cmd(rc)
+
return rc
# clear stats
- def __clear_stats(self, port_id_list):
+ def __clear_stats(self, port_id_list, clear_global):
for port_id in port_id_list:
self.ports[port_id].clear_stats()
- self.global_stats.clear_stats()
+ if clear_global:
+ self.global_stats.clear_stats()
+ self.logger.pre_cmd("clearing stats on port(s) {0}:".format(port_id_list))
rc = RC_OK()
- self.logger.annotate(rc, "clearing stats on port(s) {0}:".format(port_id_list))
+ self.logger.post_cmd(rc)
+
return RC
+ # get stats
+ def __get_stats (self, port_id_list):
+ stats = {}
+
+ stats['global'] = self.global_stats.get_stats()
+
+ total = {}
+ for port_id in port_id_list:
+ port_stats = self.ports[port_id].get_stats()
+ stats["port {0}".format(port_id)] = port_stats
+
+ for k, v in port_stats.iteritems():
+ if not k in total:
+ total[k] = v
+ else:
+ total[k] += v
+
+ stats['total'] = total
+
+ return stats
+
+
def __process_profiles (self, profiles, out):
for profile in (profiles if isinstance(profiles, list) else [profiles]):
@@ -883,7 +944,6 @@ class CTRexStatelessClient(object):
stream_list = self.streams_db.load_yaml_file(profile)
except Exception as e:
rc = RC_ERR(str(e))
- self.logger.annotate(rc)
return rc
out.append(stream_list)
@@ -895,31 +955,6 @@ class CTRexStatelessClient(object):
return RC_OK()
-
- # stream list
- if opts.db:
- stream_list = self.streams_db.get_stream_pack(opts.db)
- rc = RC(stream_list != None)
- self.logger.annotate(rc,"Load stream pack (from DB):")
- if rc.bad():
- return RC_ERR("Failed to load stream pack")
-
- else:
- # load streams from file
- stream_list = None
- try:
- stream_list = self.streams_db.load_yaml_file(opts.file[0])
- except Exception as e:
- s = str(e)
- rc=RC_ERR(s)
- self.logger.annotate(rc)
- return rc
-
- rc = RC(stream_list != None)
- self.logger.annotate(rc,"Load stream pack (from file):")
- if stream_list == None:
- return RC_ERR("Failed to load stream pack")
-
############ functions used by other classes but not users ##############
def _verify_port_id_list (self, port_id_list):
@@ -938,7 +973,8 @@ class CTRexStatelessClient(object):
return False
# check each item of the sequence
- return port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list])
+ return (port_id_list and all([port_id in self.get_all_ports() for port_id in port_id_list]))
+
# transmit request on the RPC link
@@ -986,8 +1022,10 @@ class CTRexStatelessClient(object):
# validate port(s) profile
def cmd_validate (self, port_id_list):
+ self.logger.pre_cmd("Validating streams on port(s) {0}:".format(port_id_list))
rc = self.__validate(port_id_list)
- self.logger.annotate(rc,"Validating streams on port(s) {0}:".format(port_id_list))
+ self.logger.post_cmd(rc)
+
return rc
@@ -1025,17 +1063,22 @@ class CTRexStatelessClient(object):
return {k: v for k, v in namespace.__dict__.items() if k in ok_values}
- # verify decorator - throws exception is client is disconnected
- def __verify_connected(f):
- def wrap(*args, **kwargs):
- inst = args[0]
- func_name = f.__name__
+ # API decorator - double wrap because of argument
+ def __api_check(connected = True):
+
+ def wrap (f):
+ def wrap2(*args, **kwargs):
+ client = args[0]
- if not inst.is_connected():
- raise STLStateError(func_name, 'disconnected')
+ func_name = f.__name__
- ret = f(*args, **kwargs)
- return ret
+ # check connection
+ if connected and not client.is_connected():
+ raise STLStateError(func_name, 'disconnected')
+
+ ret = f(*args, **kwargs)
+ return ret
+ return wrap2
return wrap
@@ -1044,7 +1087,14 @@ class CTRexStatelessClient(object):
############################ API #############################
############################ #############################
############################ #############################
+ def __enter__ (self):
+ self.connect(mode = "RWF")
+ return self
+ def __exit__ (self, type, value, traceback):
+ if self.get_active_ports():
+ self.stop(self.get_active_ports())
+ self.disconnect()
############################ Getters #############################
############################ #############################
@@ -1056,8 +1106,8 @@ class CTRexStatelessClient(object):
return self.logger.get_verbose()
# is the client on read only mode ?
- def is_read_only (self):
- return self.read_only
+ def is_all_ports_acquired (self):
+ return not (self.get_all_ports() == self.get_acquired_ports())
# is the client connected ?
def is_connected (self):
@@ -1088,9 +1138,9 @@ class CTRexStatelessClient(object):
# returns the port object
def get_port (self, port_id):
- port_id = self.ports.get(port_id, None)
- if (port_id != None):
- return port_id
+ port = self.ports.get(port_id, None)
+ if (port != None):
+ return port
else:
raise STLArgumentError('port id', port_id, valid_values = self.get_all_ports())
@@ -1139,26 +1189,66 @@ class CTRexStatelessClient(object):
# 'RO' - read only
# 'RW' - read/write
# 'RWF' - read write forced (take ownership)
+ @__api_check(False)
def connect (self, mode = "RW"):
modes = ['RO', 'RW', 'RWF']
if not mode in modes:
raise STLArgumentError('mode', mode, modes)
- rc = self.__connect(mode)
- self.logger.annotate(rc)
+ rc = self.__connect()
+ if not rc:
+ raise STLError(rc)
+
+ # acquire all ports for 'RW' or 'RWF'
+ if (mode == "RW") or (mode == "RWF"):
+ self.acquire(ports = self.get_all_ports(), force = True if mode == "RWF" else False)
+
+
+
+
+ # acquire ports
+ # this is not needed if connect was called with "RW" or "RWF"
+ # but for "RO" this might be needed
+ @__api_check(True)
+ def acquire (self, ports = None, force = False):
+ # by default use all ports
+ if ports == None:
+ ports = self.get_all_ports()
+
+ # verify ports
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ # verify valid port id list
+ if force:
+ self.logger.pre_cmd("Force acquiring ports {0}:".format(ports))
+ else:
+ self.logger.pre_cmd("Acquiring ports {0}:".format(ports))
+
+ rc = self.__acquire(ports, force)
+
+ self.logger.post_cmd(rc)
if not rc:
+ self.__release(ports)
raise STLError(rc)
- return rc
+
+
+ # force connect syntatic sugar
+ @__api_check(False)
+ def fconnect (self):
+ self.connect(mode = "RWF")
# disconnects from the server
- def disconnect (self, annotate = True):
+ @__api_check(False)
+ def disconnect (self, log = True):
rc = self.__disconnect()
- if annotate:
- self.logger.annotate(rc, "Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'],
- self.connection_info['sync_port']))
+ if log:
+ self.logger.log_cmd("Disconnecting from server at '{0}':'{1}'".format(self.connection_info['server'],
+ self.connection_info['sync_port']))
if not rc:
raise STLError(rc)
@@ -1166,29 +1256,27 @@ class CTRexStatelessClient(object):
# teardown - call after test is done
+ # NEVER throws an exception
+ @__api_check(False)
def teardown (self, stop_traffic = True):
- # stop traffic
- if stop_traffic:
- rc = self.stop()
- if not rc:
- raise STLError(rc)
+ # try to stop traffic
+ if stop_traffic and self.get_active_ports():
+ try:
+ self.stop()
+ except STLError:
+ pass
# disconnect
- rc = self.__disconnect()
- if not rc:
- raise STLError(rc)
+ self.__disconnect()
-
-
- return rc
# pings the server on the RPC channel
- @__verify_connected
+ @__api_check(True)
def ping(self):
rc = self.__ping()
- self.logger.annotate(rc, "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
+ self.logger.pre_cmd( "Pinging the server on '{0}' port '{1}': ".format(self.connection_info['server'],
self.connection_info['sync_port']))
if not rc:
@@ -1199,25 +1287,31 @@ class CTRexStatelessClient(object):
# reset the server by performing
# force acquire, stop, and remove all streams
- @__verify_connected
+ @__api_check(True)
def reset(self):
+ self.logger.pre_cmd("Force acquiring all ports:")
rc = self.__acquire(force = True)
- self.logger.annotate(rc, "Force acquiring all ports:")
+ self.logger.post_cmd(rc)
+
if not rc:
raise STLError(rc)
# force stop all ports
+ self.logger.pre_cmd("Stop traffic on all ports:")
rc = self.__stop_traffic(self.get_all_ports(), True)
- self.logger.annotate(rc,"Stop traffic on all ports:")
+ self.logger.post_cmd(rc)
+
if not rc:
raise STLError(rc)
# remove all streams
+ self.logger.pre_cmd("Removing all streams from all ports:")
rc = self.__remove_all_streams(self.get_all_ports())
- self.logger.annotate(rc,"Removing all streams from all ports:")
+ self.logger.post_cmd(rc)
+
if not rc:
raise STLError(rc)
@@ -1226,7 +1320,7 @@ class CTRexStatelessClient(object):
# start cmd
- @__verify_connected
+ @__api_check(True)
def start (self,
profiles,
ports = None,
@@ -1246,7 +1340,6 @@ class CTRexStatelessClient(object):
if not rc:
raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
-
# verify multiplier
mult_obj = parsing_opts.decode_multiplier(mult,
allow_update = False,
@@ -1272,21 +1365,20 @@ class CTRexStatelessClient(object):
if not rc:
raise STLError(rc)
-
# dry run
if dry:
self.logger.log(format_text("\n*** DRY RUN ***", 'bold'))
# call private method to start
+
rc = self.__start(ports, stream_list[0], mult_obj, force, duration, dry)
if not rc:
raise STLError(rc)
- return rc
# stop traffic on ports
- @__verify_connected
+ @__api_check(True)
def stop (self, ports = None):
# by default use all ports
if ports == None:
@@ -1301,11 +1393,10 @@ class CTRexStatelessClient(object):
if not rc:
raise STLError(rc)
- return rc
# update traffic
- @__verify_connected
+ @__api_check(True)
def update (self, ports = None, mult = "1", total = False):
# by default use all ports
@@ -1339,7 +1430,7 @@ class CTRexStatelessClient(object):
# pause traffic on ports
- @__verify_connected
+ @__api_check(True)
def pause (self, ports = None):
# by default use all ports
if ports == None:
@@ -1358,7 +1449,7 @@ class CTRexStatelessClient(object):
# resume traffic on ports
- @__verify_connected
+ @__api_check(True)
def resume (self, ports = None):
# by default use all ports
if ports == None:
@@ -1377,7 +1468,8 @@ class CTRexStatelessClient(object):
# clear stats
- def clear_stats (self, ports = None):
+ @__api_check(False)
+ def clear_stats (self, ports = None, clear_global = True):
# by default use all ports
if ports == None:
ports = self.get_all_ports()
@@ -1387,11 +1479,64 @@ class CTRexStatelessClient(object):
if not rc:
raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
- rc = self.__clear_stats(ports)
+ # verify clear global
+ if not type(clear_global) is bool:
+ raise STLArgumentError('clear_global', clear_global)
+
+
+ rc = self.__clear_stats(ports, clear_global)
if not rc:
raise STLError(rc)
+ # get stats
+ @__api_check(False)
+ def get_stats (self, ports = None, async_barrier = True):
+ # by default use all ports
+ if ports == None:
+ ports = self.get_all_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ # check async barrier
+ if not type(async_barrier) is bool:
+ raise STLArgumentError('async_barrier', async_barrier)
+
+
+ # if the user requested a barrier - use it
+ if async_barrier:
+ rc = self.async_client.barrier()
+ if not rc:
+ raise STLError(rc)
+
+ return self.__get_stats(ports)
+
+
+ # wait while traffic is on, on timeout throw STLTimeoutError
+ @__api_check(True)
+ def wait_on_traffic (self, ports = None, timeout = 60):
+ # by default use all ports
+ if ports == None:
+ ports = self.get_all_ports()
+
+ # verify valid port id list
+ rc = self._validate_port_list(ports)
+ if not rc:
+ raise STLArgumentError('ports', ports, valid_values = self.get_all_ports())
+
+ expr = time.time() + timeout
+
+ # wait while any of the required ports are active
+ while set(self.get_active_ports()).intersection(ports):
+ time.sleep(0.01)
+ if time.time() > expr:
+ raise STLTimeoutError(timeout)
+
+
+
############################ Line #############################
############################ Commands #############################
@@ -1406,7 +1551,7 @@ class CTRexStatelessClient(object):
try:
rc = f(*args)
except STLError as e:
- client.logger.log(format_text("\n" + e.brief() + "\n", 'bold'))
+ client.logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold'))
return
# don't want to print on error
@@ -1567,19 +1712,19 @@ class CTRexStatelessClient(object):
@__console
- def show_stats_line (self, line):
+ def print_formatted_stats_line (self, line):
'''Fetch statistics from TRex server by port\n'''
# define a parser
parser = parsing_opts.gen_parser(self,
"stats",
- self.cmd_stats_line.__doc__,
+ self.show_stats_line.__doc__,
parsing_opts.PORT_LIST_WITH_ALL,
parsing_opts.STATS_MASK)
opts = parser.parse_args(line.split())
if opts is None:
- return RC_ERR("bad command line parameters")
+ return None
# determine stats mask
mask = self.__get_mask_keys(**self.__filter_namespace_args(opts, trex_stats.ALL_STATS_OPTS))
@@ -1587,7 +1732,9 @@ class CTRexStatelessClient(object):
# set to show all stats if no filter was given
mask = trex_stats.ALL_STATS_OPTS
- stats = self.cmd_stats(opts.ports, mask)
+
+ self.print_formatted_stats()
+ stats = self.get_stats(opts.ports, mask)
# print stats to screen
for stat_type, stat_data in stats.iteritems():