summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client/trex_stateless_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client/trex_stateless_client.py')
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_stateless_client.py804
1 files changed, 455 insertions, 349 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 168853b3..5a7b1873 100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -24,148 +24,331 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg']
msg=self.msg,
stat="success" if self.success else "fail")
-# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])
+# simple class to represent complex return value
+class RC:
+
+ def __init__ (self, rc, data):
+ self.rc = rc
+ self.data = data
+
+ def good (self):
+ return self.rc
+
+ def bad (self):
+ return not self.rc
+
+ def data (self):
+ if self.good():
+ return self.data
+ else:
+ return ""
+
+ def err (self):
+ if self.bad():
+ return self.data
+ else:
+ return ""
+
+RC_OK = RC(True, "")
+def RC_ERR (err):
+ return RC(False, err)
+
+class RC_LIST:
+ def __init__ (self):
+ self.rc_list = []
+
+ def add (self, rc):
+ self.rc_list.append(rc)
+
+ def good(self):
+ return all([x.good() for x in self.rc_list])
+
+ def bad (self):
+ not self.good()
+
+ def data (self):
+ return [x.data() for x in self.rc_list]
+
+ def err (self):
+ return [x.err() for x in self.rc_list]
+
+
+# describes a single port
+class Port:
+
+ STATE_DOWN = 0
+ STATE_IDLE = 1
+ STATE_STREAMS = 2
+ STATE_TX = 3
+ STATE_PAUSE = 4
+
+ def __init__ (self, port_id, user, transmit):
+ self.port_id = port_id
+ self.state = self.STATE_IDLE
+ self.handler = None
+ self.transmit = transmit
+ self.user = user
+
+ self.streams = {}
+
+ def err (self, msg):
+ return RC_ERR("port {0} : {1}".format(self.port_id, msg))
+
+ # take the port
+ def acquire (self, force = False):
+ params = {"port_id": self.port_id,
+ "user": self.user,
+ "force": force}
+
+ command = RpcCmdData("acquire", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return RC_OK
+ else:
+ return RC_ERR(rc.data)
+
+
+ # release the port
+ def release (self):
+ params = {"port_id": self.port_id,
+ "handler": self.handler}
+
+ command = RpcCmdData("release", params)
+ rc = self.transmit(command.method, command.params)
+ if rc.success:
+ self.handler = rc.data
+ return RC_OK
+ else:
+ return RC_ERR(rc.data)
+
+ def is_acquired (self):
+ return (self.handler != None)
+
+ def is_active (self):
+ return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE)
+
+ def sync (self, sync_data):
+
+ self.handler = sync_data['handler']
+
+ if sync_data['state'] == "DOWN":
+ self.state = self.STATE_DOWN
+ elif sync_data['state'] == "IDLE":
+ self.state = self.STATE_IDLE
+ elif sync_data['state'] == "STREAMS":
+ self.state = self.STATE_STREAMS
+ elif sync_data['state'] == "TX":
+ self.state = self.STATE_TX
+ elif sync_data['state'] == "PAUSE":
+ self.state = self.STATE_PAUSE
+ else:
+ raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state']))
+
+ return RC_OK
+
+
+ # return TRUE if write commands
+ def is_port_writeable (self):
+ # operations on port can be done on state idle or state sreams
+ return ((self.state == STATE_IDLE) or (self.state == STATE_STREAMS))
+
+ # add stream to the port
+ def add_stream (self, stream_id, stream_obj):
+
+ if not self.is_port_writeable():
+ return self.err("Please stop port before attempting to add streams")
+
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id,
+ "stream": stream_obj.dump()}
+
+ rc, data = self.transmit("add_stream", params)
+ if not rc:
+ return self.err(data)
+
+ # add the stream
+ self.streams[stream_id] = stream_obj
+
+ # the only valid state now
+ self.state = self.STATE_STREAMS
+
+ return RC_OK
+
+ # remove stream from port
+ def remove_stream (self, stream_id):
+
+ if not stream_id in self.streams:
+ return self.err("stream {0} does not exists".format(stream_id))
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "stream_id": stream_id}
+
+
+ rc, data = self.transmit("remove_stream", params)
+ if not rc:
+ return self.err(data)
+
+ self.streams[stream_id] = None
+
+ return RC_OK
+
+ # remove all the streams
+ def remove_all_streams (self):
+ for stream_id in self.streams.keys():
+ rc = self.remove_stream(stream_id)
+ if rc.bad():
+ return rc
+
+ return RC_OK
+
+ # start traffic
+ def start (self, mul):
+ if self.state == self.STATE_DOWN:
+ return self.err("Unable to start traffic - port is down")
+
+ if self.state == self.STATE_IDLE:
+ return self.err("Unable to start traffic - no streams attached to port")
+
+ if self.state == self.STATE_TX:
+ return self.err("Unable to start traffic - port is already transmitting")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id,
+ "mul": mul}
+
+ rc, data = self.transmit("remove_stream", params)
+ if not rc:
+ return self.err(data)
+
+ self.state = self.STATE_TX
+
+ return RC_OK
+
+ def stop (self):
+ if (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE):
+ return self.err("Unable to stop traffic - port is down")
+
+ params = {"handler": self.handler,
+ "port_id": self.port_id}
+
+ rc, data = self.transmit("stop_traffic", params)
+ if not rc:
+ return self.err(data)
+
+ # only valid state after stop
+ self.state = self.STREAMS
+
+ return RC_OK
+
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False):
+ def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False):
super(CTRexStatelessClient, self).__init__()
self.user = username
self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual)
self.verbose = False
self._conn_handler = {}
self._active_ports = set()
- self._stats = CTRexStatsManager("port", "stream")
self._system_info = None
self._server_version = None
self.__err_log = None
self._async_client = CTRexAsyncClient(async_port)
+ self.connected = False
- # ----- decorator methods ----- #
- def acquired(func):
- def wrapper_f(self, *args, **kwargs):
- # print func.__name__
- # print args
- # print kwargs
- port_ids = kwargs.get("port_id")
- # if not port_ids:
- # # print "FROM ARGS!"
- # # print args
- # port_ids = args[0]
- if isinstance(port_ids, int):
- # make sure port_ids is a list
- port_ids = [port_ids]
- bad_ids = set()
- # print "============="
- # print port_ids
- for port_id in port_ids:
- port_owned = self._conn_handler.get(port_id)
- if not port_owned:
- bad_ids.add(port_id)
- # elif active_and_owned: # stronger condition than just owned, hence gets precedence
- # if port_owned and port_id in self._active_ports:
- # continue
- # else:
- # bad_ids.add(port_id)
- else:
- continue
- if bad_ids:
- # Some port IDs are not according to desires status
- raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} aren't "
- "acquired".format(func.__name__, list(bad_ids)))
- else:
- return func(self, *args, **kwargs)
- return wrapper_f
-
- def force_status(owned=True, active_and_owned=False):
- def wrapper(func):
- def wrapper_f(self, *args, **kwargs):
- # print args
- # print kwargs
- port_ids = kwargs.get("port_id")
- if not port_ids:
- #print "FROM ARGS!"
- #print args
- port_ids = args[0]
- if isinstance(port_ids, int):
- # make sure port_ids is a list
- port_ids = [port_ids]
- bad_ids = set()
- # print "============="
- # print port_ids
- for port_id in port_ids:
- port_owned = self._conn_handler.get(port_id)
- if owned and not port_owned:
- bad_ids.add(port_id)
- elif active_and_owned: # stronger condition than just owned, hence gets precedence
- if port_owned and port_id in self._active_ports:
- continue
- else:
- bad_ids.add(port_id)
- else:
- continue
- if bad_ids:
- # Some port IDs are not according to desires status
- raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not "
- "at allowed states".format(func.__name__, list(bad_ids)))
- else:
- return func(self, *args, **kwargs)
- return wrapper_f
- return wrapper
-
- @property
- def system_info(self):
- if not self._system_info:
- rc, info = self.get_system_info()
- if rc:
- self._system_info = info
- else:
- self.__err_log = info
- return self._system_info if self._system_info else "Unknown"
+ ############# helper functions section ##############
- @property
- def server_version(self):
- if not self._server_version:
- rc, ver_info = self.get_version()
- if rc:
- self._server_version = ver_info
- else:
- self.__err_log = ver_info
- return self._server_version if self._server_version else "Unknown"
+ def __validate_port_list(self, port_id):
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # check each item of the sequence
+ return all([self._is_ports_valid(port)
+ for port in port_id])
+ elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()):
+ return True
+ else:
+ return False
+
+ def __ports (self, port_id_list):
+ if port_id_list == None:
+ return range(0, self.get_port_count())
+
+ for port_id in port_id_list:
+ if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()):
+ raise ValueError("bad port id {0}".format(port_id))
+
+ return [port_id_list] if isinstance(port_id_list, int) else port_id_list
+
+ ############ boot up section ################
- def is_connected(self):
- return self.comm_link.is_connected
+ # connection sequence
+ def connect (self):
- # ----- user-access methods ----- #
- def connect(self):
- rc, err = self.comm_link.connect()
+ self.connected = False
+
+ # connect
+ rc, data = self.comm_link.connect()
if not rc:
- return rc, err
- return self._init_sync()
+ return RC_ERR(data)
- def get_stats_async (self):
- return self._async_client.get_stats()
- def get_connection_port (self):
- return self.comm_link.port
+ # cache system info
+ rc, data = self.transmit("get_system_info")
+ if not rc:
+ return RC_ERR(data)
+
+ self.system_info = data
+
+ # cache supported cmds
+ rc, data = self.transmit("get_supported_cmds")
+ if not rc:
+ return RC_ERR(data)
+
+ self.supported_cmds = data
+
+ # create ports
+ self.ports = []
+ for port_id in xrange(0, self.get_port_count()):
+ self.ports.append(Port(port_id, self.user, self.transmit))
+
+ # acquire all ports
+ rc = self.acquire()
+ if rc.bad():
+ return rc
+
+ rc = self.sync_with_server()
+ if rc.bad():
+ return rc
+
+ self.connected = True
+
+ return RC_OK
+
+ def is_connected (self):
+ return self.connected
+
def disconnect(self):
- return self.comm_link.disconnect()
+ self.connected = False
+ self.comm_link.disconnect()
- def ping(self):
- return self.transmit("ping")
+
+ ########### cached queries (no server traffic) ###########
def get_supported_cmds(self):
- return self.transmit("get_supported_cmds")
+ return self.supported_cmds
def get_version(self):
- return self.transmit("get_version")
+ return self.server_version
def get_system_info(self):
- return self.transmit("get_system_info")
+ return self.system_info
def get_port_count(self):
return self.system_info.get("port_count")
@@ -177,205 +360,167 @@ class CTRexStatelessClient(object):
else:
return port_ids
- def sync_user(self, sync_streams=False):
- return self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
+ def get_stats_async (self):
+ return self._async_client.get_stats()
+
+ def get_connection_port (self):
+ return self.comm_link.port
def get_acquired_ports(self):
- return self._conn_handler.keys()
+ return [port for port in self.ports if port.is_acquired()]
+
def get_active_ports(self):
- return list(self._active_ports)
+ return [port for port in self.ports if port.is_active()]
def set_verbose(self, mode):
self.comm_link.set_verbose(mode)
self.verbose = mode
- def acquire(self, port_id, force=False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_acquire_response)
- else:
- params = {"port_id": port_id,
- "user": self.user,
- "force": force}
- command = RpcCmdData("acquire", params)
- return self._handle_acquire_response(command,
- self.transmit(command.method, command.params),
- self.default_success_test)
-
- @force_status(owned=True)
- def release(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_release_response,
- success_test=self.ack_success_test)
- else:
- self._conn_handler.pop(port_id)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("release", params)
- return self._handle_release_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
+ ############# server actions ################
- @acquired
- def add_stream(self, stream_id, stream_obj, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
+ # ping server
+ def ping(self):
+ rc, info = self.transmit("ping")
+ return RC(rc, info)
+
+
+ def sync_with_server(self, sync_streams=False):
+ rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams})
+ if not rc:
+ return RC_ERR(data)
+
+ for port_info in data:
+
+ rc = self.ports[port_info['port_id']].sync(port_info)
+ if rc.bad():
+ return rc
+
+ return RC_OK
+
+
+
+ ########## port commands ##############
+ # acquire ports, if port_list is none - get all
+ def acquire (self, port_id_list = None, force = False):
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].acquire(force)
+ rc_list.add(rc)
+
+ return rc_list
+
+ # release ports
+ def release (self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc, msg = self.ports[port_id].release(force)
+ rc_list.add(rc)
+
+ return rc_list
+
+
+ def add_stream(self, stream_id, stream_obj, port_id_list = None):
assert isinstance(stream_obj, CStream)
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id,
- "stream": stream_obj.dump()}
- return self.transmit("add_stream", params)
- @acquired
- def add_stream_pack(self, stream_pack_list, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].add_stream(stream_id, stream_obj)
+ rc_list.add(rc)
+
+ return rc_list
+
+
+ def add_stream_pack(self, stream_pack_list, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
- # since almost every run contains more than one transaction with server, handle all as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = []
for stream_pack in stream_pack_list:
- commands.extend([RpcCmdData("add_stream", {"port_id": p_id,
- "handler": self._conn_handler.get(p_id),
- "stream_id": stream_pack.stream_id,
- "stream": stream_pack.stream}
- )
- for p_id in port_ids]
- )
- res_ok, resp_list = self.transmit_batch(commands)
- if not res_ok:
- return res_ok, resp_list
-
- return self._process_batch_result(commands, resp_list, self._handle_add_stream_response,
- success_test=self.ack_success_test)
-
- @force_status(owned=True)
- def remove_stream(self, stream_id, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id}
- return self.transmit("remove_stream", params)
+ rc = self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)
+ rc_list.add(rc)
+
+ return rc_list
- def remove_all_streams(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response,
- success_test=self.ack_success_test)
- else:
- params = {"port_id": port_id,
- "handler": self._conn_handler.get(port_id)}
- command = RpcCmdData("remove_all_streams", params)
- return self._handle_remove_streams_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
- pass
- @force_status(owned=True)#, active_and_owned=True)
- def get_all_streams(self, port_id, get_pkt = False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "get_pkt": get_pkt}
- return self.transmit("get_all_streams", params)
+ def remove_stream(self, stream_id, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].remove_stream(stream_id)
+ rc_list.add(rc)
+
+ return rc_list
+
- @force_status(owned=True)#, active_and_owned=True)
- def get_stream_id_list(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- return self.transmit("get_stream_list", params)
- @force_status(owned=True, active_and_owned=True)
+ def remove_all_streams(self, port_id_list = None):
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].remove_all_streams()
+ rc_list.add(rc)
+
+ return rc_list
+
+
def get_stream(self, stream_id, port_id, get_pkt = False):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "stream_id": stream_id,
- "get_pkt": get_pkt}
- return self.transmit("get_stream_list", params)
- def start_traffic(self, multiplier, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id),
- "port_id": p_id,
- "mul": multiplier})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response,
- success_test=self.ack_success_test)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id,
- "mul": multiplier}
- command = RpcCmdData("start_traffic", params)
- return self._handle_start_traffic_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
-
- def stop_traffic(self, port_id=None):
- if not self._is_ports_valid(port_id):
- raise ValueError("Provided illegal port id input")
- if isinstance(port_id, list) or isinstance(port_id, set):
- # handle as batch mode
- port_ids = set(port_id) # convert to set to avoid duplications
- if not port_ids:
- # don't invoke if port ids is empty
- return True, []
- commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
- for p_id in port_ids]
- rc, resp_list = self.transmit_batch(commands)
- if rc:
- return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response,
- success_test=self.ack_success_test)
- else:
- params = {"handler": self._conn_handler.get(port_id),
- "port_id": port_id}
- command = RpcCmdData("stop_traffic", params)
- return self._handle_stop_traffic_response(command,
- self.transmit(command.method, command.params),
- self.ack_success_test)
+ return self.ports[port_id].get_stream(stream_id)
+
+
+ def get_all_streams(self, port_id, get_pkt = False):
+
+ return self.ports[port_id].get_all_streams()
+
+
+ def get_stream_id_list(self, port_id):
+
+ return self.ports[port_id].get_stream_id_list()
+
+
+ def start_traffic (self, multiplier, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].start(multiplier)
+ rc_list.add(rc)
+
+ return rc_list
+
+
+
+ def stop_traffic (self, port_id_list = None):
+
+ port_id_list = self.__ports(port_id_list)
+
+ rc_list = RC_LIST()
+
+ for port_id in port_id_list:
+ rc = self.ports[port_id].stop()
+ rc_list.add(rc)
+
+ return rc_list
-# def get_global_stats(self):
-# command = RpcCmdData("get_global_stats", {})
-# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
-# # return self.transmit("get_global_stats")
- @force_status(owned=True, active_and_owned=True)
def get_port_stats(self, port_id=None):
if not self._is_ports_valid(port_id):
raise ValueError("Provided illegal port id input")
@@ -393,7 +538,6 @@ class CTRexStatelessClient(object):
command = RpcCmdData("get_port_stats", params)
return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
- @force_status(owned=True, active_and_owned=True)
def get_stream_stats(self, port_id=None):
if not self._is_ports_valid(port_id):
raise ValueError("Provided illegal port id input")
@@ -411,26 +555,6 @@ class CTRexStatelessClient(object):
command = RpcCmdData("get_stream_stats", params)
return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
- # ----- internal methods ----- #
- def _init_sync(self):
- # get server version and system info
- err = False
- if self.server_version == "Unknown" or self.system_info == "Unknown":
- self.disconnect()
- return False, self.__err_log
- # sync with previous session
- res_ok, port_info = self.sync_user()
- if not res_ok:
- self.disconnect()
- return False, port_info
- else:
- # handle sync data
- for port in port_info:
- self._conn_handler[port.get("port_id")] = port.get("handler")
- if port.get("state") == "transmitting":
- # port is active
- self._active_ports.add(port.get("port_id"))
- return True, ""
def transmit(self, method_name, params={}):
@@ -439,17 +563,6 @@ class CTRexStatelessClient(object):
def transmit_batch(self, batch_list):
return self.comm_link.transmit_batch(batch_list)
- @staticmethod
- def _object_decoder(obj_type, obj_data):
- if obj_type == "global":
- return CGlobalStats(**obj_data)
- elif obj_type == "port":
- return CPortStats(**obj_data)
- elif obj_type == "stream":
- return CStreamStats(**obj_data)
- else:
- # Do not serialize the data into class
- return obj_data
@staticmethod
def default_success_test(result_obj):
@@ -474,35 +587,37 @@ class CTRexStatelessClient(object):
#
def cmd_reset (self, annotate_func):
- ports = self.get_port_ids()
# sync with the server
- rc, log = self._init_sync()
- annotate_func("Syncing with the server:", rc, log)
- if not rc:
- return False
-
+ rc = self.sync_with_server()
+ annotate_func("Syncing with the server:", rc.good(), rc.err())
+ if rc.bad():
+ return rc
# force acquire all ports
- rc, log = self.acquire(ports, force = True)
- annotate_func("Force acquiring all ports:", rc, log)
- if not rc:
- return False
+ rc = self.acquire(force = True)
+ annotate_func("Force acquiring all ports:", rc.good(), rc.err())
+ if rc.bad():
+ return rc
- # force stop
- rc, log = self.stop_traffic(ports)
- annotate_func("Stop traffic on all ports:", rc, log)
- if not rc:
- return False
+
+ # force stop all ports
+ port_id_list = self.get_active_ports()
+ rc = self.stop_traffic(port_id_list)
+ annotate_func("Stop traffic on all ports:", rc.good(), rc.err())
+ if rc.bad():
+ return rc
+
+ return
# remove all streams
- rc, log = self.remove_all_streams(ports)
- annotate_func("Removing all streams from all ports:", rc, log)
- if not rc:
- return False
+ rc = self.remove_all_streams(ports)
+ annotate_func("Removing all streams from all ports:", rc.good(), rc.err())
+ if rc.bad():
+ return rc
# TODO: clear stats
- return True
+ return RC_OK
# stop cmd
@@ -511,7 +626,7 @@ class CTRexStatelessClient(object):
# find the relveant ports
active_ports = set(self.get_active_ports()).intersection(ports)
if not active_ports:
- annotate_func("No active traffic on porivded ports")
+ annotate_func("No active traffic on porvided ports")
return True
rc, log = self.stop_traffic(active_ports)
@@ -524,7 +639,7 @@ class CTRexStatelessClient(object):
# start cmd
def cmd_start (self, ports, stream_list, mult, force, annotate_func):
- if force:
+ if (force and set(self.get_active_ports()).intersection(ports)):
rc = self.cmd_stop(ports, annotate_func)
if not rc:
return False
@@ -626,15 +741,6 @@ class CTRexStatelessClient(object):
else:
return False
- def _is_ports_valid(self, port_id):
- if isinstance(port_id, list) or isinstance(port_id, set):
- # check each item of the sequence
- return all([self._is_ports_valid(port)
- for port in port_id])
- elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()):
- return True
- else:
- return False
def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
res_ok = True