summaryrefslogtreecommitdiffstats
path: root/scripts/automation
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-11-12 18:28:21 +0200
committerimarom <imarom@cisco.com>2015-11-12 18:28:21 +0200
commit513581840e5787e73161de049aa59552f23e719d (patch)
treeebb128f76c53e353c46b0af3f0a183280257a863 /scripts/automation
parent78c6593c5a2d3d2242be7fc659d15eac6b869358 (diff)
modifying stateless client to a simpler lightweight module
Diffstat (limited to 'scripts/automation')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py406
-rwxr-xr-xscripts/automation/trex_control_plane/console/parsing_opts.py4
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py6
3 files changed, 144 insertions, 272 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 5a7b1873..93b36f82 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -12,6 +12,7 @@ import json
from common.trex_stats import *
from common.trex_streams import *
from collections import namedtuple
+from common.text_opts import *
from trex_async_client import CTRexAsyncClient
@@ -27,50 +28,48 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
# simple class to represent complex return value
class RC:
- def __init__ (self, rc, data):
- self.rc = rc
- self.data = data
+ def __init__ (self, rc = None, data = None):
+ self.rc_list = []
+
+ if (rc != None) and (data != None):
+ tuple_rc = namedtuple('RC', ['rc', 'data'])
+ self.rc_list.append(tuple_rc(rc, data))
+
+ def add (self, rc):
+ self.rc_list += rc.rc_list
def good (self):
- return self.rc
+ return all([x.rc for x in self.rc_list])
def bad (self):
- return not self.rc
+ return not self.good()
def data (self):
- if self.good():
- return self.data
- else:
- return ""
+ return all([x.data if x.rc else "" for x in self.rc_list])
def err (self):
- if self.bad():
- return self.data
- else:
- return ""
+ return all([x.data if not x.rc else "" for x in self.rc_list])
-RC_OK = RC(True, "")
-def RC_ERR (err):
- return RC(False, err)
+ def annotate (self, desc):
+ print format_text('\n{:<40}'.format(desc), 'bold'),
-class RC_LIST:
- def __init__ (self):
- self.rc_list = []
+ if self.bad():
+ # print all the errors
+ for x in self.rc_list:
+ if not x.rc:
+ print format_text("\n{0}".format(x.data), 'bold')
- def add (self, rc):
- self.rc_list.append(rc)
+ print format_text("[FAILED]\n", 'red', 'bold')
- def good(self):
- return all([x.good() for x in self.rc_list])
- def bad (self):
- not self.good()
+ else:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
- def data (self):
- return [x.data() for x in self.rc_list]
- def err (self):
- return [x.err() for x in self.rc_list]
+def RC_OK():
+ return RC(True, "")
+def RC_ERR (err):
+ return RC(False, err)
# describes a single port
@@ -104,7 +103,7 @@ class Port:
rc = self.transmit(command.method, command.params)
if rc.success:
self.handler = rc.data
- return RC_OK
+ return RC_OK()
else:
return RC_ERR(rc.data)
@@ -118,7 +117,7 @@ class Port:
rc = self.transmit(command.method, command.params)
if rc.success:
self.handler = rc.data
- return RC_OK
+ return RC_OK()
else:
return RC_ERR(rc.data)
@@ -145,13 +144,13 @@ class Port:
else:
raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
- return RC_OK
+ return RC_OK()
# return TRUE if write commands
def is_port_writeable (self):
# operations on port can be done on state idle or state sreams
- return ((self.state == STATE_IDLE) or (self.state == STATE_STREAMS))
+ return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS))
# add stream to the port
def add_stream (self, stream_id, stream_obj):
@@ -163,11 +162,12 @@ class Port:
params = {"handler": self.handler,
"port_id": self.port_id,
"stream_id": stream_id,
- "stream": stream_obj.dump()}
+ "stream": stream_obj}
rc, data = self.transmit("add_stream", params)
if not rc:
- return self.err(data)
+ r = self.err(data)
+ print r.good()
# add the stream
self.streams[stream_id] = stream_obj
@@ -175,7 +175,7 @@ class Port:
# the only valid state now
self.state = self.STATE_STREAMS
- return RC_OK
+ return RC_OK()
# remove stream from port
def remove_stream (self, stream_id):
@@ -194,16 +194,21 @@ class Port:
self.streams[stream_id] = None
- return RC_OK
+ return RC_OK()
# remove all the streams
def remove_all_streams (self):
- for stream_id in self.streams.keys():
- rc = self.remove_stream(stream_id)
- if rc.bad():
- return rc
- return RC_OK
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("remove_all_streams", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams = {}
+
+ return RC_OK()
# start traffic
def start (self, mul):
@@ -220,17 +225,20 @@ class Port:
"port_id": self.port_id,
"mul": mul}
- rc, data = self.transmit("remove_stream", params)
+ rc, data = self.transmit("start_traffic", params)
if not rc:
return self.err(data)
self.state = self.STATE_TX
- return RC_OK
+ return RC_OK()
+
+ # stop traffic
+ # with force ignores the cached state and sends the command
+ def stop (self, force = False):
- def stop (self):
- if (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
- return self.err("Unable to stop traffic - port is down")
+ if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
+ return self.err("port is not transmitting")
params = {"handler": self.handler,
"port_id": self.port_id}
@@ -240,9 +248,10 @@ class Port:
return self.err(data)
# only valid state after stop
- self.state = self.STREAMS
+ self.state = self.STATE_STREAMS
+
+ return RC_OK()
- return RC_OK
class CTRexStatelessClient(object):
@@ -265,7 +274,7 @@ class CTRexStatelessClient(object):
############# helper functions section ##############
- def __validate_port_list(self, port_id):
+ def validate_port_list(self, port_id):
if isinstance(port_id, list) or isinstance(port_id, set):
# check each item of the sequence
return all([self._is_ports_valid(port)
@@ -275,15 +284,25 @@ class CTRexStatelessClient(object):
else:
return False
+ # some preprocessing for port argument
def __ports (self, port_id_list):
+
+ # none means all
if port_id_list == None:
return range(0, self.get_port_count())
+ # always list
+ if isinstance(port_id_list, int):
+ port_id_list = [port_id_list]
+
+ if not isinstance(port_id_list, list):
+ raise ValueError("bad port id list: {0}".format(port_id_list))
+
for port_id in port_id_list:
if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
raise ValueError("bad port id {0}".format(port_id))
- return [port_id_list] if isinstance(port_id_list, int) else port_id_list
+ return port_id_list
############ boot up section ################
@@ -297,7 +316,6 @@ class CTRexStatelessClient(object):
if not rc:
return RC_ERR(data)
-
# cache system info
rc, data = self.transmit("get_system_info")
if not rc:
@@ -328,7 +346,7 @@ class CTRexStatelessClient(object):
self.connected = True
- return RC_OK
+ return RC_OK()
def is_connected (self):
return self.connected
@@ -367,11 +385,11 @@ class CTRexStatelessClient(object):
return self.comm_link.port
def get_acquired_ports(self):
- return [port for port in self.ports if port.is_acquired()]
+ return [port.port_id for port in self.ports if port.is_acquired()]
def get_active_ports(self):
- return [port for port in self.ports if port.is_active()]
+ return [port.port_id for port in self.ports if port.is_active()]
def set_verbose(self, mode):
self.comm_link.set_verbose(mode)
@@ -396,7 +414,7 @@ class CTRexStatelessClient(object):
if rc.bad():
return rc
- return RC_OK
+ return RC_OK()
@@ -405,78 +423,71 @@ class CTRexStatelessClient(object):
def acquire (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
- rc_list = RC_LIST()
+ rc = RC()
for port_id in port_id_list:
- rc = self.ports[port_id].acquire(force)
- rc_list.add(rc)
-
- return rc_list
+ rc.add(self.ports[port_id].acquire(force))
+
+ return rc
# release ports
def release (self, port_id_list = None):
port_id_list = self.__ports(port_id_list)
- rc_list = RC_LIST()
+ rc = RC()
for port_id in port_id_list:
- rc, msg = self.ports[port_id].release(force)
- rc_list.add(rc)
+ rc.add(self.ports[port_id].release(force))
- return rc_list
+ return rc
def add_stream(self, stream_id, stream_obj, port_id_list = None):
- assert isinstance(stream_obj, CStream)
port_id_list = self.__ports(port_id_list)
- rc_list = RC_LIST()
+ rc = RC()
for port_id in port_id_list:
- rc = self.ports[port_id].add_stream(stream_id, stream_obj)
- rc_list.add(rc)
+ rc.add(self.ports[port_id].add_stream(stream_id, stream_obj))
- return rc_list
+ return rc
def add_stream_pack(self, stream_pack_list, port_id_list = None):
port_id_list = self.__ports(port_id_list)
- rc_list = RC_LIST()
+ rc = RC()
for stream_pack in stream_pack_list:
- rc = self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)
- rc_list.add(rc)
+ rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list))
- return rc_list
+ return rc
def remove_stream(self, stream_id, port_id_list = None):
port_id_list = self.__ports(port_id_list)
- rc_list = RC_LIST()
+ rc = RC()
for port_id in port_id_list:
- rc = self.ports[port_id].remove_stream(stream_id)
- rc_list.add(rc)
+ rc.add(self.ports[port_id].remove_stream(stream_id))
- return rc_list
+ return rc
def remove_all_streams(self, port_id_list = None):
port_id_list = self.__ports(port_id_list)
- rc_list = RC_LIST()
+ rc = RC()
for port_id in port_id_list:
- rc = self.ports[port_id].remove_all_streams()
- rc_list.add(rc)
+ rc.add(self.ports[port_id].remove_all_streams())
- return rc_list
+ return rc
def get_stream(self, stream_id, port_id, get_pkt = False):
@@ -498,85 +509,36 @@ class CTRexStatelessClient(object):
port_id_list = self.__ports(port_id_list)
- rc_list = RC_LIST()
+ rc = RC()
for port_id in port_id_list:
- rc = self.ports[port_id].start(multiplier)
- rc_list.add(rc)
+ rc.add(self.ports[port_id].start(multiplier))
- return rc_list
+ return rc
- def stop_traffic (self, port_id_list = None):
+ def stop_traffic (self, port_id_list = None, force = False):
port_id_list = self.__ports(port_id_list)
-
- rc_list = RC_LIST()
+ rc = RC()
for port_id in port_id_list:
- rc = self.ports[port_id].stop()
- rc_list.add(rc)
+ rc.add(self.ports[port_id].stop(force))
- return rc_list
+ return rc
def get_port_stats(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("get_port_stats", params)
- return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
+ pass
def get_stream_stats(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("get_stream_stats", params)
- return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
-
+ pass
def transmit(self, method_name, params={}):
return self.comm_link.transmit(method_name, params)
- def transmit_batch(self, batch_list):
- return self.comm_link.transmit_batch(batch_list)
-
-
- @staticmethod
- def default_success_test(result_obj):
- if result_obj.success:
- return True
- else:
- return False
-
- @staticmethod
- def ack_success_test(result_obj):
- if result_obj.success and result_obj.data == "ACK":
- return True
- else:
- return False
######################### Console (high level) API #########################
@@ -585,181 +547,91 @@ class CTRexStatelessClient(object):
# acquire, stop, remove streams and clear stats
#
#
- def cmd_reset (self, annotate_func):
+ def cmd_reset (self):
# sync with the server
rc = self.sync_with_server()
- annotate_func("Syncing with the server:", rc.good(), rc.err())
+ rc.annotate("Syncing with the server:")
if rc.bad():
return rc
- # force acquire all ports
rc = self.acquire(force = True)
- annotate_func("Force acquiring all ports:", rc.good(), rc.err())
+ rc.annotate("Force acquiring all ports:")
if rc.bad():
return rc
# force stop all ports
- port_id_list = self.get_active_ports()
- rc = self.stop_traffic(port_id_list)
- annotate_func("Stop traffic on all ports:", rc.good(), rc.err())
+ rc = self.stop_traffic(self.get_port_ids(), True)
+ rc.annotate("Stop traffic on all ports:")
if rc.bad():
return rc
- return
# remove all streams
- rc = self.remove_all_streams(ports)
- annotate_func("Removing all streams from all ports:", rc.good(), rc.err())
+ rc = self.remove_all_streams(self.get_port_ids())
+ rc.annotate("Removing all streams from all ports:")
if rc.bad():
return rc
# TODO: clear stats
- return RC_OK
+ return RC_OK()
# stop cmd
- def cmd_stop (self, ports, annotate_func):
+ def cmd_stop (self, port_id_list):
# find the relveant ports
- active_ports = set(self.get_active_ports()).intersection(ports)
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
+
if not active_ports:
- annotate_func("No active traffic on porvided ports")
+ print format_text("No active traffic on porvided ports", 'bold')
return True
- rc, log = self.stop_traffic(active_ports)
- annotate_func("Stopping traffic on ports {0}:".format([port for port in active_ports]), rc, log)
- if not rc:
+ rc = self.stop_traffic(active_ports)
+ rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list))
+ if rc.bad():
return False
return True
# start cmd
- def cmd_start (self, ports, stream_list, mult, force, annotate_func):
+ def cmd_start (self, port_id_list, stream_list, mult, force):
- if (force and set(self.get_active_ports()).intersection(ports)):
- rc = self.cmd_stop(ports, annotate_func)
- if not rc:
- return False
+ active_ports = list(set(self.get_active_ports()).intersection(port_id_list))
- rc, log = self.remove_all_streams(ports)
- annotate_func("Removing all streams from ports {0}:".format([port for port in ports]), rc, log,
- "Please either retry with --force or stop traffic")
- if not rc:
- return False
+ if active_ports:
+ if not force:
+ print format_text("Port(s) {0} are active - please stop them or add '--force'".format(active_ports), 'bold')
+ return False
+ else:
+ rc = self.cmd_stop(active_ports)
+ if not rc:
+ return False
- rc, log = self.add_stream_pack(stream_list.compiled, port_id= ports)
- annotate_func("Attaching streams to port {0}:".format([port for port in ports]), rc, log)
- if not rc:
- return False
- # finally, start the traffic
- rc, log = self.start_traffic(mult, ports)
- annotate_func("Starting traffic on ports {0}:".format([port for port in ports]), rc, log)
- if not rc:
+ rc = self.remove_all_streams(port_id_list)
+ rc.annotate("Removing all streams from ports {0}:".format(port_id_list))
+ if rc.bad():
return False
- return True
-
- # ----- handler internal methods ----- #
- def _handle_general_response(self, request, response, msg, success_test=None):
- port_id = request.params.get("port_id")
- if not success_test:
- success_test = self.default_success_test
- if success_test(response):
- self._conn_handler[port_id] = response.data
- return RpcResponseStatus(True, port_id, msg)
- else:
- return RpcResponseStatus(False, port_id, response.data)
-
- def _handle_acquire_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- self._conn_handler[port_id] = response.data
- return RpcResponseStatus(True, port_id, "Acquired")
- else:
- return RpcResponseStatus(False, port_id, response.data)
-
- def _handle_add_stream_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- stream_id = request.params.get("stream_id")
- if success_test(response):
- return RpcResponseStatus(True, port_id, "Stream {0} added".format(stream_id))
- else:
- return RpcResponseStatus(False, port_id, response.data)
-
- def _handle_remove_streams_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- return RpcResponseStatus(True, port_id, "Removed")
- else:
- return RpcResponseStatus(False, port_id, response.data)
-
- def _handle_release_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- del self._conn_handler[port_id]
- return RpcResponseStatus(True, port_id, "Released")
- else:
- return RpcResponseStatus(False, port_id, response.data)
-
- def _handle_start_traffic_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- self._active_ports.add(port_id)
- return RpcResponseStatus(True, port_id, "Traffic started")
- else:
- return RpcResponseStatus(False, port_id, response.data)
-
- def _handle_stop_traffic_response(self, request, response, success_test):
- port_id = request.params.get("port_id")
- if success_test(response):
- if port_id in self._active_ports:
- self._active_ports.remove(port_id)
- return RpcResponseStatus(True, port_id, "Traffic stopped")
- else:
- return RpcResponseStatus(False, port_id, response.data)
-
- def _handle_get_global_stats_response(self, request, response, success_test):
- if response.success:
- return CGlobalStats(**response.success)
- else:
+ rc = self.add_stream_pack(stream_list.compiled, port_id_list)
+ rc.annotate("Attaching streams to port {0}:".format(port_id_list))
+ if rc.bad():
return False
- def _handle_get_port_stats_response(self, request, response, success_test):
- if response.success:
- return CPortStats(**response.success)
- else:
- return False
- def _handle_get_stream_stats_response(self, request, response, success_test):
- if response.success:
- return CStreamStats(**response.success)
- else:
+ # finally, start the traffic
+ rc = self.start_traffic(mult, port_id_list)
+ rc.annotate("Starting traffic on ports {0}:".format(port_id_list))
+ if rc.bad():
return False
+ return True
- def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
- res_ok = True
- responses = []
- if isinstance(success_test, staticmethod):
- success_test = success_test.__func__
- for i, response in enumerate(resp_list):
- # run handler method with its params
- processed_response = handler_func(req_list[i], response, success_test)
- responses.append(processed_response)
- if not processed_response.success:
- res_ok = False
- # else:
- # res_ok = False # TODO: mark in this case somehow the bad result
- # print res_ok
- # print responses
- return res_ok, responses
-
-
+
# ------ private classes ------ #
class CCommLink(object):
"""describes the connectivity of the stateless client method"""
diff --git a/scripts/automation/trex_control_plane/console/parsing_opts.py b/scripts/automation/trex_control_plane/console/parsing_opts.py
index f983d837..252d33bf 100755
--- a/scripts/automation/trex_control_plane/console/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/console/parsing_opts.py
@@ -139,8 +139,8 @@ class CCmdArgParser(argparse.ArgumentParser):
opts.ports = self.stateless_client.get_port_ids()
for port in opts.ports:
- if not self.stateless_client._is_ports_valid(port):
- self.error("port id {0} is not a valid\n".format(port))
+ if not self.stateless_client.validate_port_list(port):
+ self.error("port id {0} is not a valid port id\n".format(port))
return opts
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index 2be643ab..5ba82dcb 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -296,7 +296,7 @@ class TRexConsole(cmd.Cmd):
return
- self.stateless_client.cmd_start(opts.ports, stream_list, opts.mult, opts.force, self.annotate)
+ self.stateless_client.cmd_start(opts.ports, stream_list, opts.mult, opts.force)
return
@@ -315,7 +315,7 @@ class TRexConsole(cmd.Cmd):
if opts is None:
return
- self.stateless_client.cmd_stop(opts.ports, self.annotate)
+ self.stateless_client.cmd_stop(opts.ports)
return
def help_stop(self):
@@ -324,7 +324,7 @@ class TRexConsole(cmd.Cmd):
########## reset
def do_reset (self, line):
'''force stop all ports\n'''
- self.stateless_client.cmd_reset(self.annotate)
+ self.stateless_client.cmd_reset()
# tui