summaryrefslogtreecommitdiffstats
path: root/scripts/automation/trex_control_plane/client
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/automation/trex_control_plane/client')
-rwxr-xr-xscripts/automation/trex_control_plane/client/outer_packages.py3
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_adv_client.py2
-rwxr-xr-xscripts/automation/trex_control_plane/client/trex_client.py140
-rwxr-xr-x[-rw-r--r--]scripts/automation/trex_control_plane/client/trex_stateless_client.py462
4 files changed, 525 insertions, 82 deletions
diff --git a/scripts/automation/trex_control_plane/client/outer_packages.py b/scripts/automation/trex_control_plane/client/outer_packages.py
index 5facad20..206d4b4c 100755
--- a/scripts/automation/trex_control_plane/client/outer_packages.py
+++ b/scripts/automation/trex_control_plane/client/outer_packages.py
@@ -1,7 +1,6 @@
#!/router/bin/python
import sys
-import site
import os
CURRENT_PATH = os.path.dirname(os.path.realpath(__file__))
@@ -25,6 +24,6 @@ def import_module_list(modules_list):
for p in modules_list:
full_path = os.path.join(PATH_TO_PYTHON_LIB, p)
fix_path = os.path.normcase(full_path) # (CURRENT_PATH+p)
- site.addsitedir(full_path)
+ sys.path.insert(1, full_path)
import_client_modules()
diff --git a/scripts/automation/trex_control_plane/client/trex_adv_client.py b/scripts/automation/trex_control_plane/client/trex_adv_client.py
index b3fe3dad..bf7ccf58 100755
--- a/scripts/automation/trex_control_plane/client/trex_adv_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_adv_client.py
@@ -8,7 +8,7 @@ class CTRexAdvClient(trex_client.CTRexClient):
super(CTRexAdvClient, self).__init__(trex_host, max_history_size, trex_daemon_port, trex_zmq_port, verbose)
pass
- # T-REX KIWI advanced methods
+ # TRex KIWI advanced methods
def start_quick_trex(self, pcap_file, d, delay, dual, ipv6, times, interfaces):
try:
return self.server.start_quick_trex(pcap_file = pcap_file, duration = d, dual = dual, delay = delay, ipv6 = ipv6, times = times, interfaces = interfaces)
diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py
index 56775766..c3677132 100755
--- a/scripts/automation/trex_control_plane/client/trex_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_client.py
@@ -30,18 +30,18 @@ from distutils.util import strtobool
class CTRexClient(object):
"""
- This class defines the client side of the RESTfull interaction with T-Rex
+ This class defines the client side of the RESTfull interaction with TRex
"""
def __init__(self, trex_host, max_history_size = 100, trex_daemon_port = 8090, trex_zmq_port = 4500, verbose = False):
"""
- Instantiate a T-Rex client object, and connecting it to listening daemon-server
+ Instantiate a TRex client object, and connecting it to listening daemon-server
:parameters:
trex_host : str
- a string of the t-rex ip address or hostname.
+ a string of the TRex ip address or hostname.
max_history_size : int
- a number to set the maximum history size of a single T-Rex run. Each sampling adds a new item to history.
+ a number to set the maximum history size of a single TRex run. Each sampling adds a new item to history.
default value : **100**
trex_daemon_port : int
@@ -69,7 +69,7 @@ class CTRexClient(object):
self.result_obj = CTRexResult(max_history_size)
self.decoder = JSONDecoder()
self.trex_server_path = "http://{hostname}:{port}/".format( hostname = trex_host, port = trex_daemon_port )
- self.__verbose_print("Connecting to T-Rex @ {trex_path} ...".format( trex_path = self.trex_server_path ) )
+ self.__verbose_print("Connecting to TRex @ {trex_path} ...".format( trex_path = self.trex_server_path ) )
self.history = jsonrpclib.history.History()
self.server = jsonrpclib.Server(self.trex_server_path, history = self.history)
self.check_server_connectivity()
@@ -90,7 +90,7 @@ class CTRexClient(object):
def start_trex (self, f, d, block_to_success = True, timeout = 30, user = None, **trex_cmd_options):
"""
- Request to start a T-Rex run on server.
+ Request to start a TRex run on server.
:parameters:
f : str
@@ -98,17 +98,17 @@ class CTRexClient(object):
d : int
the desired duration of the test. must be at least 30 seconds long.
block_to_success : bool
- determine if this method blocks until T-Rex changes state from 'Starting' to either 'Idle' or 'Running'
+ determine if this method blocks until TRex changes state from 'Starting' to either 'Idle' or 'Running'
default value : **True**
timeout : int
- maximum time (in seconds) to wait in blocking state until T-Rex changes state from 'Starting' to either 'Idle' or 'Running'
+ maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running'
default value: **30**
user : str
the identity of the the run issuer.
trex_cmd_options : key, val
- sets desired T-Rex options using key=val syntax, separated by comma.
+ sets desired TRex options using key=val syntax, separated by comma.
for keys with no value, state key=True
:return:
@@ -117,8 +117,8 @@ class CTRexClient(object):
:raises:
+ :exc:`ValueError`, in case 'd' parameter inserted with wrong value.
+ :exc:`trex_exceptions.TRexError`, in case one of the trex_cmd_options raised an exception at server.
- + :exc:`trex_exceptions.TRexInUseError`, in case T-Rex is already taken.
- + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying start T-Rex.
+ + :exc:`trex_exceptions.TRexInUseError`, in case TRex is already taken.
+ + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying start TRex.
+ ProtocolError, in case of error in JSON-RPC protocol.
"""
@@ -128,7 +128,7 @@ class CTRexClient(object):
if d < 30: # specify a test should take at least 30 seconds long.
raise ValueError
except ValueError:
- raise ValueError('d parameter must be integer, specifying how long T-Rex run, and must be larger than 30 secs.')
+ raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.')
trex_cmd_options.update( {'f' : f, 'd' : d} )
@@ -146,25 +146,25 @@ class CTRexClient(object):
if retval!=0:
self.seq = retval # update seq num only on successful submission
return True
- else: # T-Rex is has been started by another user
- raise TRexInUseError('T-Rex is already being used by another user or process. Try again once T-Rex is back in IDLE state.')
+ else: # TRex is has been started by another user
+ raise TRexInUseError('TRex is already being used by another user or process. Try again once TRex is back in IDLE state.')
def stop_trex (self):
"""
- Request to stop a T-Rex run on server.
+ Request to stop a TRex run on server.
- The request is only valid if the stop initiator is the same client as the T-Rex run initiator.
+ The request is only valid if the stop initiator is the same client as the TRex run initiator.
:parameters:
None
:return:
+ **True** on successful termination
- + **False** if request issued but T-Rex wasn't running.
+ + **False** if request issued but TRex wasn't running.
:raises:
- + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex ir running but started by another user.
- + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination).
+ + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex ir running but started by another user.
+ + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination).
+ ProtocolError, in case of error in JSON-RPC protocol.
"""
@@ -179,16 +179,16 @@ class CTRexClient(object):
def force_kill (self, confirm = True):
"""
- Force killing of running T-Rex process (if exists) on the server.
+ Force killing of running TRex process (if exists) on the server.
.. tip:: This method is a safety method and **overrides any running or reserved resources**, and as such isn't designed to be used on a regular basis.
Always consider using :func:`trex_client.CTRexClient.stop_trex` instead.
- In the end of this method, T-Rex will return to IDLE state with no reservation.
+ In the end of this method, TRex will return to IDLE state with no reservation.
:parameters:
confirm : bool
- Prompt a user confirmation before continue terminating T-Rex session
+ Prompt a user confirmation before continue terminating TRex session
:return:
+ **True** on successful termination
@@ -199,7 +199,7 @@ class CTRexClient(object):
"""
if confirm:
- prompt = "WARNING: This will terminate active T-Rex session indiscriminately.\nAre you sure? "
+ prompt = "WARNING: This will terminate active TRex session indiscriminately.\nAre you sure? "
sys.stdout.write('%s [y/n]\n' % prompt)
while True:
try:
@@ -221,20 +221,20 @@ class CTRexClient(object):
def wait_until_kickoff_finish(self, timeout = 40):
"""
- Block the client application until T-Rex changes state from 'Starting' to either 'Idle' or 'Running'
+ Block the client application until TRex changes state from 'Starting' to either 'Idle' or 'Running'
- The request is only valid if the stop initiator is the same client as the T-Rex run initiator.
+ The request is only valid if the stop initiator is the same client as the TRex run initiator.
:parameters:
timeout : int
- maximum time (in seconds) to wait in blocking state until T-Rex changes state from 'Starting' to either 'Idle' or 'Running'
+ maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running'
:return:
+ **True** on successful termination
- + **False** if request issued but T-Rex wasn't running.
+ + **False** if request issued but TRex wasn't running.
:raises:
- + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination).
+ + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination).
+ ProtocolError, in case of error in JSON-RPC protocol.
.. note:: Exceptions are throws only when start_trex did not block in the first place, i.e. `block_to_success` parameter was set to `False`
@@ -252,22 +252,22 @@ class CTRexClient(object):
def is_running (self, dump_out = False):
"""
- Poll for T-Rex running status.
+ Poll for TRex running status.
- If T-Rex is running, a history item will be added into result_obj and processed.
+ If TRex is running, a history item will be added into result_obj and processed.
- .. tip:: This method is especially useful for iterating until T-Rex run is finished.
+ .. tip:: This method is especially useful for iterating until TRex run is finished.
:parameters:
dump_out : dict
if passed, the pointer object is cleared and the latest dump stored in it.
:return:
- + **True** if T-Rex is running.
- + **False** if T-Rex is not running.
+ + **True** if TRex is running.
+ + **False** if TRex is not running.
:raises:
- + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination).
+ + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination).
+ :exc:`TypeError`, in case JSON stream decoding error.
+ ProtocolError, in case of error in JSON-RPC protocol.
@@ -292,7 +292,7 @@ class CTRexClient(object):
def get_trex_files_path (self):
"""
- Fetches the local path in which files are stored when pushed to t-rex server from client.
+ Fetches the local path in which files are stored when pushed to TRex server from client.
:parameters:
None
@@ -300,7 +300,7 @@ class CTRexClient(object):
:return:
string representation of the desired path
- .. note:: The returned path represents a path on the T-Rex server **local machine**
+ .. note:: The returned path represents a path on the TRex server **local machine**
:raises:
ProtocolError, in case of error in JSON-RPC protocol.
@@ -317,7 +317,7 @@ class CTRexClient(object):
def get_running_status (self):
"""
- Fetches the current T-Rex status.
+ Fetches the current TRex status.
If available, a verbose data will accompany the state itself.
@@ -344,18 +344,18 @@ class CTRexClient(object):
def get_running_info (self):
"""
- Performs single poll of T-Rex running data and process it into the result object (named `result_obj`).
+ Performs single poll of TRex running data and process it into the result object (named `result_obj`).
- .. tip:: This method will throw an exception if T-Rex isn't running. Always consider using :func:`trex_client.CTRexClient.is_running` which handles a single poll operation in safer manner.
+ .. tip:: This method will throw an exception if TRex isn't running. Always consider using :func:`trex_client.CTRexClient.is_running` which handles a single poll operation in safer manner.
:parameters:
None
:return:
- dictionary containing the most updated data dump from T-Rex.
+ dictionary containing the most updated data dump from TRex.
:raises:
- + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination).
+ + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination).
+ :exc:`TypeError`, in case JSON stream decoding error.
+ ProtocolError, in case of error in JSON-RPC protocol.
@@ -379,7 +379,7 @@ class CTRexClient(object):
def sample_until_condition (self, condition_func, time_between_samples = 5):
"""
- Automatically sets ongoing sampling of T-Rex data, with sampling rate described by time_between_samples.
+ Automatically sets ongoing sampling of TRex data, with sampling rate described by time_between_samples.
On each fetched dump, the condition_func is applied on the result objects, and if returns True, the sampling will stop.
@@ -394,36 +394,36 @@ class CTRexClient(object):
default value : **5**
:return:
- the first result object (see :class:`CTRexResult` for further details) of the T-Rex run on which the condition has been met.
+ the first result object (see :class:`CTRexResult` for further details) of the TRex run on which the condition has been met.
:raises:
+ :exc:`UserWarning`, in case the condition_func method condition hasn't been met
- + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination).
+ + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination).
+ :exc:`TypeError`, in case JSON stream decoding error.
+ ProtocolError, in case of error in JSON-RPC protocol.
+ :exc:`Exception`, in case the condition_func suffered from any kind of exception
"""
- # make sure T-Rex is running. raise exceptions here if any
+ # make sure TRex is running. raise exceptions here if any
self.wait_until_kickoff_finish()
try:
while self.is_running():
results = self.get_result_obj()
if condition_func(results):
- # if condition satisfied, stop T-Rex and return result object
+ # if condition satisfied, stop TRex and return result object
self.stop_trex()
return results
time.sleep(time_between_samples)
except TRexWarning:
# means we're back to Idle state, and didn't meet our condition
- raise UserWarning("T-Rex results condition wasn't met during T-Rex run.")
+ raise UserWarning("TRex results condition wasn't met during TRex run.")
except Exception:
# this could come from provided method 'condition_func'
raise
def sample_to_run_finish (self, time_between_samples = 5):
"""
- Automatically sets automatically sampling of T-Rex data with sampling rate described by time_between_samples until T-Rex run finished.
+ Automatically sets automatically sampling of TRex data with sampling rate described by time_between_samples until TRex run finished.
:parameters:
time_between_samples : int
@@ -436,7 +436,7 @@ class CTRexClient(object):
:raises:
+ :exc:`UserWarning`, in case the condition_func method condition hasn't been met
- + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination).
+ + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination).
+ :exc:`TypeError`, in case JSON stream decoding error.
+ ProtocolError, in case of error in JSON-RPC protocol.
@@ -475,13 +475,13 @@ class CTRexClient(object):
def is_reserved (self):
"""
- Checks if T-Rex is currently reserved to any user or not.
+ Checks if TRex is currently reserved to any user or not.
:parameters:
None
:return:
- + **True** if T-Rex is reserved.
+ + **True** if TRex is reserved.
+ **False** otherwise.
:raises:
@@ -499,13 +499,13 @@ class CTRexClient(object):
def reserve_trex (self, user = None):
"""
- Reserves the usage of T-Rex to a certain user.
+ Reserves the usage of TRex to a certain user.
- When T-Rex is reserved, it can't be reserved.
+ When TRex is reserved, it can't be reserved.
:parameters:
user : str
- a username of the desired owner of T-Rex
+ a username of the desired owner of TRex
default: current logged user
@@ -513,8 +513,8 @@ class CTRexClient(object):
**True** if reservation made successfully
:raises:
- + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying to make the reservation.
- + :exc:`trex_exceptions.TRexInUseError`, in case T-Rex is currently running.
+ + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying to make the reservation.
+ + :exc:`trex_exceptions.TRexInUseError`, in case TRex is currently running.
+ ProtocolError, in case of error in JSON-RPC protocol.
"""
@@ -530,14 +530,14 @@ class CTRexClient(object):
def cancel_reservation (self, user = None):
"""
- Cancels a current reservation of T-Rex to a certain user.
+ Cancels a current reservation of TRex to a certain user.
- When T-Rex is reserved, no other user can start new T-Rex runs.
+ When TRex is reserved, no other user can start new TRex runs.
:parameters:
user : str
- a username of the desired owner of T-Rex
+ a username of the desired owner of TRex
default: current logged user
@@ -546,7 +546,7 @@ class CTRexClient(object):
+ **False** if there was no reservation at all.
:raises:
- + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying to cancel the reservation.
+ + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying to cancel the reservation.
+ ProtocolError, in case of error in JSON-RPC protocol.
"""
@@ -627,7 +627,7 @@ class CTRexClient(object):
return method_to_call()
except socket.error as e:
if e.errno == errno.ECONNREFUSED:
- raise SocketError(errno.ECONNREFUSED, "Connection from T-Rex server was refused. Please make sure the server is up.")
+ raise SocketError(errno.ECONNREFUSED, "Connection from TRex server was refused. Please make sure the server is up.")
def check_server_connectivity (self):
"""
@@ -640,7 +640,7 @@ class CTRexClient(object):
raise socket.gaierror(e.errno, "Could not resolve server hostname. Please make sure hostname entered correctly.")
except socket.error as e:
if e.errno == errno.ECONNREFUSED:
- raise socket.error(errno.ECONNREFUSED, "Connection from T-Rex server was refused. Please make sure the server is up.")
+ raise socket.error(errno.ECONNREFUSED, "Connection from TRex server was refused. Please make sure the server is up.")
finally:
self.prompt_verbose_data()
@@ -671,7 +671,7 @@ class CTRexClient(object):
def _handle_AppError_exception(self, err):
"""
- This private method triggres the T-Rex dedicated exception generation in case a general ProtocolError has been raised.
+ This private method triggres the TRex dedicated exception generation in case a general ProtocolError has been raised.
"""
# handle known exceptions based on known error codes.
# if error code is not known, raise ProtocolError
@@ -680,17 +680,17 @@ class CTRexClient(object):
class CTRexResult(object):
"""
- A class containing all results received from T-Rex.
+ A class containing all results received from TRex.
Ontop to containing the results, this class offers easier data access and extended results processing options
"""
def __init__(self, max_history_size):
"""
- Instatiate a T-Rex result object
+ Instatiate a TRex result object
:parameters:
max_history_size : int
- a number to set the maximum history size of a single T-Rex run. Each sampling adds a new item to history.
+ a number to set the maximum history size of a single TRex run. Each sampling adds a new item to history.
"""
self._history = deque(maxlen = max_history_size)
@@ -749,7 +749,7 @@ class CTRexResult(object):
def get_avg_latency (self):
"""
- Fetches the average latency measured on each of the interfaces from the start of T-Rex run
+ Fetches the average latency measured on each of the interfaces from the start of TRex run
:parameters:
None
@@ -779,7 +779,7 @@ class CTRexResult(object):
def get_total_drops (self):
"""
- Fetches the total number of drops identified from the moment T-Rex run began.
+ Fetches the total number of drops identified from the moment TRex run began.
:parameters:
None
@@ -835,7 +835,7 @@ class CTRexResult(object):
def is_done_warmup (self):
"""
- Checks if T-Rex latest results TX-rate indicates that T-Rex has reached its expected TX-rate.
+ Checks if TRex latest results TX-rate indicates that TRex has reached its expected TX-rate.
:parameters:
None
@@ -856,7 +856,7 @@ class CTRexResult(object):
defines a path to desired data.
.. tip:: | Use '.' to enter one level deeper in dictionary hierarchy.
- | Use '[i]' to access the i'th indexed obejct of an array.
+ | Use '[i]' to access the i'th indexed object of an array.
tree_path_to_key : regex
apply a regex to filter results out from a multiple results set.
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
index 5513f420..334496d1 100644..100755
--- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py
+++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py
@@ -6,22 +6,314 @@ try:
except ImportError:
# support import for Python 3
import client.outer_packages
-from client_utils.jsonrpc_client import JsonRpcClient
-
+from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage
+from client_utils.packet_builder import CTRexPktBuilder
+import json
+from common.trex_stats import *
+from collections import namedtuple
class CTRexStatelessClient(object):
"""docstring for CTRexStatelessClient"""
- def __init__(self, server="localhost", port=5050, virtual=False):
+ RpcCmdData = namedtuple('RpcCmdData', ['method', 'params'])
+
+ def __init__(self, username, server="localhost", port=5050, virtual=False):
super(CTRexStatelessClient, self).__init__()
+ self.user = username
self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual)
+ self._conn_handler = {}
+ self._active_ports = set()
+ self._stats = CTRexStatsManager("port", "stream")
+ self._system_info = None
+
+ # ----- decorator methods ----- #
+ def force_status(owned=True, active_and_owned=False):
+ def wrapper(func):
+ def wrapper_f(self, *args, **kwargs):
+ port_ids = kwargs.get("port_id")
+ if isinstance(port_ids, int):
+ # make sure port_ids is a list
+ port_ids = [port_ids]
+ bad_ids = set()
+ for port_id in port_ids:
+ port_owned = self._conn_handler.get(kwargs.get(port_id))
+ if owned and not port_owned:
+ bad_ids.add(port_ids)
+ elif active_and_owned: # stronger condition than just owned, hence gets precedence
+ if port_owned and port_id in self._active_ports:
+ continue
+ else:
+ bad_ids.add(port_ids)
+ else:
+ continue
+ if bad_ids:
+ # Some port IDs are not according to desires status
+ raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not"
+ "at allowed stated".format(func.__name__))
+ else:
+ func(self, *args, **kwargs)
+ return wrapper_f
+ return wrapper
+
+ @property
+ def system_info(self):
+ if not self._system_info:
+ self._system_info = self.get_system_info()
+ return self._system_info
+
+ # ----- user-access methods ----- #
+ def ping(self):
+ return self.transmit("ping")
+
+ def get_supported_cmds(self):
+ return self.transmit("get_supported_cmds")
+
+ def get_version(self):
+ return self.transmit("get_version")
+
+ def get_system_info(self):
+ return self.transmit("get_system_info")
+
+ def get_port_count(self):
+ return self.system_info.get("port_count")
+
+ def acquire(self, port_id, force=False):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ self._process_batch_result(commands, resp_list, self._handle_acquire_response)
+ else:
+ params = {"port_id": port_id,
+ "user": self.user,
+ "force": force}
+ command = self.RpcCmdData("acquire", params)
+ self._handle_acquire_response(command, self.transmit(command.method, command.params))
+ return self._conn_handler.get(port_id)
+
+ @force_status(owned=True)
+ def release(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ self._process_batch_result(commands, resp_list, self._handle_release_response)
+ else:
+ self._conn_handler.pop(port_id)
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ command = self.RpcCmdData("release", params)
+ self._handle_release_response(command, self.transmit(command.method, command.params))
+ return
+
+ @force_status(owned=True)
+ def add_stream(self, stream_id, stream_obj, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ assert isinstance(stream_obj, CStream)
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id,
+ "stream_id": stream_id,
+ "stream": stream_obj.dump()}
+ return self.transmit("add_stream", params)
+
+ @force_status(owned=True)
+ def remove_stream(self, stream_id, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id,
+ "stream_id": stream_id}
+ return self.transmit("remove_stream", params)
+
+ @force_status(owned=True, active_and_owned=True)
+ def get_stream_id_list(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ return self.transmit("get_stream_list", params)
+
+ @force_status(owned=True, active_and_owned=True)
+ def get_stream(self, stream_id, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id,
+ "stream_id": stream_id}
+ return self.transmit("get_stream_list", params)
+ @force_status(owned=True)
+ def start_traffic(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ self._process_batch_result(commands, resp_list, self._handle_start_traffic_response)
+ else:
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ command = self.RpcCmdData("start_traffic", params)
+ self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
+ return
- def transmit(self, method_name, params = {}):
+ @force_status(owned=False, active_and_owned=True)
+ def stop_traffic(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response)
+ else:
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ command = self.RpcCmdData("stop_traffic", params)
+ self._handle_start_traffic_response(command, self.transmit(command.method, command.params))
+ return
+
+ def get_global_stats(self):
+ command = self.RpcCmdData("get_global_stats", {})
+ return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params))
+ # return self.transmit("get_global_stats")
+
+ @force_status(owned=True, active_and_owned=True)
+ def get_port_stats(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response)
+ else:
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ command = self.RpcCmdData("get_port_stats", params)
+ return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params))
+
+ @force_status(owned=True, active_and_owned=True)
+ def get_stream_stats(self, port_id=None):
+ if not self._is_ports_valid(port_id):
+ raise ValueError("Provided illegal port id input")
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # handle as batch mode
+ port_ids = set(port_id) # convert to set to avoid duplications
+ commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id})
+ for p_id in port_ids]
+ rc, resp_list = self.transmit_batch(commands)
+ if rc:
+ self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response)
+ else:
+ params = {"handler": self._conn_handler.get(port_id),
+ "port_id": port_id}
+ command = self.RpcCmdData("get_stream_stats", params)
+ return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params))
+
+ # ----- internal methods ----- #
+ def transmit(self, method_name, params={}):
return self.tx_link.transmit(method_name, params)
+ def transmit_batch(self, batch_list):
+ return self.tx_link.transmit_batch(batch_list)
+
+ @staticmethod
+ def _object_decoder(obj_type, obj_data):
+ if obj_type == "global":
+ return CGlobalStats(**obj_data)
+ elif obj_type == "port":
+ return CPortStats(**obj_data)
+ elif obj_type == "stream":
+ return CStreamStats(**obj_data)
+ else:
+ # Do not serialize the data into class
+ return obj_data
+
+ @staticmethod
+ def default_success_test(result_obj):
+ if result_obj.success:
+ return True
+ else:
+ return False
+
+ # ----- handler internal methods ----- #
+ def _handle_acquire_response(self, request, response):
+ if response.success:
+ self._conn_handler[request.get("port_id")] = response.data
+
+ def _handle_release_response(self, request, response):
+ if response.success:
+ del self._conn_handler[request.get("port_id")]
+ def _handle_start_traffic_response(self, request, response):
+ if response.success:
+ self._active_ports.add(request.get("port_id"))
+
+ def _handle_stop_traffic_response(self, request, response):
+ if response.success:
+ self._active_ports.remove(request.get("port_id"))
+
+ def _handle_get_global_stats_response(self, request, response):
+ if response.success:
+ return CGlobalStats(**response.success)
+ else:
+ return False
+
+ def _handle_get_port_stats_response(self, request, response):
+ if response.success:
+ return CPortStats(**response.success)
+ else:
+ return False
+
+ def _handle_get_stream_stats_response(self, request, response):
+ if response.success:
+ return CStreamStats(**response.success)
+ else:
+ return False
+
+ def _is_ports_valid(self, port_id):
+ if isinstance(port_id, list) or isinstance(port_id, set):
+ # check each item of the sequence
+ return all([self._is_ports_valid(port)
+ for port in port_id])
+ elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()):
+ return True
+ else:
+ return False
+
+ def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test):
+ for i, response in enumerate(resp_list):
+ # testing each result with success test so that a conclusion report could be deployed in future.
+ if success_test(response):
+ # run handler method with its params
+ handler_func(req_list[i], response)
+ else:
+ continue # TODO: mark in this case somehow the bad result
+
+ # ------ private classes ------ #
class CTxLink(object):
"""describes the connectivity of the stateless client method"""
def __init__(self, server="localhost", port=5050, virtual=False):
@@ -33,18 +325,170 @@ class CTRexStatelessClient(object):
if not self.virtual:
self.rpc_link.connect()
- def transmit(self, method_name, params = {}):
+ def transmit(self, method_name, params={}):
if self.virtual:
- print "Transmitting virtually over tcp://{server}:{port}".format(
- server=self.server,
- port=self.port)
- id, msg = self.rpc_link.create_jsonrpc_v2(method_name, params)
+ self._prompt_virtual_tx_msg()
+ _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params)
print msg
return
else:
return self.rpc_link.invoke_rpc_method(method_name, params)
+ def transmit_batch(self, batch_list):
+ if self.virtual:
+ self._prompt_virtual_tx_msg()
+ print [msg
+ for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params)
+ for command in batch_list]]
+ else:
+ batch = self.rpc_link.create_batch()
+ for command in batch_list:
+ batch.add(command.method, command.params)
+ # invoke the batch
+ return batch.invoke()
+
+ def _prompt_virtual_tx_msg(self):
+ print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server,
+ port=self.port)
+
+
+class CStream(object):
+ """docstring for CStream"""
+ DEFAULTS = {"rx_stats": CRxStats,
+ "mode": CTxMode,
+ "isg": 5.0,
+ "next_stream": -1,
+ "self_start": True,
+ "enabled": True}
+
+ def __init__(self, **kwargs):
+ super(CStream, self).__init__()
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+ # set default values to unset attributes, according to DEFAULTS dict
+ set_keys = set(kwargs.keys())
+ keys_to_set = [x
+ for x in self.DEFAULTS
+ if x not in set_keys]
+ for key in keys_to_set:
+ default = self.DEFAULTS.get(key)
+ if type(default) == type:
+ setattr(self, key, default())
+ else:
+ setattr(self, key, default)
+
+ @property
+ def packet(self):
+ return self._packet
+
+ @packet.setter
+ def packet(self, packet_obj):
+ assert isinstance(packet_obj, CTRexPktBuilder)
+ self._packet = packet_obj
+
+ @property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, bool_value):
+ self._enabled = bool(bool_value)
+
+ @property
+ def self_start(self):
+ return self._self_start
+
+ @self_start.setter
+ def self_start(self, bool_value):
+ self._self_start = bool(bool_value)
+
+ @property
+ def next_stream(self):
+ return self._next_stream
+
+ @next_stream.setter
+ def next_stream(self, value):
+ self._next_stream = int(value)
+
+ def dump(self):
+ pass
+ return {"enabled": self.enabled,
+ "self_start": self.self_start,
+ "isg": self.isg,
+ "next_stream": self.next_stream,
+ "packet": self.packet.dump_pkt(),
+ "mode": self.mode.dump(),
+ "vm": self.packet.get_vm_data(),
+ "rx_stats": self.rx_stats.dump()}
+
+class CRxStats(object):
+
+ def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False):
+ self._rx_dict = {"enabled": enabled,
+ "seq_enabled": seq_enabled,
+ "latency_enabled": latency_enabled}
+
+ @property
+ def enabled(self):
+ return self._rx_dict.get("enabled")
+
+ @enabled.setter
+ def enabled(self, bool_value):
+ self._rx_dict['enabled'] = bool(bool_value)
+
+ @property
+ def seq_enabled(self):
+ return self._rx_dict.get("seq_enabled")
+
+ @seq_enabled.setter
+ def seq_enabled(self, bool_value):
+ self._rx_dict['seq_enabled'] = bool(bool_value)
+
+ @property
+ def latency_enabled(self):
+ return self._rx_dict.get("latency_enabled")
+
+ @latency_enabled.setter
+ def latency_enabled(self, bool_value):
+ self._rx_dict['latency_enabled'] = bool(bool_value)
+
+ def dump(self):
+ return {k: v
+ for k, v in self._rx_dict.items()
+ if v
+ }
+
+
+class CTxMode(object):
+ """docstring for CTxMode"""
+ def __init__(self, tx_mode, pps):
+ super(CTxMode, self).__init__()
+ if tx_mode not in ["continuous", "single_burst", "multi_burst"]:
+ raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode))
+ self._tx_mode = tx_mode
+ self._fields = {'pps': float(pps)}
+ if tx_mode == "single_burst":
+ self._fields['total_pkts'] = 0
+ elif tx_mode == "multi_burst":
+ self._fields['pkts_per_burst'] = 0
+ self._fields['ibg'] = 0.0
+ self._fields['count'] = 0
+ else:
+ pass
+
+ def set_tx_mode_attr(self, attr, val):
+ if attr in self._fields:
+ self._fields[attr] = type(self._fields.get(attr))(val)
+ else:
+ raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')".
+ format(attr, self._tx_mode))
+ def dump(self):
+ dump = {"type": self._tx_mode}
+ dump.update({k: v
+ for k, v in self._fields.items()
+ })
+ return dump
if __name__ == "__main__":