summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2015-11-12 16:33:05 +0200
committerimarom <imarom@cisco.com>2015-11-12 16:33:05 +0200
commit78c6593c5a2d3d2242be7fc659d15eac6b869358 (patch)
tree3a0c11b7a42e10afc34497a8582d6a4f428c8e87 /scripts/automation/trex_control_plane
parente9c6fde1c28b2c51ea164b0df929c9c44ee6f444 (diff)
DRAFT - only for internal purpose
Diffstat (limited to 'scripts/automation/trex_control_plane')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py804
-rwxr-xr-xscripts/automation/trex_control_plane/client_utils/jsonrpc_client.py273
-rwxr-xr-xscripts/automation/trex_control_plane/console/trex_console.py96
3 files changed, 515 insertions, 658 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 168853b3..5a7b1873 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -24,148 +24,331 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
msg=self.msg,
stat="success" if self.success else "fail")
-# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])
+# simple class to represent complex return value
+class RC:
+
+ def __init__ (self, rc, data):
+ self.rc = rc
+ self.data = data
+
+ def good (self):
+ return self.rc
+
+ def bad (self):
+ return not self.rc
+
+ def data (self):
+ if self.good():
+ return self.data
+ else:
+ return ""
+
+ def err (self):
+ if self.bad():
+ return self.data
+ else:
+ return ""
+
+RC_OK = RC(True, "")
+def RC_ERR (err):
+ return RC(False, err)
+
+class RC_LIST:
+ def __init__ (self):
+ self.rc_list = []
+
+ def add (self, rc):
+ self.rc_list.append(rc)
+
+ def good(self):
+ return all([x.good() for x in self.rc_list])
+
+ def bad (self):
+ not self.good()
+
+ def data (self):
+ return [x.data() for x in self.rc_list]
+
+ def err (self):
+ return [x.err() for x in self.rc_list]
+
+
+# describes a single port
+class Port:
+
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+
+ def __init__ (self, port_id, user, transmit):
+ self.port_id = port_id
+ self.state = self.STATE_IDLE
+ self.handler = None
+ self.transmit = transmit
+ self.user = user
+
+ self.streams = {}
+
+ def err (self, msg):
+ return RC_ERR("port {0} : {1}".format(self.port_id, msg))
+
+ # take the port
+ def acquire (self, force = False):
+ params = {"port_id": self.port_id,
+ "user": self.user,
+ "force": force}
+
+ command = RpcCmdData("acquire", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return RC_OK
+ else:
+ return RC_ERR(rc.data)
+
+
+ # release the port
+ def release (self):
+ params = {"port_id": self.port_id,
+ "handler": self.handler}
+
+ command = RpcCmdData("release", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return RC_OK
+ else:
+ return RC_ERR(rc.data)
+
+ def is_acquired (self):
+ return (self.handler != None)
+
+ def is_active (self):
+ return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
+
+ def sync (self, sync_data):
+
+ self.handler = sync_data['handler']
+
+ if sync_data['state'] == "DOWN":
+ self.state = self.STATE_DOWN
+ elif sync_data['state'] == "IDLE":
+ self.state = self.STATE_IDLE
+ elif sync_data['state'] == "STREAMS":
+ self.state = self.STATE_STREAMS
+ elif sync_data['state'] == "TX":
+ self.state = self.STATE_TX
+ elif sync_data['state'] == "PAUSE":
+ self.state = self.STATE_PAUSE
+ else:
+ raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
+
+ return RC_OK
+
+
+ # return TRUE if write commands
+ def is_port_writeable (self):
+ # operations on port can be done on state idle or state sreams
+ return ((self.state == STATE_IDLE) or (self.state == STATE_STREAMS))
+
+ # add stream to the port
+ def add_stream (self, stream_id, stream_obj):
+
+ if not self.is_port_writeable():
+ return self.err("Please stop port before attempting to add streams")
+
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id,
+ "stream": stream_obj.dump()}
+
+ rc, data = self.transmit("add_stream", params)
+ if not rc:
+ return self.err(data)
+
+ # add the stream
+ self.streams[stream_id] = stream_obj
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return RC_OK
+
+ # remove stream from port
+ def remove_stream (self, stream_id):
+
+ if not stream_id in self.streams:
+ return self.err("stream {0} does not exists".format(stream_id))
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id}
+
+
+ rc, data = self.transmit("remove_stream", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams[stream_id] = None
+
+ return RC_OK
+
+ # remove all the streams
+ def remove_all_streams (self):
+ for stream_id in self.streams.keys():
+ rc = self.remove_stream(stream_id)
+ if rc.bad():
+ return rc
+
+ return RC_OK
+
+ # start traffic
+ def start (self, mul):
+ if self.state == self.STATE_DOWN:
+ return self.err("Unable to start traffic - port is down")
+
+ if self.state == self.STATE_IDLE:
+ return self.err("Unable to start traffic - no streams attached to port")
+
+ if self.state == self.STATE_TX:
+ return self.err("Unable to start traffic - port is already transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul}
+
+ rc, data = self.transmit("remove_stream", params)
+ if not rc:
+ return self.err(data)
+
+ self.state = self.STATE_TX
+
+ return RC_OK
+
+ def stop (self):
+ if (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
+ return self.err("Unable to stop traffic - port is down")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("stop_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STREAMS
+
+ return RC_OK
+
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False):
+ def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
self.verbose = False
self._conn_handler = {}
self._active_ports = set()
- self._stats = CTRexStatsManager("port", "stream")
self._system_info = None
self._server_version = None
self.__err_log = None
self._async_client = CTRexAsyncClient(async_port)
+ self.connected = False
- # ----- decorator methods ----- #
- def acquired(func):
- def wrapper_f(self, *args, **kwargs):
- # print func.__name__
- # print args
- # print kwargs
- port_ids = kwargs.get("port_id")
- # if not port_ids:
- # # print "FROM ARGS!"
- # # print args
- # port_ids = args[0]
- if isinstance(port_ids, int):
- # make sure port_ids is a list
- port_ids = [port_ids]
- bad_ids = set()
- # print "============="
- # print port_ids
- for port_id in port_ids:
- port_owned = self._conn_handler.get(port_id)
- if not port_owned:
- bad_ids.add(port_id)
- # elif active_and_owned: # stronger condition than just owned, hence gets precedence
- # if port_owned and port_id in self._active_ports:
- # continue
- # else:
- # bad_ids.add(port_id)
- else:
- continue
- if bad_ids:
- # Some port IDs are not according to desires status
- raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} aren't "
- "acquired".format(func.__name__, list(bad_ids)))
- else:
- return func(self, *args, **kwargs)
- return wrapper_f
-
- def force_status(owned=True, active_and_owned=False):
- def wrapper(func):
- def wrapper_f(self, *args, **kwargs):
- # print args
- # print kwargs
- port_ids = kwargs.get("port_id")
- if not port_ids:
- #print "FROM ARGS!"
- #print args
- port_ids = args[0]
- if isinstance(port_ids, int):
- # make sure port_ids is a list
- port_ids = [port_ids]
- bad_ids = set()
- # print "============="
- # print port_ids
- for port_id in port_ids:
- port_owned = self._conn_handler.get(port_id)
- if owned and not port_owned:
- bad_ids.add(port_id)
- elif active_and_owned: # stronger condition than just owned, hence gets precedence
- if port_owned and port_id in self._active_ports:
- continue
- else:
- bad_ids.add(port_id)
- else:
- continue
- if bad_ids:
- # Some port IDs are not according to desires status
- raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not "
- "at allowed states".format(func.__name__, list(bad_ids)))
- else:
- return func(self, *args, **kwargs)
- return wrapper_f
- return wrapper
-
- @property
- def system_info(self):
- if not self._system_info:
- rc, info = self.get_system_info()
- if rc:
- self._system_info = info
- else:
- self.__err_log = info
- return self._system_info if self._system_info else "Unknown"
+ ############# helper functions section ##############
- @property
- def server_version(self):
- if not self._server_version:
- rc, ver_info = self.get_version()
- if rc:
- self._server_version = ver_info
- else:
- self.__err_log = ver_info
- return self._server_version if self._server_version else "Unknown"
+ def __validate_port_list(self, port_id):
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # check each item of the sequence
+ return all([self._is_ports_valid(port)
+ for port in port_id])
+ elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()):
+ return True
+ else:
+ return False
+
+ def __ports (self, port_id_list):
+ if port_id_list == None:
+ return range(0, self.get_port_count())
+
+ for port_id in port_id_list:
+ if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
+ raise ValueError("bad port id {0}".format(port_id))
+
+ return [port_id_list] if isinstance(port_id_list, int) else port_id_list
+
+ ############ boot up section ################
- def is_connected(self):
- return self.comm_link.is_connected
+ # connection sequence
+ def connect (self):
- # ----- user-access methods ----- #
- def connect(self):
- rc, err = self.comm_link.connect()
+ self.connected = False
+
+ # connect
+ rc, data = self.comm_link.connect()
if not rc:
- return rc, err
- return self._init_sync()
+ return RC_ERR(data)
- def get_stats_async (self):
- return self._async_client.get_stats()
- def get_connection_port (self):
- return self.comm_link.port
+ # cache system info
+ rc, data = self.transmit("get_system_info")
+ if not rc:
+ return RC_ERR(data)
+
+ self.system_info = data
+
+ # cache supported cmds
+ rc, data = self.transmit("get_supported_cmds")
+ if not rc:
+ return RC_ERR(data)
+
+ self.supported_cmds = data
+
+ # create ports
+ self.ports = []
+ for port_id in xrange(0, self.get_port_count()):
+ self.ports.append(Port(port_id, self.user, self.transmit))
+
+ # acquire all ports
+ rc = self.acquire()
+ if rc.bad():
+ return rc
+
+ rc = self.sync_with_server()
+ if rc.bad():
+ return rc
+
+ self.connected = True
+
+ return RC_OK
+
+ def is_connected (self):
+ return self.connected
+
def disconnect(self):
- return self.comm_link.disconnect()
+ self.connected = False
+ self.comm_link.disconnect()
- def ping(self):
- return self.transmit("ping")
+
+ ########### cached queries (no server traffic) ###########
def get_supported_cmds(self):
- return self.transmit("get_supported_cmds")
+ return self.supported_cmds
def get_version(self):
- return self.transmit("get_version")
+ return self.server_version
def get_system_info(self):
- return self.transmit("get_system_info")
+ return self.system_info
def get_port_count(self):
return self.system_info.get("port_count")
@@ -177,205 +360,167 @@ class CTRexStatelessClient(object):
else:
return port_ids
- def sync_user(self, sync_streams=False):
- return self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
+ def get_stats_async (self):
+ return self._async_client.get_stats()
+
+ def get_connection_port (self):
+ return self.comm_link.port
def get_acquired_ports(self):
- return self._conn_handler.keys()
+ return [port for port in self.ports if port.is_acquired()]
+
def get_active_ports(self):
- return list(self._active_ports)
+ return [port for port in self.ports if port.is_active()]
def set_verbose(self, mode):
self.comm_link.set_verbose(mode)
self.verbose = mode
- def acquire(self, port_id, force=False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_acquire_response)
- else:
- params = {"port_id": port_id,
- "user": self.user,
- "force": force}
- command = RpcCmdData("acquire", params)
- return self._handle_acquire_response(command,
- self.transmit(command.method, command.params),
- self.default_success_test)
-
- @force_status(owned=True)
- def release(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_release_response,
- success_test=self.ack_success_test)
- else:
- self._conn_handler.pop(port_id)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("release", params)
- return self._handle_release_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
+ ############# server actions ################
- @acquired
- def add_stream(self, stream_id, stream_obj, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
+ # ping server
+ def ping(self):
+ rc, info = self.transmit("ping")
+ return RC(rc, info)
+
+
+ def sync_with_server(self, sync_streams=False):
+ rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
+ if not rc:
+ return RC_ERR(data)
+
+ for port_info in data:
+
+ rc = self.ports[port_info['port_id']].sync(port_info)
+ if rc.bad():
+ return rc
+
+ return RC_OK
+
+
+
+ ########## port commands ##############
+ # acquire ports, if port_list is none - get all
+ def acquire (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].acquire(force)
+ rc_list.add(rc)
+
+ return rc_list
+
+ # release ports
+ def release (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc, msg = self.ports[port_id].release(force)
+ rc_list.add(rc)
+
+ return rc_list
+
+
+ def add_stream(self, stream_id, stream_obj, port_id_list = None):
assert isinstance(stream_obj, CStream)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id,
- "stream": stream_obj.dump()}
- return self.transmit("add_stream", params)
- @acquired
- def add_stream_pack(self, stream_pack_list, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].add_stream(stream_id, stream_obj)
+ rc_list.add(rc)
+
+ return rc_list
+
+
+ def add_stream_pack(self, stream_pack_list, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
- # since almost every run contains more than one transaction with server, handle all as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = []
for stream_pack in stream_pack_list:
- commands.extend([RpcCmdData("add_stream", {"port_id": p_id,
- "handler": self._conn_handler.get(p_id),
- "stream_id": stream_pack.stream_id,
- "stream": stream_pack.stream}
- )
- for p_id in port_ids]
- )
- res_ok, resp_list = self.transmit_batch(commands)
- if not res_ok:
- return res_ok, resp_list
-
- return self._process_batch_result(commands, resp_list, self._handle_add_stream_response,
- success_test=self.ack_success_test)
-
- @force_status(owned=True)
- def remove_stream(self, stream_id, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id}
- return self.transmit("remove_stream", params)
+ rc = self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)
+ rc_list.add(rc)
+
+ return rc_list
- def remove_all_streams(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response,
- success_test=self.ack_success_test)
- else:
- params = {"port_id": port_id,
- "handler": self._conn_handler.get(port_id)}
- command = RpcCmdData("remove_all_streams", params)
- return self._handle_remove_streams_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
- pass
- @force_status(owned=True)#, active_and_owned=True)
- def get_all_streams(self, port_id, get_pkt = False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "get_pkt": get_pkt}
- return self.transmit("get_all_streams", params)
+ def remove_stream(self, stream_id, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].remove_stream(stream_id)
+ rc_list.add(rc)
+
+ return rc_list
+
- @force_status(owned=True)#, active_and_owned=True)
- def get_stream_id_list(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- return self.transmit("get_stream_list", params)
- @force_status(owned=True, active_and_owned=True)
+ def remove_all_streams(self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].remove_all_streams()
+ rc_list.add(rc)
+
+ return rc_list
+
+
def get_stream(self, stream_id, port_id, get_pkt = False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id,
- "get_pkt": get_pkt}
- return self.transmit("get_stream_list", params)
- def start_traffic(self, multiplier, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id),
- "port_id": p_id,
- "mul": multiplier})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response,
- success_test=self.ack_success_test)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "mul": multiplier}
- command = RpcCmdData("start_traffic", params)
- return self._handle_start_traffic_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
-
- def stop_traffic(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- if not port_ids:
- # don't invoke if port ids is empty
- return True, []
- commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response,
- success_test=self.ack_success_test)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("stop_traffic", params)
- return self._handle_stop_traffic_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
+ return self.ports[port_id].get_stream(stream_id)
+
+
+ def get_all_streams(self, port_id, get_pkt = False):
+
+ return self.ports[port_id].get_all_streams()
+
+
+ def get_stream_id_list(self, port_id):
+
+ return self.ports[port_id].get_stream_id_list()
+
+
+ def start_traffic (self, multiplier, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].start(multiplier)
+ rc_list.add(rc)
+
+ return rc_list
+
+
+
+ def stop_traffic (self, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].stop()
+ rc_list.add(rc)
+
+ return rc_list
-# def get_global_stats(self):
-# command = RpcCmdData("get_global_stats", {})
-# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
-# # return self.transmit("get_global_stats")
- @force_status(owned=True, active_and_owned=True)
def get_port_stats(self, port_id=None):
if not self._is_ports_valid(port_id):
raise ValueError("Provided illegal port id input")
@@ -393,7 +538,6 @@ class CTRexStatelessClient(object):
command = RpcCmdData("get_port_stats", params)
return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
- @force_status(owned=True, active_and_owned=True)
def get_stream_stats(self, port_id=None):
if not self._is_ports_valid(port_id):
raise ValueError("Provided illegal port id input")
@@ -411,26 +555,6 @@ class CTRexStatelessClient(object):
command = RpcCmdData("get_stream_stats", params)
return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
- # ----- internal methods ----- #
- def _init_sync(self):
- # get server version and system info
- err = False
- if self.server_version == "Unknown" or self.system_info == "Unknown":
- self.disconnect()
- return False, self.__err_log
- # sync with previous session
- res_ok, port_info = self.sync_user()
- if not res_ok:
- self.disconnect()
- return False, port_info
- else:
- # handle sync data
- for port in port_info:
- self._conn_handler[port.get("port_id")] = port.get("handler")
- if port.get("state") == "transmitting":
- # port is active
- self._active_ports.add(port.get("port_id"))
- return True, ""
def transmit(self, method_name, params={}):
@@ -439,17 +563,6 @@ class CTRexStatelessClient(object):
def transmit_batch(self, batch_list):
return self.comm_link.transmit_batch(batch_list)
- @staticmethod
- def _object_decoder(obj_type, obj_data):
- if obj_type == "global":
- return CGlobalStats(**obj_data)
- elif obj_type == "port":
- return CPortStats(**obj_data)
- elif obj_type == "stream":
- return CStreamStats(**obj_data)
- else:
- # Do not serialize the data into class
- return obj_data
@staticmethod
def default_success_test(result_obj):
@@ -474,35 +587,37 @@ class CTRexStatelessClient(object):
#
def cmd_reset (self, annotate_func):
- ports = self.get_port_ids()
# sync with the server
- rc, log = self._init_sync()
- annotate_func("Syncing with the server:", rc, log)
- if not rc:
- return False
-
+ rc = self.sync_with_server()
+ annotate_func("Syncing with the server:", rc.good(), rc.err())
+ if rc.bad():
+ return rc
# force acquire all ports
- rc, log = self.acquire(ports, force = True)
- annotate_func("Force acquiring all ports:", rc, log)
- if not rc:
- return False
+ rc = self.acquire(force = True)
+ annotate_func("Force acquiring all ports:", rc.good(), rc.err())
+ if rc.bad():
+ return rc
- # force stop
- rc, log = self.stop_traffic(ports)
- annotate_func("Stop traffic on all ports:", rc, log)
- if not rc:
- return False
+
+ # force stop all ports
+ port_id_list = self.get_active_ports()
+ rc = self.stop_traffic(port_id_list)
+ annotate_func("Stop traffic on all ports:", rc.good(), rc.err())
+ if rc.bad():
+ return rc
+
+ return
# remove all streams
- rc, log = self.remove_all_streams(ports)
- annotate_func("Removing all streams from all ports:", rc, log)
- if not rc:
- return False
+ rc = self.remove_all_streams(ports)
+ annotate_func("Removing all streams from all ports:", rc.good(), rc.err())
+ if rc.bad():
+ return rc
# TODO: clear stats
- return True
+ return RC_OK
# stop cmd
@@ -511,7 +626,7 @@ class CTRexStatelessClient(object):
# find the relveant ports
active_ports = set(self.get_active_ports()).intersection(ports)
if not active_ports:
- annotate_func("No active traffic on porivded ports")
+ annotate_func("No active traffic on porvided ports")
return True
rc, log = self.stop_traffic(active_ports)
@@ -524,7 +639,7 @@ class CTRexStatelessClient(object):
# start cmd
def cmd_start (self, ports, stream_list, mult, force, annotate_func):
- if force:
+ if (force and set(self.get_active_ports()).intersection(ports)):
rc = self.cmd_stop(ports, annotate_func)
if not rc:
return False
@@ -626,15 +741,6 @@ class CTRexStatelessClient(object):
else:
return False
- def _is_ports_valid(self, port_id):
- if isinstance(port_id, list) or isinstance(port_id, set):
- # check each item of the sequence
- return all([self._is_ports_valid(port)
- for port in port_id])
- elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()):
- return True
- else:
- return False
def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
res_ok = True
diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
index 58491aba..077c82ad 100755
--- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
+++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py
@@ -110,7 +110,7 @@ class JsonRpcClient(object):
return id, msg
- def invoke_rpc_method (self, method_name, params = {}, block = False):
+ def invoke_rpc_method (self, method_name, params = {}, block = True):
if not self.connected:
return False, "Not connected to server"
@@ -120,7 +120,7 @@ class JsonRpcClient(object):
# low level send of string message
- def send_raw_msg (self, msg, block = False):
+ def send_raw_msg (self, msg, block = True):
self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n")
if block:
@@ -248,272 +248,3 @@ class JsonRpcClient(object):
if hasattr(self, "context"):
self.context.destroy(linger=0)
-# MOVE THIS TO DAN'S FILE
-class TrexStatelessClient(JsonRpcClient):
-
- def __init__ (self, server, port, user):
-
- super(TrexStatelessClient, self).__init__(server, port)
-
- self.user = user
- self.port_handlers = {}
-
- self.supported_cmds = []
- self.system_info = None
- self.server_version = None
-
-
- def whoami (self):
- return self.user
-
- def ping_rpc_server(self):
-
- return self.invoke_rpc_method("ping", block = False)
-
- def get_rpc_server_version (self):
- return self.server_version
-
- def get_system_info (self):
- if not self.system_info:
- return {}
-
- return self.system_info
-
- def get_supported_cmds(self):
- if not self.supported_cmds:
- return {}
-
- return self.supported_cmds
-
- def get_port_count (self):
- if not self.system_info:
- return 0
-
- return self.system_info["port_count"]
-
- # sync the client with all the server required data
- def sync (self):
-
- # get server version
- rc, msg = self.invoke_rpc_method("get_version")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.server_version = msg
-
- # get supported commands
- rc, msg = self.invoke_rpc_method("get_supported_cmds")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.supported_cmds = [str(x) for x in msg if x]
-
- # get system info
- rc, msg = self.invoke_rpc_method("get_system_info")
- if not rc:
- self.disconnect()
- return rc, msg
-
- self.system_info = msg
-
- return True, ""
-
- def connect (self):
- rc, err = super(TrexStatelessClient, self).connect()
- if not rc:
- return rc, err
-
- return self.sync()
-
-
- # take ownership over ports
- def take_ownership (self, port_id_array, force = False):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- for port_id in port_id_array:
- batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
-
- def release_ports (self, port_id_array):
- batch = self.create_batch()
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("release", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- self.port_handlers.pop(port_id_array[i])
-
- return True, resp_list
-
- def get_owned_ports (self):
- return self.port_handlers.keys()
-
- # fetch port stats
- def get_port_stats (self, port_id_array):
- if not self.connected:
- return False, "Not connected to server"
-
- batch = self.create_batch()
-
- # empty list means all
- if port_id_array == []:
- port_id_array = list([x for x in xrange(0, self.system_info["port_count"])])
-
- for port_id in port_id_array:
-
- # let the server handle un-acquired errors
- if self.port_handlers.get(port_id):
- handler = self.port_handlers[port_id]
- else:
- handler = ""
-
- batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler})
-
-
- rc, resp_list = batch.invoke()
-
- return rc, resp_list
-
- # snapshot will take a snapshot of all your owned ports for streams and etc.
- def snapshot(self):
-
-
- if len(self.get_owned_ports()) == 0:
- return {}
-
- snap = {}
-
- batch = self.create_batch()
-
- for port_id in self.get_owned_ports():
-
- batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
- batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]})
-
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- # split the list to 2s
- index = 0
- for port_id in self.get_owned_ports():
- if not resp_list[index] or not resp_list[index + 1]:
- snap[port_id] = None
- continue
-
- # fetch the first two
- stats = resp_list[index][1]
- stream_list = resp_list[index + 1][1]
-
- port = {}
- port['status'] = stats['status']
- port['stream_list'] = []
-
- # get all the streams
- if len(stream_list) > 0:
- batch = self.create_batch()
- for stream_id in stream_list:
- batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]})
-
- rc, stream_resp_list = batch.invoke()
- if not rc:
- port = {}
-
- port['streams'] = {}
- for i, resp in enumerate(stream_resp_list):
- if resp[0]:
- port['streams'][stream_list[i]] = resp[1]
-
- snap[port_id] = port
-
- # move to next one
- index += 2
-
-
- return snap
-
- # add stream
- # def add_stream (self, port_id, stream_id, isg, next_stream_id, packet, vm=[]):
- # if not port_id in self.get_owned_ports():
- # return False, "Port {0} is not owned... please take ownership before adding streams".format(port_id)
- #
- # handler = self.port_handlers[port_id]
- #
- # stream = {}
- # stream['enabled'] = True
- # stream['self_start'] = True
- # stream['isg'] = isg
- # stream['next_stream_id'] = next_stream_id
- # stream['packet'] = {}
- # stream['packet']['binary'] = packet
- # stream['packet']['meta'] = ""
- # stream['vm'] = vm
- # stream['rx_stats'] = {}
- # stream['rx_stats']['enabled'] = False
- #
- # stream['mode'] = {}
- # stream['mode']['type'] = 'continuous'
- # stream['mode']['pps'] = 10.0
- #
- # params = {}
- # params['handler'] = handler
- # params['stream'] = stream
- # params['port_id'] = port_id
- # params['stream_id'] = stream_id
- #
- # print params
- # return self.invoke_rpc_method('add_stream', params = params)
-
- def add_stream(self, port_id_array, stream_pack_list):
- batch = self.create_batch()
-
- for port_id in port_id_array:
- for stream_pack in stream_pack_list:
- params = {"port_id": port_id,
- "handler": self.port_handlers[port_id],
- "stream_id": stream_pack.stream_id,
- "stream": stream_pack.stream}
- batch.add("add_stream", params=params)
- rc, resp_list = batch.invoke()
- if not rc:
- return rc, resp_list
-
- for i, rc in enumerate(resp_list):
- if rc[0]:
- print "Stream {0} - {1}".format(i, rc[1])
- # self.port_handlers[port_id_array[i]] = rc[1]
-
- return True, resp_list
-
- # return self.invoke_rpc_method('add_stream', params = params)
-
-if __name__ == "__main__":
- pass \ No newline at end of file
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py
index 06ae762a..2be643ab 100755
--- a/scripts/automation/trex_control_plane/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/console/trex_console.py
@@ -160,7 +160,51 @@ class TRexConsole(cmd.Cmd):
for x in os.listdir(path)
if x.startswith(start_string)]
+ # annotation method
+ @staticmethod
+ def annotate (desc, rc = None, err_log = None, ext_err_msg = None):
+ print format_text('\n{:<40}'.format(desc), 'bold'),
+ if rc == None:
+ print "\n"
+ return
+
+ if rc == False:
+ # do we have a complex log object ?
+ if isinstance(err_log, list):
+ print ""
+ for func in err_log:
+ if func:
+ print func
+ print ""
+
+ elif isinstance(err_log, str):
+ print "\n" + err_log + "\n"
+
+ print format_text("[FAILED]\n", 'red', 'bold')
+ if ext_err_msg:
+ print format_text(ext_err_msg + "\n", 'blue', 'bold')
+
+ return False
+
+ else:
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ return True
+
+
####################### shell commands #######################
+ def do_ping (self, line):
+ '''Ping the server\n'''
+
+ rc = self.stateless_client.ping()
+ if rc.good():
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+ else:
+ print "\n*** " + rc.err() + "\n"
+ print format_text("[FAILED]\n", 'red', 'bold')
+ return
+
+ def do_test (self, line):
+ print self.stateless_client.get_acquired_ports()
# set verbose on / off
def do_verbose(self, line):
@@ -171,65 +215,41 @@ class TRexConsole(cmd.Cmd):
elif line == "on":
self.verbose = True
self.stateless_client.set_verbose(True)
- print green("\nverbose set to on\n")
+ print format_text("\nverbose set to on\n", 'green', 'bold')
elif line == "off":
self.verbose = False
self.stateless_client.set_verbose(False)
- print green("\nverbose set to off\n")
+ print format_text("\nverbose set to off\n", 'green', 'bold')
else:
- print magenta("\nplease specify 'on' or 'off'\n")
+ print format_text("\nplease specify 'on' or 'off'\n", 'bold')
+
############### connect
def do_connect (self, line):
'''Connects to the server\n'''
- res_ok, msg = self.stateless_client.connect()
- if res_ok:
+ rc = self.stateless_client.connect()
+ if rc.good():
print format_text("[SUCCESS]\n", 'green', 'bold')
else:
- print "\n*** " + msg + "\n"
+ print "\n*** " + rc.err() + "\n"
print format_text("[FAILED]\n", 'red', 'bold')
return
- self.supported_rpc = self.stateless_client.get_supported_cmds().data
- if self.acquire_all_ports:
- res_ok, log = self.stateless_client.acquire(self.stateless_client.get_port_ids())
- if not res_ok:
- print "\n*** Failed to acquire all ports... exiting..."""
+ def do_disconnect (self, line):
+ '''Disconnect from the server\n'''
- @staticmethod
- def annotate (desc, rc = None, err_log = None, ext_err_msg = None):
- print format_text('\n{:<40}'.format(desc), 'bold'),
- if rc == None:
- print "\n"
+ if not self.stateless_client.is_connected():
+ print "Not connected to server\n"
return
- if rc == False:
- # do we have a complex log object ?
- if isinstance(err_log, list):
- print ""
- for func in err_log:
- if func:
- print func
- print ""
-
- elif isinstance(err_log, str):
- print "\n" + err_log + "\n"
-
- print format_text("[FAILED]\n", 'red', 'bold')
- if ext_err_msg:
- print format_text(ext_err_msg + "\n", 'blue', 'bold')
-
- return False
-
- else:
- print format_text("[SUCCESS]\n", 'green', 'bold')
- return True
-
+ self.stateless_client.disconnect()
+ print format_text("[SUCCESS]\n", 'green', 'bold')
+
############### start
def complete_start(self, text, line, begidx, endidx):