diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
4 files changed, 525 insertions, 82 deletions
diff --git a/scripts/automation/trex_control_plane/client/outer_packages.py b/scripts/automation/trex_control_plane/client/outer_packages.py index 5facad20..206d4b4c 100755 --- a/scripts/automation/trex_control_plane/client/outer_packages.py +++ b/scripts/automation/trex_control_plane/client/outer_packages.py @@ -1,7 +1,6 @@ #!/router/bin/python
import sys
-import site
import os
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
@@ -25,6 +24,6 @@ def import_module_list(modules_list): for p in modules_list:
full_path = os.path.join(PATH_TO_PYTHON_LIB, p)
fix_path = os.path.normcase(full_path) # (CURRENT_PATH+p)
- site.addsitedir(full_path)
+ sys.path.insert(1, full_path)
import_client_modules()
diff --git a/scripts/automation/trex_control_plane/client/trex_adv_client.py b/scripts/automation/trex_control_plane/client/trex_adv_client.py index b3fe3dad..bf7ccf58 100755 --- a/scripts/automation/trex_control_plane/client/trex_adv_client.py +++ b/scripts/automation/trex_control_plane/client/trex_adv_client.py @@ -8,7 +8,7 @@ class CTRexAdvClient(trex_client.CTRexClient): super(CTRexAdvClient, self).__init__(trex_host, max_history_size, trex_daemon_port, trex_zmq_port, verbose) pass - # T-REX KIWI advanced methods + # TRex KIWI advanced methods def start_quick_trex(self, pcap_file, d, delay, dual, ipv6, times, interfaces): try: return self.server.start_quick_trex(pcap_file = pcap_file, duration = d, dual = dual, delay = delay, ipv6 = ipv6, times = times, interfaces = interfaces) diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py index 56775766..c3677132 100755 --- a/scripts/automation/trex_control_plane/client/trex_client.py +++ b/scripts/automation/trex_control_plane/client/trex_client.py @@ -30,18 +30,18 @@ from distutils.util import strtobool class CTRexClient(object): """ - This class defines the client side of the RESTfull interaction with T-Rex + This class defines the client side of the RESTfull interaction with TRex """ def __init__(self, trex_host, max_history_size = 100, trex_daemon_port = 8090, trex_zmq_port = 4500, verbose = False): """ - Instantiate a T-Rex client object, and connecting it to listening daemon-server + Instantiate a TRex client object, and connecting it to listening daemon-server :parameters: trex_host : str - a string of the t-rex ip address or hostname. + a string of the TRex ip address or hostname. max_history_size : int - a number to set the maximum history size of a single T-Rex run. Each sampling adds a new item to history. + a number to set the maximum history size of a single TRex run. Each sampling adds a new item to history. default value : **100** trex_daemon_port : int @@ -69,7 +69,7 @@ class CTRexClient(object): self.result_obj = CTRexResult(max_history_size) self.decoder = JSONDecoder() self.trex_server_path = "http://{hostname}:{port}/".format( hostname = trex_host, port = trex_daemon_port ) - self.__verbose_print("Connecting to T-Rex @ {trex_path} ...".format( trex_path = self.trex_server_path ) ) + self.__verbose_print("Connecting to TRex @ {trex_path} ...".format( trex_path = self.trex_server_path ) ) self.history = jsonrpclib.history.History() self.server = jsonrpclib.Server(self.trex_server_path, history = self.history) self.check_server_connectivity() @@ -90,7 +90,7 @@ class CTRexClient(object): def start_trex (self, f, d, block_to_success = True, timeout = 30, user = None, **trex_cmd_options): """ - Request to start a T-Rex run on server. + Request to start a TRex run on server. :parameters: f : str @@ -98,17 +98,17 @@ class CTRexClient(object): d : int the desired duration of the test. must be at least 30 seconds long. block_to_success : bool - determine if this method blocks until T-Rex changes state from 'Starting' to either 'Idle' or 'Running' + determine if this method blocks until TRex changes state from 'Starting' to either 'Idle' or 'Running' default value : **True** timeout : int - maximum time (in seconds) to wait in blocking state until T-Rex changes state from 'Starting' to either 'Idle' or 'Running' + maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running' default value: **30** user : str the identity of the the run issuer. trex_cmd_options : key, val - sets desired T-Rex options using key=val syntax, separated by comma. + sets desired TRex options using key=val syntax, separated by comma. for keys with no value, state key=True :return: @@ -117,8 +117,8 @@ class CTRexClient(object): :raises: + :exc:`ValueError`, in case 'd' parameter inserted with wrong value. + :exc:`trex_exceptions.TRexError`, in case one of the trex_cmd_options raised an exception at server. - + :exc:`trex_exceptions.TRexInUseError`, in case T-Rex is already taken. - + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying start T-Rex. + + :exc:`trex_exceptions.TRexInUseError`, in case TRex is already taken. + + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying start TRex. + ProtocolError, in case of error in JSON-RPC protocol. """ @@ -128,7 +128,7 @@ class CTRexClient(object): if d < 30: # specify a test should take at least 30 seconds long. raise ValueError except ValueError: - raise ValueError('d parameter must be integer, specifying how long T-Rex run, and must be larger than 30 secs.') + raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.') trex_cmd_options.update( {'f' : f, 'd' : d} ) @@ -146,25 +146,25 @@ class CTRexClient(object): if retval!=0: self.seq = retval # update seq num only on successful submission return True - else: # T-Rex is has been started by another user - raise TRexInUseError('T-Rex is already being used by another user or process. Try again once T-Rex is back in IDLE state.') + else: # TRex is has been started by another user + raise TRexInUseError('TRex is already being used by another user or process. Try again once TRex is back in IDLE state.') def stop_trex (self): """ - Request to stop a T-Rex run on server. + Request to stop a TRex run on server. - The request is only valid if the stop initiator is the same client as the T-Rex run initiator. + The request is only valid if the stop initiator is the same client as the TRex run initiator. :parameters: None :return: + **True** on successful termination - + **False** if request issued but T-Rex wasn't running. + + **False** if request issued but TRex wasn't running. :raises: - + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex ir running but started by another user. - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex ir running but started by another user. + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + ProtocolError, in case of error in JSON-RPC protocol. """ @@ -179,16 +179,16 @@ class CTRexClient(object): def force_kill (self, confirm = True): """ - Force killing of running T-Rex process (if exists) on the server. + Force killing of running TRex process (if exists) on the server. .. tip:: This method is a safety method and **overrides any running or reserved resources**, and as such isn't designed to be used on a regular basis. Always consider using :func:`trex_client.CTRexClient.stop_trex` instead. - In the end of this method, T-Rex will return to IDLE state with no reservation. + In the end of this method, TRex will return to IDLE state with no reservation. :parameters: confirm : bool - Prompt a user confirmation before continue terminating T-Rex session + Prompt a user confirmation before continue terminating TRex session :return: + **True** on successful termination @@ -199,7 +199,7 @@ class CTRexClient(object): """ if confirm: - prompt = "WARNING: This will terminate active T-Rex session indiscriminately.\nAre you sure? " + prompt = "WARNING: This will terminate active TRex session indiscriminately.\nAre you sure? " sys.stdout.write('%s [y/n]\n' % prompt) while True: try: @@ -221,20 +221,20 @@ class CTRexClient(object): def wait_until_kickoff_finish(self, timeout = 40): """ - Block the client application until T-Rex changes state from 'Starting' to either 'Idle' or 'Running' + Block the client application until TRex changes state from 'Starting' to either 'Idle' or 'Running' - The request is only valid if the stop initiator is the same client as the T-Rex run initiator. + The request is only valid if the stop initiator is the same client as the TRex run initiator. :parameters: timeout : int - maximum time (in seconds) to wait in blocking state until T-Rex changes state from 'Starting' to either 'Idle' or 'Running' + maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running' :return: + **True** on successful termination - + **False** if request issued but T-Rex wasn't running. + + **False** if request issued but TRex wasn't running. :raises: - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + ProtocolError, in case of error in JSON-RPC protocol. .. note:: Exceptions are throws only when start_trex did not block in the first place, i.e. `block_to_success` parameter was set to `False` @@ -252,22 +252,22 @@ class CTRexClient(object): def is_running (self, dump_out = False): """ - Poll for T-Rex running status. + Poll for TRex running status. - If T-Rex is running, a history item will be added into result_obj and processed. + If TRex is running, a history item will be added into result_obj and processed. - .. tip:: This method is especially useful for iterating until T-Rex run is finished. + .. tip:: This method is especially useful for iterating until TRex run is finished. :parameters: dump_out : dict if passed, the pointer object is cleared and the latest dump stored in it. :return: - + **True** if T-Rex is running. - + **False** if T-Rex is not running. + + **True** if TRex is running. + + **False** if TRex is not running. :raises: - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + :exc:`TypeError`, in case JSON stream decoding error. + ProtocolError, in case of error in JSON-RPC protocol. @@ -292,7 +292,7 @@ class CTRexClient(object): def get_trex_files_path (self): """ - Fetches the local path in which files are stored when pushed to t-rex server from client. + Fetches the local path in which files are stored when pushed to TRex server from client. :parameters: None @@ -300,7 +300,7 @@ class CTRexClient(object): :return: string representation of the desired path - .. note:: The returned path represents a path on the T-Rex server **local machine** + .. note:: The returned path represents a path on the TRex server **local machine** :raises: ProtocolError, in case of error in JSON-RPC protocol. @@ -317,7 +317,7 @@ class CTRexClient(object): def get_running_status (self): """ - Fetches the current T-Rex status. + Fetches the current TRex status. If available, a verbose data will accompany the state itself. @@ -344,18 +344,18 @@ class CTRexClient(object): def get_running_info (self): """ - Performs single poll of T-Rex running data and process it into the result object (named `result_obj`). + Performs single poll of TRex running data and process it into the result object (named `result_obj`). - .. tip:: This method will throw an exception if T-Rex isn't running. Always consider using :func:`trex_client.CTRexClient.is_running` which handles a single poll operation in safer manner. + .. tip:: This method will throw an exception if TRex isn't running. Always consider using :func:`trex_client.CTRexClient.is_running` which handles a single poll operation in safer manner. :parameters: None :return: - dictionary containing the most updated data dump from T-Rex. + dictionary containing the most updated data dump from TRex. :raises: - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + :exc:`TypeError`, in case JSON stream decoding error. + ProtocolError, in case of error in JSON-RPC protocol. @@ -379,7 +379,7 @@ class CTRexClient(object): def sample_until_condition (self, condition_func, time_between_samples = 5): """ - Automatically sets ongoing sampling of T-Rex data, with sampling rate described by time_between_samples. + Automatically sets ongoing sampling of TRex data, with sampling rate described by time_between_samples. On each fetched dump, the condition_func is applied on the result objects, and if returns True, the sampling will stop. @@ -394,36 +394,36 @@ class CTRexClient(object): default value : **5** :return: - the first result object (see :class:`CTRexResult` for further details) of the T-Rex run on which the condition has been met. + the first result object (see :class:`CTRexResult` for further details) of the TRex run on which the condition has been met. :raises: + :exc:`UserWarning`, in case the condition_func method condition hasn't been met - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + :exc:`TypeError`, in case JSON stream decoding error. + ProtocolError, in case of error in JSON-RPC protocol. + :exc:`Exception`, in case the condition_func suffered from any kind of exception """ - # make sure T-Rex is running. raise exceptions here if any + # make sure TRex is running. raise exceptions here if any self.wait_until_kickoff_finish() try: while self.is_running(): results = self.get_result_obj() if condition_func(results): - # if condition satisfied, stop T-Rex and return result object + # if condition satisfied, stop TRex and return result object self.stop_trex() return results time.sleep(time_between_samples) except TRexWarning: # means we're back to Idle state, and didn't meet our condition - raise UserWarning("T-Rex results condition wasn't met during T-Rex run.") + raise UserWarning("TRex results condition wasn't met during TRex run.") except Exception: # this could come from provided method 'condition_func' raise def sample_to_run_finish (self, time_between_samples = 5): """ - Automatically sets automatically sampling of T-Rex data with sampling rate described by time_between_samples until T-Rex run finished. + Automatically sets automatically sampling of TRex data with sampling rate described by time_between_samples until TRex run finished. :parameters: time_between_samples : int @@ -436,7 +436,7 @@ class CTRexClient(object): :raises: + :exc:`UserWarning`, in case the condition_func method condition hasn't been met - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + :exc:`TypeError`, in case JSON stream decoding error. + ProtocolError, in case of error in JSON-RPC protocol. @@ -475,13 +475,13 @@ class CTRexClient(object): def is_reserved (self): """ - Checks if T-Rex is currently reserved to any user or not. + Checks if TRex is currently reserved to any user or not. :parameters: None :return: - + **True** if T-Rex is reserved. + + **True** if TRex is reserved. + **False** otherwise. :raises: @@ -499,13 +499,13 @@ class CTRexClient(object): def reserve_trex (self, user = None): """ - Reserves the usage of T-Rex to a certain user. + Reserves the usage of TRex to a certain user. - When T-Rex is reserved, it can't be reserved. + When TRex is reserved, it can't be reserved. :parameters: user : str - a username of the desired owner of T-Rex + a username of the desired owner of TRex default: current logged user @@ -513,8 +513,8 @@ class CTRexClient(object): **True** if reservation made successfully :raises: - + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying to make the reservation. - + :exc:`trex_exceptions.TRexInUseError`, in case T-Rex is currently running. + + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying to make the reservation. + + :exc:`trex_exceptions.TRexInUseError`, in case TRex is currently running. + ProtocolError, in case of error in JSON-RPC protocol. """ @@ -530,14 +530,14 @@ class CTRexClient(object): def cancel_reservation (self, user = None): """ - Cancels a current reservation of T-Rex to a certain user. + Cancels a current reservation of TRex to a certain user. - When T-Rex is reserved, no other user can start new T-Rex runs. + When TRex is reserved, no other user can start new TRex runs. :parameters: user : str - a username of the desired owner of T-Rex + a username of the desired owner of TRex default: current logged user @@ -546,7 +546,7 @@ class CTRexClient(object): + **False** if there was no reservation at all. :raises: - + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying to cancel the reservation. + + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying to cancel the reservation. + ProtocolError, in case of error in JSON-RPC protocol. """ @@ -627,7 +627,7 @@ class CTRexClient(object): return method_to_call() except socket.error as e: if e.errno == errno.ECONNREFUSED: - raise SocketError(errno.ECONNREFUSED, "Connection from T-Rex server was refused. Please make sure the server is up.") + raise SocketError(errno.ECONNREFUSED, "Connection from TRex server was refused. Please make sure the server is up.") def check_server_connectivity (self): """ @@ -640,7 +640,7 @@ class CTRexClient(object): raise socket.gaierror(e.errno, "Could not resolve server hostname. Please make sure hostname entered correctly.") except socket.error as e: if e.errno == errno.ECONNREFUSED: - raise socket.error(errno.ECONNREFUSED, "Connection from T-Rex server was refused. Please make sure the server is up.") + raise socket.error(errno.ECONNREFUSED, "Connection from TRex server was refused. Please make sure the server is up.") finally: self.prompt_verbose_data() @@ -671,7 +671,7 @@ class CTRexClient(object): def _handle_AppError_exception(self, err): """ - This private method triggres the T-Rex dedicated exception generation in case a general ProtocolError has been raised. + This private method triggres the TRex dedicated exception generation in case a general ProtocolError has been raised. """ # handle known exceptions based on known error codes. # if error code is not known, raise ProtocolError @@ -680,17 +680,17 @@ class CTRexClient(object): class CTRexResult(object): """ - A class containing all results received from T-Rex. + A class containing all results received from TRex. Ontop to containing the results, this class offers easier data access and extended results processing options """ def __init__(self, max_history_size): """ - Instatiate a T-Rex result object + Instatiate a TRex result object :parameters: max_history_size : int - a number to set the maximum history size of a single T-Rex run. Each sampling adds a new item to history. + a number to set the maximum history size of a single TRex run. Each sampling adds a new item to history. """ self._history = deque(maxlen = max_history_size) @@ -749,7 +749,7 @@ class CTRexResult(object): def get_avg_latency (self): """ - Fetches the average latency measured on each of the interfaces from the start of T-Rex run + Fetches the average latency measured on each of the interfaces from the start of TRex run :parameters: None @@ -779,7 +779,7 @@ class CTRexResult(object): def get_total_drops (self): """ - Fetches the total number of drops identified from the moment T-Rex run began. + Fetches the total number of drops identified from the moment TRex run began. :parameters: None @@ -835,7 +835,7 @@ class CTRexResult(object): def is_done_warmup (self): """ - Checks if T-Rex latest results TX-rate indicates that T-Rex has reached its expected TX-rate. + Checks if TRex latest results TX-rate indicates that TRex has reached its expected TX-rate. :parameters: None @@ -856,7 +856,7 @@ class CTRexResult(object): defines a path to desired data. .. tip:: | Use '.' to enter one level deeper in dictionary hierarchy. - | Use '[i]' to access the i'th indexed obejct of an array. + | Use '[i]' to access the i'th indexed object of an array. tree_path_to_key : regex apply a regex to filter results out from a multiple results set. diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 5513f420..334496d1 100644..100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -6,22 +6,314 @@ try: except ImportError: # support import for Python 3 import client.outer_packages -from client_utils.jsonrpc_client import JsonRpcClient - +from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage +from client_utils.packet_builder import CTRexPktBuilder +import json +from common.trex_stats import * +from collections import namedtuple class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" - def __init__(self, server="localhost", port=5050, virtual=False): + RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) + + def __init__(self, username, server="localhost", port=5050, virtual=False): super(CTRexStatelessClient, self).__init__() + self.user = username self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) + self._conn_handler = {} + self._active_ports = set() + self._stats = CTRexStatsManager("port", "stream") + self._system_info = None + + # ----- decorator methods ----- # + def force_status(owned=True, active_and_owned=False): + def wrapper(func): + def wrapper_f(self, *args, **kwargs): + port_ids = kwargs.get("port_id") + if isinstance(port_ids, int): + # make sure port_ids is a list + port_ids = [port_ids] + bad_ids = set() + for port_id in port_ids: + port_owned = self._conn_handler.get(kwargs.get(port_id)) + if owned and not port_owned: + bad_ids.add(port_ids) + elif active_and_owned: # stronger condition than just owned, hence gets precedence + if port_owned and port_id in self._active_ports: + continue + else: + bad_ids.add(port_ids) + else: + continue + if bad_ids: + # Some port IDs are not according to desires status + raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" + "at allowed stated".format(func.__name__)) + else: + func(self, *args, **kwargs) + return wrapper_f + return wrapper + + @property + def system_info(self): + if not self._system_info: + self._system_info = self.get_system_info() + return self._system_info + + # ----- user-access methods ----- # + def ping(self): + return self.transmit("ping") + + def get_supported_cmds(self): + return self.transmit("get_supported_cmds") + + def get_version(self): + return self.transmit("get_version") + + def get_system_info(self): + return self.transmit("get_system_info") + + def get_port_count(self): + return self.system_info.get("port_count") + + def acquire(self, port_id, force=False): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + if rc: + self._process_batch_result(commands, resp_list, self._handle_acquire_response) + else: + params = {"port_id": port_id, + "user": self.user, + "force": force} + command = self.RpcCmdData("acquire", params) + self._handle_acquire_response(command, self.transmit(command.method, command.params)) + return self._conn_handler.get(port_id) + + @force_status(owned=True) + def release(self, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + if rc: + self._process_batch_result(commands, resp_list, self._handle_release_response) + else: + self._conn_handler.pop(port_id) + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + command = self.RpcCmdData("release", params) + self._handle_release_response(command, self.transmit(command.method, command.params)) + return + + @force_status(owned=True) + def add_stream(self, stream_id, stream_obj, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + assert isinstance(stream_obj, CStream) + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id, + "stream_id": stream_id, + "stream": stream_obj.dump()} + return self.transmit("add_stream", params) + + @force_status(owned=True) + def remove_stream(self, stream_id, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id, + "stream_id": stream_id} + return self.transmit("remove_stream", params) + + @force_status(owned=True, active_and_owned=True) + def get_stream_id_list(self, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("get_stream_list", params) + + @force_status(owned=True, active_and_owned=True) + def get_stream(self, stream_id, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id, + "stream_id": stream_id} + return self.transmit("get_stream_list", params) + @force_status(owned=True) + def start_traffic(self, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + if rc: + self._process_batch_result(commands, resp_list, self._handle_start_traffic_response) + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + command = self.RpcCmdData("start_traffic", params) + self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) + return - def transmit(self, method_name, params = {}): + @force_status(owned=False, active_and_owned=True) + def stop_traffic(self, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + if rc: + self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response) + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + command = self.RpcCmdData("stop_traffic", params) + self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) + return + + def get_global_stats(self): + command = self.RpcCmdData("get_global_stats", {}) + return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) + # return self.transmit("get_global_stats") + + @force_status(owned=True, active_and_owned=True) + def get_port_stats(self, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + if rc: + self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response) + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + command = self.RpcCmdData("get_port_stats", params) + return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params)) + + @force_status(owned=True, active_and_owned=True) + def get_stream_stats(self, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + if rc: + self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response) + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + command = self.RpcCmdData("get_stream_stats", params) + return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params)) + + # ----- internal methods ----- # + def transmit(self, method_name, params={}): return self.tx_link.transmit(method_name, params) + def transmit_batch(self, batch_list): + return self.tx_link.transmit_batch(batch_list) + + @staticmethod + def _object_decoder(obj_type, obj_data): + if obj_type == "global": + return CGlobalStats(**obj_data) + elif obj_type == "port": + return CPortStats(**obj_data) + elif obj_type == "stream": + return CStreamStats(**obj_data) + else: + # Do not serialize the data into class + return obj_data + + @staticmethod + def default_success_test(result_obj): + if result_obj.success: + return True + else: + return False + + # ----- handler internal methods ----- # + def _handle_acquire_response(self, request, response): + if response.success: + self._conn_handler[request.get("port_id")] = response.data + + def _handle_release_response(self, request, response): + if response.success: + del self._conn_handler[request.get("port_id")] + def _handle_start_traffic_response(self, request, response): + if response.success: + self._active_ports.add(request.get("port_id")) + + def _handle_stop_traffic_response(self, request, response): + if response.success: + self._active_ports.remove(request.get("port_id")) + + def _handle_get_global_stats_response(self, request, response): + if response.success: + return CGlobalStats(**response.success) + else: + return False + + def _handle_get_port_stats_response(self, request, response): + if response.success: + return CPortStats(**response.success) + else: + return False + + def _handle_get_stream_stats_response(self, request, response): + if response.success: + return CStreamStats(**response.success) + else: + return False + + def _is_ports_valid(self, port_id): + if isinstance(port_id, list) or isinstance(port_id, set): + # check each item of the sequence + return all([self._is_ports_valid(port) + for port in port_id]) + elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()): + return True + else: + return False + + def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): + for i, response in enumerate(resp_list): + # testing each result with success test so that a conclusion report could be deployed in future. + if success_test(response): + # run handler method with its params + handler_func(req_list[i], response) + else: + continue # TODO: mark in this case somehow the bad result + + # ------ private classes ------ # class CTxLink(object): """describes the connectivity of the stateless client method""" def __init__(self, server="localhost", port=5050, virtual=False): @@ -33,18 +325,170 @@ class CTRexStatelessClient(object): if not self.virtual: self.rpc_link.connect() - def transmit(self, method_name, params = {}): + def transmit(self, method_name, params={}): if self.virtual: - print "Transmitting virtually over tcp://{server}:{port}".format( - server=self.server, - port=self.port) - id, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) + self._prompt_virtual_tx_msg() + _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) print msg return else: return self.rpc_link.invoke_rpc_method(method_name, params) + def transmit_batch(self, batch_list): + if self.virtual: + self._prompt_virtual_tx_msg() + print [msg + for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) + for command in batch_list]] + else: + batch = self.rpc_link.create_batch() + for command in batch_list: + batch.add(command.method, command.params) + # invoke the batch + return batch.invoke() + + def _prompt_virtual_tx_msg(self): + print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, + port=self.port) + + +class CStream(object): + """docstring for CStream""" + DEFAULTS = {"rx_stats": CRxStats, + "mode": CTxMode, + "isg": 5.0, + "next_stream": -1, + "self_start": True, + "enabled": True} + + def __init__(self, **kwargs): + super(CStream, self).__init__() + for k, v in kwargs.items(): + setattr(self, k, v) + # set default values to unset attributes, according to DEFAULTS dict + set_keys = set(kwargs.keys()) + keys_to_set = [x + for x in self.DEFAULTS + if x not in set_keys] + for key in keys_to_set: + default = self.DEFAULTS.get(key) + if type(default) == type: + setattr(self, key, default()) + else: + setattr(self, key, default) + + @property + def packet(self): + return self._packet + + @packet.setter + def packet(self, packet_obj): + assert isinstance(packet_obj, CTRexPktBuilder) + self._packet = packet_obj + + @property + def enabled(self): + return self._enabled + + @enabled.setter + def enabled(self, bool_value): + self._enabled = bool(bool_value) + + @property + def self_start(self): + return self._self_start + + @self_start.setter + def self_start(self, bool_value): + self._self_start = bool(bool_value) + + @property + def next_stream(self): + return self._next_stream + + @next_stream.setter + def next_stream(self, value): + self._next_stream = int(value) + + def dump(self): + pass + return {"enabled": self.enabled, + "self_start": self.self_start, + "isg": self.isg, + "next_stream": self.next_stream, + "packet": self.packet.dump_pkt(), + "mode": self.mode.dump(), + "vm": self.packet.get_vm_data(), + "rx_stats": self.rx_stats.dump()} + +class CRxStats(object): + + def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): + self._rx_dict = {"enabled": enabled, + "seq_enabled": seq_enabled, + "latency_enabled": latency_enabled} + + @property + def enabled(self): + return self._rx_dict.get("enabled") + + @enabled.setter + def enabled(self, bool_value): + self._rx_dict['enabled'] = bool(bool_value) + + @property + def seq_enabled(self): + return self._rx_dict.get("seq_enabled") + + @seq_enabled.setter + def seq_enabled(self, bool_value): + self._rx_dict['seq_enabled'] = bool(bool_value) + + @property + def latency_enabled(self): + return self._rx_dict.get("latency_enabled") + + @latency_enabled.setter + def latency_enabled(self, bool_value): + self._rx_dict['latency_enabled'] = bool(bool_value) + + def dump(self): + return {k: v + for k, v in self._rx_dict.items() + if v + } + + +class CTxMode(object): + """docstring for CTxMode""" + def __init__(self, tx_mode, pps): + super(CTxMode, self).__init__() + if tx_mode not in ["continuous", "single_burst", "multi_burst"]: + raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode)) + self._tx_mode = tx_mode + self._fields = {'pps': float(pps)} + if tx_mode == "single_burst": + self._fields['total_pkts'] = 0 + elif tx_mode == "multi_burst": + self._fields['pkts_per_burst'] = 0 + self._fields['ibg'] = 0.0 + self._fields['count'] = 0 + else: + pass + + def set_tx_mode_attr(self, attr, val): + if attr in self._fields: + self._fields[attr] = type(self._fields.get(attr))(val) + else: + raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')". + format(attr, self._tx_mode)) + def dump(self): + dump = {"type": self._tx_mode} + dump.update({k: v + for k, v in self._fields.items() + }) + return dump if __name__ == "__main__": |