From 20a7b8ed2a75debc3f2015d571fb4faf2cfc8b13 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Sun, 20 Sep 2015 23:56:04 +0300 Subject: minor updated to client API library, adding the CRxStats class --- .../client/trex_stateless_client.py | 46 ++++++++++++++++++++-- 1 file changed, 43 insertions(+), 3 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 5513f420..a2c148ce 100644 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -7,7 +7,8 @@ except ImportError: # support import for Python 3 import client.outer_packages from client_utils.jsonrpc_client import JsonRpcClient - +from client_utils.packet_builder import CTRexPktBuilder +import json class CTRexStatelessClient(object): @@ -16,11 +17,50 @@ class CTRexStatelessClient(object): super(CTRexStatelessClient, self).__init__() self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) + def add_stream(self): + pass - def transmit(self, method_name, params = {}): + def transmit(self, method_name, params={}): return self.tx_link.transmit(method_name, params) + # ------ private classes ------ # + class CRxStats(object): + + def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): + self._rx_dict = {"enabled" : enabled, + "seq_enabled" : seq_enabled, + "latency_enabled" : latency_enabled} + + @property + def enabled(self): + return self._rx_dict.get("enabled") + + @enabled.setter + def enabled(self, bool_value): + self._rx_dict['enabled'] = bool_value + + @property + def seq_enabled(self): + return self._rx_dict.get("seq_enabled") + + @seq_enabled.setter + def seq_enabled(self, bool_value): + self._rx_dict['seq_enabled'] = bool_value + + @property + def latency_enabled(self): + return self._rx_dict.get("latency_enabled") + + @latency_enabled.setter + def latency_enabled(self, bool_value): + self._rx_dict['latency_enabled'] = bool_value + + def dump(self): + return json.dumps({i:self._rx_dict.get(i) + for i in self._rx_dict.keys() + if self._rx_dict.get(i) + }) class CTxLink(object): """describes the connectivity of the stateless client method""" @@ -33,7 +73,7 @@ class CTRexStatelessClient(object): if not self.virtual: self.rpc_link.connect() - def transmit(self, method_name, params = {}): + def transmit(self, method_name, params={}): if self.virtual: print "Transmitting virtually over tcp://{server}:{port}".format( server=self.server, -- cgit From 2965b8f7e7ca924af9ce6d30b2c0b586ec3e21c5 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Mon, 21 Sep 2015 11:22:53 +0300 Subject: Updated implementation of add_stream method, created subclasses to reflect vairous objects. --- .../client/trex_stateless_client.py | 101 +++++++++++++++++++-- 1 file changed, 92 insertions(+), 9 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index a2c148ce..322d7319 100644 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -17,14 +17,66 @@ class CTRexStatelessClient(object): super(CTRexStatelessClient, self).__init__() self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) - def add_stream(self): - pass + def add_stream(self, handler, port_id, stream_id, stream_obj): + assert isinstance(stream_obj, CTRexStatelessClient.CStream) + params = {"handler":handler, + "port_id":port_id, + "stream_id":stream_id, + "stream":stream_obj.dump()} + return self.transmit("add_stream", params) def transmit(self, method_name, params={}): return self.tx_link.transmit(method_name, params) # ------ private classes ------ # + class CStream(object): + """docstring for CStream""" + def __init__(self): + super(CStream, self).__init__() + self.packet = CTRexPktBuilder() + self.rx_stats = CRxStats() + self.mode = CTxMode() + self.isg + self._next_stream = -1 + self._self_start + self._enabled + + @property + def enabled(self): + return self._enabled + + @enabled.setter + def enabled(self, bool_value): + self._enabled = bool(bool_value) + + @property + def self_start(self): + return self._self_start + + @enabled.setter + def self_start(self, bool_value): + self._self_start = bool(bool_value) + + @property + def next_stream(self): + return self._next_stream + + @enabled.setter + def next_stream(self, value): + self._next_stream = int(bool_value) + + def dump(self): + pass + return {"enabled":self.enabled, + "self_start":self.self_start, + "isg":self.isg, + "next_stream":self.next_stream, + "packet":self.packet.dump_pkt(), + "mode":self.mode.dump(), + "vm":self.packet.dump_vm_instructions(), # TODO - add this method to packet builder module + "rx_stats":self.rx_stats.dump()}) + class CRxStats(object): def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): @@ -38,7 +90,7 @@ class CTRexStatelessClient(object): @enabled.setter def enabled(self, bool_value): - self._rx_dict['enabled'] = bool_value + self._rx_dict['enabled'] = bool(bool_value) @property def seq_enabled(self): @@ -46,7 +98,7 @@ class CTRexStatelessClient(object): @seq_enabled.setter def seq_enabled(self, bool_value): - self._rx_dict['seq_enabled'] = bool_value + self._rx_dict['seq_enabled'] = bool(bool_value) @property def latency_enabled(self): @@ -54,13 +106,44 @@ class CTRexStatelessClient(object): @latency_enabled.setter def latency_enabled(self, bool_value): - self._rx_dict['latency_enabled'] = bool_value + self._rx_dict['latency_enabled'] = bool(bool_value) + + def dump(self): + return {k:v + for k,v in self._rx_dict.items() + if v + }) + + class CTxMode(object): + """docstring for CTxMode""" + def __init__(self, tx_mode, pps): + super(CTxMode, self).__init__() + if tx_mode not in ["continuous", "single_burst", "multi_burst"]: + raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode)) + self._tx_mode = tx_mode + self._fields = {'pps':float(pps)} + if tx_mode == "single_burst": + self._fields['total_pkts'] = 0 + elif tx_mode == "multi_burst": + self._fields['pkts_per_burst'] = 0 + self._fields['ibg'] = 0.0 + self._fields['count'] = 0 + else: + pass + + def set_tx_mode_attr(self, attr, val): + if attr in self._fields: + self._fields[attr] = type(self._fields.get(attr))(val) + else: + raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')". + format(attr, self._tx_mode)) def dump(self): - return json.dumps({i:self._rx_dict.get(i) - for i in self._rx_dict.keys() - if self._rx_dict.get(i) - }) + dump = {"type":self._tx_mode} + dump.update({k:v + for k, v in self._fields.items() + }) + return dump class CTxLink(object): """describes the connectivity of the stateless client method""" -- cgit From b3ac7facbbbc4815388298534fdfdd161ce89534 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Tue, 22 Sep 2015 13:56:49 +0300 Subject: updated packet builder and stateless client modules --- .../trex_control_plane/client/trex_stateless_client.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 322d7319..12f6eff6 100644 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -33,10 +33,10 @@ class CTRexStatelessClient(object): class CStream(object): """docstring for CStream""" def __init__(self): - super(CStream, self).__init__() + super(CTRexStatelessClient.CStream, self).__init__() self.packet = CTRexPktBuilder() - self.rx_stats = CRxStats() - self.mode = CTxMode() + self.rx_stats = CTRexStatelessClient.CRxStats() + self.mode = CTRexStatelessClient.CTxMode() self.isg self._next_stream = -1 self._self_start @@ -54,7 +54,7 @@ class CTRexStatelessClient(object): def self_start(self): return self._self_start - @enabled.setter + @self_start.setter def self_start(self, bool_value): self._self_start = bool(bool_value) @@ -62,7 +62,7 @@ class CTRexStatelessClient(object): def next_stream(self): return self._next_stream - @enabled.setter + @next_stream.setter def next_stream(self, value): self._next_stream = int(bool_value) @@ -75,7 +75,7 @@ class CTRexStatelessClient(object): "packet":self.packet.dump_pkt(), "mode":self.mode.dump(), "vm":self.packet.dump_vm_instructions(), # TODO - add this method to packet builder module - "rx_stats":self.rx_stats.dump()}) + "rx_stats":self.rx_stats.dump()} class CRxStats(object): @@ -112,7 +112,7 @@ class CTRexStatelessClient(object): return {k:v for k,v in self._rx_dict.items() if v - }) + } class CTxMode(object): """docstring for CTxMode""" -- cgit From 9629c9953516281d4bdaad1ed63d145de336a983 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Tue, 6 Oct 2015 07:28:24 +0300 Subject: Commiting last night progress --- .../client/trex_stateless_client.py | 300 +++++++++++++-------- 1 file changed, 182 insertions(+), 118 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 12f6eff6..b7580531 100644 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -16,135 +16,66 @@ class CTRexStatelessClient(object): def __init__(self, server="localhost", port=5050, virtual=False): super(CTRexStatelessClient, self).__init__() self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) + self._conn_handler = {} - def add_stream(self, handler, port_id, stream_id, stream_obj): - assert isinstance(stream_obj, CTRexStatelessClient.CStream) - params = {"handler":handler, + def owned(func): + def wrapper(self, *args, **kwargs ) : + if self._conn_handler.get(kwargs.get("port_id")): + return func(self, *args, **kwargs) + else: + raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned". + format(func.__name__)) + return wrapper + + def acquire(self, port_id, username, force=False): + params = {"port_id": port_id, + "user": username, + "force": force} + self._conn_handler[port_id] = self.transmit("acquire", params) + return self._conn_handler + + @owned + def release(self, port_id=None): + self._conn_handler.pop(port_id) + params = {"handler":self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("release", params) + + @owned + def add_stream(self, stream_id, stream_obj, port_id=None): + assert isinstance(stream_obj, CStream) + params = {"handler":self._conn_handler.get(port_id), "port_id":port_id, "stream_id":stream_id, "stream":stream_obj.dump()} return self.transmit("add_stream", params) - def transmit(self, method_name, params={}): - return self.tx_link.transmit(method_name, params) + @owned + def remove_stream(self, stream_id, port_id=None): + params = {"handler":self._conn_handler.get(port_id), + "port_id":port_id, + "stream_id":stream_id} + return self.transmit("remove_stream", params) + @owned + def get_stream_list(self, port_id=None): + params = {"handler":self._conn_handler.get(port_id), + "port_id":port_id} + return self.transmit("get_stream_list", params) - # ------ private classes ------ # - class CStream(object): - """docstring for CStream""" - def __init__(self): - super(CTRexStatelessClient.CStream, self).__init__() - self.packet = CTRexPktBuilder() - self.rx_stats = CTRexStatelessClient.CRxStats() - self.mode = CTRexStatelessClient.CTxMode() - self.isg - self._next_stream = -1 - self._self_start - self._enabled - - @property - def enabled(self): - return self._enabled - - @enabled.setter - def enabled(self, bool_value): - self._enabled = bool(bool_value) - - @property - def self_start(self): - return self._self_start - - @self_start.setter - def self_start(self, bool_value): - self._self_start = bool(bool_value) - - @property - def next_stream(self): - return self._next_stream - - @next_stream.setter - def next_stream(self, value): - self._next_stream = int(bool_value) - - def dump(self): - pass - return {"enabled":self.enabled, - "self_start":self.self_start, - "isg":self.isg, - "next_stream":self.next_stream, - "packet":self.packet.dump_pkt(), - "mode":self.mode.dump(), - "vm":self.packet.dump_vm_instructions(), # TODO - add this method to packet builder module - "rx_stats":self.rx_stats.dump()} - - class CRxStats(object): - - def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): - self._rx_dict = {"enabled" : enabled, - "seq_enabled" : seq_enabled, - "latency_enabled" : latency_enabled} - - @property - def enabled(self): - return self._rx_dict.get("enabled") - - @enabled.setter - def enabled(self, bool_value): - self._rx_dict['enabled'] = bool(bool_value) - - @property - def seq_enabled(self): - return self._rx_dict.get("seq_enabled") - - @seq_enabled.setter - def seq_enabled(self, bool_value): - self._rx_dict['seq_enabled'] = bool(bool_value) - - @property - def latency_enabled(self): - return self._rx_dict.get("latency_enabled") - - @latency_enabled.setter - def latency_enabled(self, bool_value): - self._rx_dict['latency_enabled'] = bool(bool_value) - - def dump(self): - return {k:v - for k,v in self._rx_dict.items() - if v - } - - class CTxMode(object): - """docstring for CTxMode""" - def __init__(self, tx_mode, pps): - super(CTxMode, self).__init__() - if tx_mode not in ["continuous", "single_burst", "multi_burst"]: - raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode)) - self._tx_mode = tx_mode - self._fields = {'pps':float(pps)} - if tx_mode == "single_burst": - self._fields['total_pkts'] = 0 - elif tx_mode == "multi_burst": - self._fields['pkts_per_burst'] = 0 - self._fields['ibg'] = 0.0 - self._fields['count'] = 0 - else: - pass + @owned + def get_stream(self, stream_id, port_id=None): + params = {"handler":self._conn_handler.get(port_id), + "port_id":port_id} + return self.transmit("get_stream_list", params) - def set_tx_mode_attr(self, attr, val): - if attr in self._fields: - self._fields[attr] = type(self._fields.get(attr))(val) - else: - raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')". - format(attr, self._tx_mode)) - def dump(self): - dump = {"type":self._tx_mode} - dump.update({k:v - for k, v in self._fields.items() - }) - return dump + def transmit(self, method_name, params={}): + return self.tx_link.transmit(method_name, params) + + + # ------ private classes ------ # class CTxLink(object): """describes the connectivity of the stateless client method""" def __init__(self, server="localhost", port=5050, virtual=False): @@ -167,7 +98,140 @@ class CTRexStatelessClient(object): else: return self.rpc_link.invoke_rpc_method(method_name, params) +class CStream(object): + """docstring for CStream""" + DEFAULTS = {"rx_stats": CRxStats, + "mode": CTxMode, + "isg": 5.0, + "next_stream": -1, + "self_start": True, + "enabled": True} + + def __init__(self, **kwargs): + super(CStream, self).__init__() + for k, v in kwargs.items(): + setattr(self, k, v) + # set default values to unset attributes, according to DEFAULTS dict + set_keys = set(kwargs.keys()) + keys_to_set = [x for x in self.DEFAULTS if x not in set_keys] + for key in keys_to_set: + default = self.DEFAULTS.get(key) + if type(default)==type: + setattr(self, key, default()) + else: + setattr(self, key, default) + + @property + def packet(self): + return self._packet + + @packet.setter + def packet(self, packet_obj): + assert isinstance(packet_obj, CTRexPktBuilder) + self._packet = packet_obj + + @property + def enabled(self): + return self._enabled + + @enabled.setter + def enabled(self, bool_value): + self._enabled = bool(bool_value) + + @property + def self_start(self): + return self._self_start + + @self_start.setter + def self_start(self, bool_value): + self._self_start = bool(bool_value) + + @property + def next_stream(self): + return self._next_stream + + @next_stream.setter + def next_stream(self, value): + self._next_stream = int(value) + + def dump(self): + pass + return {"enabled":self.enabled, + "self_start":self.self_start, + "isg":self.isg, + "next_stream":self.next_stream, + "packet":self.packet.dump_pkt(), + "mode":self.mode.dump(), + "vm":self.packet.get_vm_data(), + "rx_stats":self.rx_stats.dump()} + +class CRxStats(object): + + def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): + self._rx_dict = {"enabled" : enabled, + "seq_enabled" : seq_enabled, + "latency_enabled" : latency_enabled} + + @property + def enabled(self): + return self._rx_dict.get("enabled") + + @enabled.setter + def enabled(self, bool_value): + self._rx_dict['enabled'] = bool(bool_value) + + @property + def seq_enabled(self): + return self._rx_dict.get("seq_enabled") + + @seq_enabled.setter + def seq_enabled(self, bool_value): + self._rx_dict['seq_enabled'] = bool(bool_value) + + @property + def latency_enabled(self): + return self._rx_dict.get("latency_enabled") + + @latency_enabled.setter + def latency_enabled(self, bool_value): + self._rx_dict['latency_enabled'] = bool(bool_value) + + def dump(self): + return {k:v + for k,v in self._rx_dict.items() + if v + } + +class CTxMode(object): + """docstring for CTxMode""" + def __init__(self, tx_mode, pps): + super(CTxMode, self).__init__() + if tx_mode not in ["continuous", "single_burst", "multi_burst"]: + raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode)) + self._tx_mode = tx_mode + self._fields = {'pps':float(pps)} + if tx_mode == "single_burst": + self._fields['total_pkts'] = 0 + elif tx_mode == "multi_burst": + self._fields['pkts_per_burst'] = 0 + self._fields['ibg'] = 0.0 + self._fields['count'] = 0 + else: + pass + def set_tx_mode_attr(self, attr, val): + if attr in self._fields: + self._fields[attr] = type(self._fields.get(attr))(val) + else: + raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')". + format(attr, self._tx_mode)) + + def dump(self): + dump = {"type":self._tx_mode} + dump.update({k:v + for k, v in self._fields.items() + }) + return dump if __name__ == "__main__": -- cgit From ada8c62d7393e22cee7fccf9eb1b16b8ebe3c7c8 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Tue, 6 Oct 2015 01:53:24 +0300 Subject: Removed any "T-Rex" string out of the code and documentation, using GREP! --- .../trex_control_plane/client/trex_client.py | 140 ++++++++++----------- 1 file changed, 70 insertions(+), 70 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_client.py b/scripts/automation/trex_control_plane/client/trex_client.py index 56775766..c3677132 100755 --- a/scripts/automation/trex_control_plane/client/trex_client.py +++ b/scripts/automation/trex_control_plane/client/trex_client.py @@ -30,18 +30,18 @@ from distutils.util import strtobool class CTRexClient(object): """ - This class defines the client side of the RESTfull interaction with T-Rex + This class defines the client side of the RESTfull interaction with TRex """ def __init__(self, trex_host, max_history_size = 100, trex_daemon_port = 8090, trex_zmq_port = 4500, verbose = False): """ - Instantiate a T-Rex client object, and connecting it to listening daemon-server + Instantiate a TRex client object, and connecting it to listening daemon-server :parameters: trex_host : str - a string of the t-rex ip address or hostname. + a string of the TRex ip address or hostname. max_history_size : int - a number to set the maximum history size of a single T-Rex run. Each sampling adds a new item to history. + a number to set the maximum history size of a single TRex run. Each sampling adds a new item to history. default value : **100** trex_daemon_port : int @@ -69,7 +69,7 @@ class CTRexClient(object): self.result_obj = CTRexResult(max_history_size) self.decoder = JSONDecoder() self.trex_server_path = "http://{hostname}:{port}/".format( hostname = trex_host, port = trex_daemon_port ) - self.__verbose_print("Connecting to T-Rex @ {trex_path} ...".format( trex_path = self.trex_server_path ) ) + self.__verbose_print("Connecting to TRex @ {trex_path} ...".format( trex_path = self.trex_server_path ) ) self.history = jsonrpclib.history.History() self.server = jsonrpclib.Server(self.trex_server_path, history = self.history) self.check_server_connectivity() @@ -90,7 +90,7 @@ class CTRexClient(object): def start_trex (self, f, d, block_to_success = True, timeout = 30, user = None, **trex_cmd_options): """ - Request to start a T-Rex run on server. + Request to start a TRex run on server. :parameters: f : str @@ -98,17 +98,17 @@ class CTRexClient(object): d : int the desired duration of the test. must be at least 30 seconds long. block_to_success : bool - determine if this method blocks until T-Rex changes state from 'Starting' to either 'Idle' or 'Running' + determine if this method blocks until TRex changes state from 'Starting' to either 'Idle' or 'Running' default value : **True** timeout : int - maximum time (in seconds) to wait in blocking state until T-Rex changes state from 'Starting' to either 'Idle' or 'Running' + maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running' default value: **30** user : str the identity of the the run issuer. trex_cmd_options : key, val - sets desired T-Rex options using key=val syntax, separated by comma. + sets desired TRex options using key=val syntax, separated by comma. for keys with no value, state key=True :return: @@ -117,8 +117,8 @@ class CTRexClient(object): :raises: + :exc:`ValueError`, in case 'd' parameter inserted with wrong value. + :exc:`trex_exceptions.TRexError`, in case one of the trex_cmd_options raised an exception at server. - + :exc:`trex_exceptions.TRexInUseError`, in case T-Rex is already taken. - + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying start T-Rex. + + :exc:`trex_exceptions.TRexInUseError`, in case TRex is already taken. + + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying start TRex. + ProtocolError, in case of error in JSON-RPC protocol. """ @@ -128,7 +128,7 @@ class CTRexClient(object): if d < 30: # specify a test should take at least 30 seconds long. raise ValueError except ValueError: - raise ValueError('d parameter must be integer, specifying how long T-Rex run, and must be larger than 30 secs.') + raise ValueError('d parameter must be integer, specifying how long TRex run, and must be larger than 30 secs.') trex_cmd_options.update( {'f' : f, 'd' : d} ) @@ -146,25 +146,25 @@ class CTRexClient(object): if retval!=0: self.seq = retval # update seq num only on successful submission return True - else: # T-Rex is has been started by another user - raise TRexInUseError('T-Rex is already being used by another user or process. Try again once T-Rex is back in IDLE state.') + else: # TRex is has been started by another user + raise TRexInUseError('TRex is already being used by another user or process. Try again once TRex is back in IDLE state.') def stop_trex (self): """ - Request to stop a T-Rex run on server. + Request to stop a TRex run on server. - The request is only valid if the stop initiator is the same client as the T-Rex run initiator. + The request is only valid if the stop initiator is the same client as the TRex run initiator. :parameters: None :return: + **True** on successful termination - + **False** if request issued but T-Rex wasn't running. + + **False** if request issued but TRex wasn't running. :raises: - + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex ir running but started by another user. - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex ir running but started by another user. + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + ProtocolError, in case of error in JSON-RPC protocol. """ @@ -179,16 +179,16 @@ class CTRexClient(object): def force_kill (self, confirm = True): """ - Force killing of running T-Rex process (if exists) on the server. + Force killing of running TRex process (if exists) on the server. .. tip:: This method is a safety method and **overrides any running or reserved resources**, and as such isn't designed to be used on a regular basis. Always consider using :func:`trex_client.CTRexClient.stop_trex` instead. - In the end of this method, T-Rex will return to IDLE state with no reservation. + In the end of this method, TRex will return to IDLE state with no reservation. :parameters: confirm : bool - Prompt a user confirmation before continue terminating T-Rex session + Prompt a user confirmation before continue terminating TRex session :return: + **True** on successful termination @@ -199,7 +199,7 @@ class CTRexClient(object): """ if confirm: - prompt = "WARNING: This will terminate active T-Rex session indiscriminately.\nAre you sure? " + prompt = "WARNING: This will terminate active TRex session indiscriminately.\nAre you sure? " sys.stdout.write('%s [y/n]\n' % prompt) while True: try: @@ -221,20 +221,20 @@ class CTRexClient(object): def wait_until_kickoff_finish(self, timeout = 40): """ - Block the client application until T-Rex changes state from 'Starting' to either 'Idle' or 'Running' + Block the client application until TRex changes state from 'Starting' to either 'Idle' or 'Running' - The request is only valid if the stop initiator is the same client as the T-Rex run initiator. + The request is only valid if the stop initiator is the same client as the TRex run initiator. :parameters: timeout : int - maximum time (in seconds) to wait in blocking state until T-Rex changes state from 'Starting' to either 'Idle' or 'Running' + maximum time (in seconds) to wait in blocking state until TRex changes state from 'Starting' to either 'Idle' or 'Running' :return: + **True** on successful termination - + **False** if request issued but T-Rex wasn't running. + + **False** if request issued but TRex wasn't running. :raises: - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + ProtocolError, in case of error in JSON-RPC protocol. .. note:: Exceptions are throws only when start_trex did not block in the first place, i.e. `block_to_success` parameter was set to `False` @@ -252,22 +252,22 @@ class CTRexClient(object): def is_running (self, dump_out = False): """ - Poll for T-Rex running status. + Poll for TRex running status. - If T-Rex is running, a history item will be added into result_obj and processed. + If TRex is running, a history item will be added into result_obj and processed. - .. tip:: This method is especially useful for iterating until T-Rex run is finished. + .. tip:: This method is especially useful for iterating until TRex run is finished. :parameters: dump_out : dict if passed, the pointer object is cleared and the latest dump stored in it. :return: - + **True** if T-Rex is running. - + **False** if T-Rex is not running. + + **True** if TRex is running. + + **False** if TRex is not running. :raises: - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + :exc:`TypeError`, in case JSON stream decoding error. + ProtocolError, in case of error in JSON-RPC protocol. @@ -292,7 +292,7 @@ class CTRexClient(object): def get_trex_files_path (self): """ - Fetches the local path in which files are stored when pushed to t-rex server from client. + Fetches the local path in which files are stored when pushed to TRex server from client. :parameters: None @@ -300,7 +300,7 @@ class CTRexClient(object): :return: string representation of the desired path - .. note:: The returned path represents a path on the T-Rex server **local machine** + .. note:: The returned path represents a path on the TRex server **local machine** :raises: ProtocolError, in case of error in JSON-RPC protocol. @@ -317,7 +317,7 @@ class CTRexClient(object): def get_running_status (self): """ - Fetches the current T-Rex status. + Fetches the current TRex status. If available, a verbose data will accompany the state itself. @@ -344,18 +344,18 @@ class CTRexClient(object): def get_running_info (self): """ - Performs single poll of T-Rex running data and process it into the result object (named `result_obj`). + Performs single poll of TRex running data and process it into the result object (named `result_obj`). - .. tip:: This method will throw an exception if T-Rex isn't running. Always consider using :func:`trex_client.CTRexClient.is_running` which handles a single poll operation in safer manner. + .. tip:: This method will throw an exception if TRex isn't running. Always consider using :func:`trex_client.CTRexClient.is_running` which handles a single poll operation in safer manner. :parameters: None :return: - dictionary containing the most updated data dump from T-Rex. + dictionary containing the most updated data dump from TRex. :raises: - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + :exc:`TypeError`, in case JSON stream decoding error. + ProtocolError, in case of error in JSON-RPC protocol. @@ -379,7 +379,7 @@ class CTRexClient(object): def sample_until_condition (self, condition_func, time_between_samples = 5): """ - Automatically sets ongoing sampling of T-Rex data, with sampling rate described by time_between_samples. + Automatically sets ongoing sampling of TRex data, with sampling rate described by time_between_samples. On each fetched dump, the condition_func is applied on the result objects, and if returns True, the sampling will stop. @@ -394,36 +394,36 @@ class CTRexClient(object): default value : **5** :return: - the first result object (see :class:`CTRexResult` for further details) of the T-Rex run on which the condition has been met. + the first result object (see :class:`CTRexResult` for further details) of the TRex run on which the condition has been met. :raises: + :exc:`UserWarning`, in case the condition_func method condition hasn't been met - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + :exc:`TypeError`, in case JSON stream decoding error. + ProtocolError, in case of error in JSON-RPC protocol. + :exc:`Exception`, in case the condition_func suffered from any kind of exception """ - # make sure T-Rex is running. raise exceptions here if any + # make sure TRex is running. raise exceptions here if any self.wait_until_kickoff_finish() try: while self.is_running(): results = self.get_result_obj() if condition_func(results): - # if condition satisfied, stop T-Rex and return result object + # if condition satisfied, stop TRex and return result object self.stop_trex() return results time.sleep(time_between_samples) except TRexWarning: # means we're back to Idle state, and didn't meet our condition - raise UserWarning("T-Rex results condition wasn't met during T-Rex run.") + raise UserWarning("TRex results condition wasn't met during TRex run.") except Exception: # this could come from provided method 'condition_func' raise def sample_to_run_finish (self, time_between_samples = 5): """ - Automatically sets automatically sampling of T-Rex data with sampling rate described by time_between_samples until T-Rex run finished. + Automatically sets automatically sampling of TRex data with sampling rate described by time_between_samples until TRex run finished. :parameters: time_between_samples : int @@ -436,7 +436,7 @@ class CTRexClient(object): :raises: + :exc:`UserWarning`, in case the condition_func method condition hasn't been met - + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed T-Rex run (unexpected termination). + + :exc:`trex_exceptions.TRexIncompleteRunError`, in case one of failed TRex run (unexpected termination). + :exc:`TypeError`, in case JSON stream decoding error. + ProtocolError, in case of error in JSON-RPC protocol. @@ -475,13 +475,13 @@ class CTRexClient(object): def is_reserved (self): """ - Checks if T-Rex is currently reserved to any user or not. + Checks if TRex is currently reserved to any user or not. :parameters: None :return: - + **True** if T-Rex is reserved. + + **True** if TRex is reserved. + **False** otherwise. :raises: @@ -499,13 +499,13 @@ class CTRexClient(object): def reserve_trex (self, user = None): """ - Reserves the usage of T-Rex to a certain user. + Reserves the usage of TRex to a certain user. - When T-Rex is reserved, it can't be reserved. + When TRex is reserved, it can't be reserved. :parameters: user : str - a username of the desired owner of T-Rex + a username of the desired owner of TRex default: current logged user @@ -513,8 +513,8 @@ class CTRexClient(object): **True** if reservation made successfully :raises: - + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying to make the reservation. - + :exc:`trex_exceptions.TRexInUseError`, in case T-Rex is currently running. + + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying to make the reservation. + + :exc:`trex_exceptions.TRexInUseError`, in case TRex is currently running. + ProtocolError, in case of error in JSON-RPC protocol. """ @@ -530,14 +530,14 @@ class CTRexClient(object): def cancel_reservation (self, user = None): """ - Cancels a current reservation of T-Rex to a certain user. + Cancels a current reservation of TRex to a certain user. - When T-Rex is reserved, no other user can start new T-Rex runs. + When TRex is reserved, no other user can start new TRex runs. :parameters: user : str - a username of the desired owner of T-Rex + a username of the desired owner of TRex default: current logged user @@ -546,7 +546,7 @@ class CTRexClient(object): + **False** if there was no reservation at all. :raises: - + :exc:`trex_exceptions.TRexRequestDenied`, in case T-Rex is reserved for another user than the one trying to cancel the reservation. + + :exc:`trex_exceptions.TRexRequestDenied`, in case TRex is reserved for another user than the one trying to cancel the reservation. + ProtocolError, in case of error in JSON-RPC protocol. """ @@ -627,7 +627,7 @@ class CTRexClient(object): return method_to_call() except socket.error as e: if e.errno == errno.ECONNREFUSED: - raise SocketError(errno.ECONNREFUSED, "Connection from T-Rex server was refused. Please make sure the server is up.") + raise SocketError(errno.ECONNREFUSED, "Connection from TRex server was refused. Please make sure the server is up.") def check_server_connectivity (self): """ @@ -640,7 +640,7 @@ class CTRexClient(object): raise socket.gaierror(e.errno, "Could not resolve server hostname. Please make sure hostname entered correctly.") except socket.error as e: if e.errno == errno.ECONNREFUSED: - raise socket.error(errno.ECONNREFUSED, "Connection from T-Rex server was refused. Please make sure the server is up.") + raise socket.error(errno.ECONNREFUSED, "Connection from TRex server was refused. Please make sure the server is up.") finally: self.prompt_verbose_data() @@ -671,7 +671,7 @@ class CTRexClient(object): def _handle_AppError_exception(self, err): """ - This private method triggres the T-Rex dedicated exception generation in case a general ProtocolError has been raised. + This private method triggres the TRex dedicated exception generation in case a general ProtocolError has been raised. """ # handle known exceptions based on known error codes. # if error code is not known, raise ProtocolError @@ -680,17 +680,17 @@ class CTRexClient(object): class CTRexResult(object): """ - A class containing all results received from T-Rex. + A class containing all results received from TRex. Ontop to containing the results, this class offers easier data access and extended results processing options """ def __init__(self, max_history_size): """ - Instatiate a T-Rex result object + Instatiate a TRex result object :parameters: max_history_size : int - a number to set the maximum history size of a single T-Rex run. Each sampling adds a new item to history. + a number to set the maximum history size of a single TRex run. Each sampling adds a new item to history. """ self._history = deque(maxlen = max_history_size) @@ -749,7 +749,7 @@ class CTRexResult(object): def get_avg_latency (self): """ - Fetches the average latency measured on each of the interfaces from the start of T-Rex run + Fetches the average latency measured on each of the interfaces from the start of TRex run :parameters: None @@ -779,7 +779,7 @@ class CTRexResult(object): def get_total_drops (self): """ - Fetches the total number of drops identified from the moment T-Rex run began. + Fetches the total number of drops identified from the moment TRex run began. :parameters: None @@ -835,7 +835,7 @@ class CTRexResult(object): def is_done_warmup (self): """ - Checks if T-Rex latest results TX-rate indicates that T-Rex has reached its expected TX-rate. + Checks if TRex latest results TX-rate indicates that TRex has reached its expected TX-rate. :parameters: None @@ -856,7 +856,7 @@ class CTRexResult(object): defines a path to desired data. .. tip:: | Use '.' to enter one level deeper in dictionary hierarchy. - | Use '[i]' to access the i'th indexed obejct of an array. + | Use '[i]' to access the i'th indexed object of an array. tree_path_to_key : regex apply a regex to filter results out from a multiple results set. -- cgit From 2b5c0e9fc7482584d2259a7f79496ea86bcf4b5a Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Tue, 6 Oct 2015 14:43:39 +0300 Subject: progress in stateless client, added trex_stats --- .../client/trex_stateless_client.py | 90 ++++++++++++++++------ 1 file changed, 68 insertions(+), 22 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index b25d5cd5..9e49b852 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -9,6 +9,7 @@ except ImportError: from client_utils.jsonrpc_client import JsonRpcClient from client_utils.packet_builder import CTRexPktBuilder import json +from common.trex_stats import * class CTRexStatelessClient(object): @@ -17,31 +18,62 @@ class CTRexStatelessClient(object): super(CTRexStatelessClient, self).__init__() self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) self._conn_handler = {} - - def owned(func): - def wrapper(self, *args, **kwargs): - if self._conn_handler.get(kwargs.get("port_id")): - return func(self, *args, **kwargs) - else: - raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned". - format(func.__name__)) + self._active_ports = set() + self._port_stats = CTRexStatsManager() + self._stream_stats = CTRexStatsManager() + + + # ----- decorator methods ----- # + def force_status(owned=True, active=False): + def wrapper(func): + def wrapper_f(self, *args, **kwargs): + port_ids = kwargs.get("port_id") + if isinstance(port_ids, int): + # make sure port_ids is a list + port_ids = [port_ids] + bad_ids = set() + for port_id in port_ids: + if not self._conn_handler.get(kwargs.get(port_id)): + bad_ids.add(port_ids) + if bad_ids: + # Some port IDs are not according to desires status + own_str = "owned" if owned else "not-owned" + act_str = "active" if active else "non-active" + raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not both" \ + "{2} and {3}".format(func.__name__, + bad_ids, + own_str, + act_str)) + else: + func(self, *args, **kwargs) + return wrapper_f return wrapper + # def owned(func): + # def wrapper(self, *args, **kwargs): + # if self._conn_handler.get(kwargs.get("port_id")): + # return func(self, *args, **kwargs) + # else: + # raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned". + # format(func.__name__)) + # return wrapper + + # ----- user-access methods ----- # def acquire(self, port_id, username, force=False): params = {"port_id": port_id, "user": username, "force": force} self._conn_handler[port_id] = self.transmit("acquire", params) - return self._conn_handler + return self._conn_handler[port_id] - @owned + @force_status(owned=True) def release(self, port_id=None): self._conn_handler.pop(port_id) params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} return self.transmit("release", params) - @owned + @force_status(owned=True) def add_stream(self, stream_id, stream_obj, port_id=None): assert isinstance(stream_obj, CStream) params = {"handler": self._conn_handler.get(port_id), @@ -50,15 +82,15 @@ class CTRexStatelessClient(object): "stream": stream_obj.dump()} return self.transmit("add_stream", params) - @owned + @force_status(owned=True) def remove_stream(self, stream_id, port_id=None): params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, "stream_id": stream_id} return self.transmit("remove_stream", params) - @owned - def get_stream_list(self, port_id=None): + @force_status(owned=True,active=) + def get_stream_id_list(self, port_id=None): params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} return self.transmit("get_stream_list", params) @@ -86,21 +118,35 @@ class CTRexStatelessClient(object): return self.transmit("get_global_stats") @owned - def stop_traffic(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), + def get_port_stats(self, port_id=None): + params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed "port_id": port_id} - return self.transmit("stop_traffic", params) - - - + return self.transmit("get_port_stats", params) + @owned + def get_stream_stats(self, port_id=None): + params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed + "port_id": port_id} + return self.transmit("get_stream_stats", params) + # ----- internal methods ----- # + def transmit(self, method_name, params={}): + return self.tx_link.transmit(method_name, params) + @staticmethod + def _object_decoder(obj_type, obj_data): + if obj_type=="global": + return CGlobalStats(**obj_data) + elif obj_type=="port": + return CPortStats(**obj_data) + elif obj_type=="stream": + return CStreamStats(**obj_data) + else: + # Do not serialize the data into class + return obj_data - def transmit(self, method_name, params={}): - return self.tx_link.transmit(method_name, params) # ------ private classes ------ # -- cgit From c27d9bf5bc25a1a9e063ca076ce2e99c02dfe31e Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Wed, 7 Oct 2015 00:32:37 +0300 Subject: Incorporating batch commands in stateless client --- .../client/trex_stateless_client.py | 48 ++++++++++++++++++---- 1 file changed, 39 insertions(+), 9 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 9e49b852..3a48c612 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -6,14 +6,17 @@ try: except ImportError: # support import for Python 3 import client.outer_packages -from client_utils.jsonrpc_client import JsonRpcClient +from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage from client_utils.packet_builder import CTRexPktBuilder import json from common.trex_stats import * +from collections import namedtuple class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" + RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) + def __init__(self, server="localhost", port=5050, virtual=False): super(CTRexStatelessClient, self).__init__() self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) @@ -60,11 +63,18 @@ class CTRexStatelessClient(object): # ----- user-access methods ----- # def acquire(self, port_id, username, force=False): - params = {"port_id": port_id, - "user": username, - "force": force} - self._conn_handler[port_id] = self.transmit("acquire", params) - return self._conn_handler[port_id] + if isinstance(port_id, list) or isinstance(port_id, set): + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("acquire", {"port_id":p_id, "user":username, "force":force}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + + else: + params = {"port_id": port_id, + "user": username, + "force": force} + self._conn_handler[port_id] = self.transmit("acquire", params) + return self._conn_handler[port_id] @force_status(owned=True) def release(self, port_id=None): @@ -133,6 +143,9 @@ class CTRexStatelessClient(object): def transmit(self, method_name, params={}): return self.tx_link.transmit(method_name, params) + def transmit_batch(self, batch_list): + return self.tx_link.transmit_batch(batch_list) + @staticmethod def _object_decoder(obj_type, obj_data): if obj_type=="global": @@ -163,15 +176,32 @@ class CTRexStatelessClient(object): def transmit(self, method_name, params={}): if self.virtual: - print "Transmitting virtually over tcp://{server}:{port}".format( - server=self.server, - port=self.port) + self._prompt_virtual_tx_msg() id, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) print msg return else: return self.rpc_link.invoke_rpc_method(method_name, params) + def transmit_batch(self, batch_list): + if self.virtual: + self._prompt_virtual_tx_msg() + print [msg + for id, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) + for command in batch_list]] + else: + batch = self.rpc_link.create_batch() + for command in batch_list: + batch.add(command.method, command.params) + # invoke the batch + return batch.invoke() + + + def _prompt_virtual_tx_msg(self): + print "Transmitting virtually over tcp://{server}:{port}".format( + server=self.server, + port=self.port) + class CStream(object): """docstring for CStream""" DEFAULTS = {"rx_stats": CRxStats, -- cgit From 4f286bfefa6bbb0be4cdcf1fb004c82fc334c21f Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Wed, 7 Oct 2015 13:47:18 +0300 Subject: progress in TRexStatelessClient module mainly at batching support --- .../client/trex_stateless_client.py | 202 ++++++++++++++------- 1 file changed, 136 insertions(+), 66 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 3a48c612..412bdc09 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -17,17 +17,18 @@ class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) - def __init__(self, server="localhost", port=5050, virtual=False): + def __init__(self, username, server="localhost", port=5050, virtual=False): super(CTRexStatelessClient, self).__init__() + self.user = username self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) self._conn_handler = {} self._active_ports = set() self._port_stats = CTRexStatsManager() self._stream_stats = CTRexStatsManager() - + self._system_info = None # ----- decorator methods ----- # - def force_status(owned=True, active=False): + def force_status(owned=True, active_and_owned=False): def wrapper(func): def wrapper_f(self, *args, **kwargs): port_ids = kwargs.get("port_id") @@ -36,52 +37,79 @@ class CTRexStatelessClient(object): port_ids = [port_ids] bad_ids = set() for port_id in port_ids: - if not self._conn_handler.get(kwargs.get(port_id)): + port_owned = self._conn_handler.get(kwargs.get(port_id)) + if owned and not port_owned: bad_ids.add(port_ids) + elif active_and_owned: # stronger condition than just owned, hence gets precedence + if port_owned and port_id in self._active_ports: + continue + else: + bad_ids.add(port_ids) + else: + continue if bad_ids: # Some port IDs are not according to desires status - own_str = "owned" if owned else "not-owned" - act_str = "active" if active else "non-active" - raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not both" \ - "{2} and {3}".format(func.__name__, - bad_ids, - own_str, - act_str)) + raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" \ + "at allowed stated".format(func.__name__)) else: func(self, *args, **kwargs) return wrapper_f return wrapper - # def owned(func): - # def wrapper(self, *args, **kwargs): - # if self._conn_handler.get(kwargs.get("port_id")): - # return func(self, *args, **kwargs) - # else: - # raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned". - # format(func.__name__)) - # return wrapper + @property + def system_info(self): + if not self._system_info: + self._system_info = self.get_system_info() + return self._system_info # ----- user-access methods ----- # - def acquire(self, port_id, username, force=False): + def ping(self): + return self.transmit("ping") + + def get_supported_cmds(self): + return self.transmit("get_supported_cmds") + + def get_version(self): + return self.transmit("get_version") + + def get_system_info(self): + return self.transmit("get_system_info") + + def get_port_count(self): + return self.system_info.get("port_count") + + def acquire(self, port_id, force=False): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("acquire", {"port_id":p_id, "user":username, "force":force}) + commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here else: params = {"port_id": port_id, - "user": username, + "user": self.user, "force": force} self._conn_handler[port_id] = self.transmit("acquire", params) return self._conn_handler[port_id] @force_status(owned=True) def release(self, port_id=None): - self._conn_handler.pop(port_id) - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("release", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + self._conn_handler.pop(port_id) + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("release", params) @force_status(owned=True) def add_stream(self, stream_id, stream_obj, port_id=None): @@ -99,45 +127,77 @@ class CTRexStatelessClient(object): "stream_id": stream_id} return self.transmit("remove_stream", params) - @force_status(owned=True,active=) + @force_status(owned=True, active_and_owned=True) def get_stream_id_list(self, port_id=None): params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} return self.transmit("get_stream_list", params) - @owned + @force_status(owned=True, active_and_owned=True) def get_stream(self, stream_id, port_id=None): params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, "stream_id": stream_id} return self.transmit("get_stream_list", params) - @owned + @force_status(owned=True) def start_traffic(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("start_traffic", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("start_traffic", params) - @owned + @force_status(owned=False, active_and_owned=True) def stop_traffic(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("stop_traffic", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("stop_traffic", params) def get_global_stats(self): return self.transmit("get_global_stats") - @owned + @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed - "port_id": port_id} - return self.transmit("get_port_stats", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("get_port_stats", params) - @owned + @force_status(owned=True, active_and_owned=True) def get_stream_stats(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed - "port_id": port_id} - return self.transmit("get_stream_stats", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("get_stream_stats", params) # ----- internal methods ----- # def transmit(self, method_name, params={}): @@ -148,18 +208,25 @@ class CTRexStatelessClient(object): @staticmethod def _object_decoder(obj_type, obj_data): - if obj_type=="global": + if obj_type == "global": return CGlobalStats(**obj_data) - elif obj_type=="port": + elif obj_type == "port": return CPortStats(**obj_data) - elif obj_type=="stream": + elif obj_type == "stream": return CStreamStats(**obj_data) else: # Do not serialize the data into class return obj_data - - + def _is_ports_valid(self, port_id): + if isinstance(port_id, list) or isinstance(port_id, set): + # check each item of the sequence + return all([CTRexStatelessClient._is_ports_valid(port) + for port in port_id]) + elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()): + return True + else: + return False # ------ private classes ------ # @@ -196,12 +263,12 @@ class CTRexStatelessClient(object): # invoke the batch return batch.invoke() - def _prompt_virtual_tx_msg(self): print "Transmitting virtually over tcp://{server}:{port}".format( server=self.server, port=self.port) + class CStream(object): """docstring for CStream""" DEFAULTS = {"rx_stats": CRxStats, @@ -217,7 +284,9 @@ class CStream(object): setattr(self, k, v) # set default values to unset attributes, according to DEFAULTS dict set_keys = set(kwargs.keys()) - keys_to_set = [x for x in self.DEFAULTS if x not in set_keys] + keys_to_set = [x + for x in self.DEFAULTS + if x not in set_keys] for key in keys_to_set: default = self.DEFAULTS.get(key) if type(default)==type: @@ -260,21 +329,21 @@ class CStream(object): def dump(self): pass - return {"enabled":self.enabled, - "self_start":self.self_start, - "isg":self.isg, - "next_stream":self.next_stream, - "packet":self.packet.dump_pkt(), - "mode":self.mode.dump(), - "vm":self.packet.get_vm_data(), - "rx_stats":self.rx_stats.dump()} + return {"enabled": self.enabled, + "self_start": self.self_start, + "isg": self.isg, + "next_stream": self.next_stream, + "packet": self.packet.dump_pkt(), + "mode": self.mode.dump(), + "vm": self.packet.get_vm_data(), + "rx_stats": self.rx_stats.dump()} class CRxStats(object): def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): - self._rx_dict = {"enabled" : enabled, - "seq_enabled" : seq_enabled, - "latency_enabled" : latency_enabled} + self._rx_dict = {"enabled": enabled, + "seq_enabled": seq_enabled, + "latency_enabled": latency_enabled} @property def enabled(self): @@ -301,11 +370,12 @@ class CRxStats(object): self._rx_dict['latency_enabled'] = bool(bool_value) def dump(self): - return {k:v + return {k: v for k,v in self._rx_dict.items() if v } + class CTxMode(object): """docstring for CTxMode""" def __init__(self, tx_mode, pps): @@ -313,7 +383,7 @@ class CTxMode(object): if tx_mode not in ["continuous", "single_burst", "multi_burst"]: raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode)) self._tx_mode = tx_mode - self._fields = {'pps':float(pps)} + self._fields = {'pps': float(pps)} if tx_mode == "single_burst": self._fields['total_pkts'] = 0 elif tx_mode == "multi_burst": @@ -331,8 +401,8 @@ class CTxMode(object): format(attr, self._tx_mode)) def dump(self): - dump = {"type":self._tx_mode} - dump.update({k:v + dump = {"type": self._tx_mode} + dump.update({k: v for k, v in self._fields.items() }) return dump -- cgit From c3ced9cd49c609d8a25933012f9aa2e5db9298d9 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Wed, 7 Oct 2015 13:49:28 +0300 Subject: Applied port validity on port_id relevant methods --- .../trex_control_plane/client/trex_stateless_client.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 412bdc09..95ed8522 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -98,6 +98,8 @@ class CTRexStatelessClient(object): @force_status(owned=True) def release(self, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications @@ -113,6 +115,8 @@ class CTRexStatelessClient(object): @force_status(owned=True) def add_stream(self, stream_id, stream_obj, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") assert isinstance(stream_obj, CStream) params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, @@ -122,6 +126,8 @@ class CTRexStatelessClient(object): @force_status(owned=True) def remove_stream(self, stream_id, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, "stream_id": stream_id} @@ -129,12 +135,16 @@ class CTRexStatelessClient(object): @force_status(owned=True, active_and_owned=True) def get_stream_id_list(self, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} return self.transmit("get_stream_list", params) @force_status(owned=True, active_and_owned=True) def get_stream(self, stream_id, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, "stream_id": stream_id} @@ -142,6 +152,8 @@ class CTRexStatelessClient(object): @force_status(owned=True) def start_traffic(self, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications @@ -156,6 +168,8 @@ class CTRexStatelessClient(object): @force_status(owned=False, active_and_owned=True) def stop_traffic(self, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications @@ -173,6 +187,8 @@ class CTRexStatelessClient(object): @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications @@ -187,6 +203,8 @@ class CTRexStatelessClient(object): @force_status(owned=True, active_and_owned=True) def get_stream_stats(self, port_id=None): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications -- cgit From 086dac9854b9711cebf73d392973cae9358b6b0e Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Wed, 7 Oct 2015 23:43:18 +0300 Subject: More progress in stateless client. Mainly more mature approach to handling results --- .../client/trex_stateless_client.py | 68 ++++++++++++++++++---- 1 file changed, 56 insertions(+), 12 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 95ed8522..faccf168 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -23,8 +23,7 @@ class CTRexStatelessClient(object): self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) self._conn_handler = {} self._active_ports = set() - self._port_stats = CTRexStatsManager() - self._stream_stats = CTRexStatsManager() + self._stats = CTRexStatsManager("port", "stream") self._system_info = None # ----- decorator methods ----- # @@ -87,14 +86,15 @@ class CTRexStatelessClient(object): commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) - # TODO: further processing here - + if rc: + self._process_batch_result(commands, resp_list, self._handle_acquire_response) else: params = {"port_id": port_id, "user": self.user, "force": force} - self._conn_handler[port_id] = self.transmit("acquire", params) - return self._conn_handler[port_id] + command = self.RpcCmdData("acquire", params) + self._handle_acquire_response(command, self.transmit(command.method, command.params)) + return self._conn_handler.get(port_id) @force_status(owned=True) def release(self, port_id=None): @@ -106,12 +106,15 @@ class CTRexStatelessClient(object): commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) - # TODO: further processing here + if rc: + self._process_batch_result(commands, resp_list, self._handle_release_response) else: self._conn_handler.pop(port_id) params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - return self.transmit("release", params) + command = self.RpcCmdData("release", params) + self._handle_release_response(command, self.transmit(command.method, command.params)) + return @force_status(owned=True) def add_stream(self, stream_id, stream_obj, port_id=None): @@ -160,11 +163,14 @@ class CTRexStatelessClient(object): commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) - # TODO: further processing here + if rc: + self._process_batch_result(commands, resp_list, self._handle_start_traffic_response) else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - return self.transmit("start_traffic", params) + command = self.RpcCmdData("start_traffic", params) + self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) + return @force_status(owned=False, active_and_owned=True) def stop_traffic(self, port_id=None): @@ -176,11 +182,14 @@ class CTRexStatelessClient(object): commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) - # TODO: further processing here + if rc: + self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response) else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - return self.transmit("stop_traffic", params) + command = self.RpcCmdData("stop_traffic", params) + self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) + return def get_global_stats(self): return self.transmit("get_global_stats") @@ -236,6 +245,31 @@ class CTRexStatelessClient(object): # Do not serialize the data into class return obj_data + @staticmethod + def default_success_test(result_obj): + if result_obj[0]: + return True + else: + return False + + # ----- handler internal methods ----- # + def _handle_acquire_response(self, request, response): + if response[0]: + self._conn_handler[request.get("port_id")] = response[1] + + def _handle_release_response(self, request, response): + if response[0]: + del self._conn_handler[request.get("port_id")] + + def _handle_start_traffic_response(self, request, response): + if response[0]: + self._active_ports.add(request.get("port_id")) + + def _handle_stop_traffic_response(self, request, response): + if response[0]: + self._active_ports.remove(request.get("port_id")) + + def _is_ports_valid(self, port_id): if isinstance(port_id, list) or isinstance(port_id, set): # check each item of the sequence @@ -246,6 +280,16 @@ class CTRexStatelessClient(object): else: return False + def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): + for i, response in enumerate(resp_list): + if success_test(): + # run handler method with its params + handler_func(req_list[i], response) + else: + continue # TODO: mark in this case somehow the bad result + + + # ------ private classes ------ # class CTxLink(object): -- cgit From d1065266e17e514dab4aec87abab729a518cdf26 Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Mon, 12 Oct 2015 07:12:24 +0300 Subject: Applied some code cosmetics --- .../client/trex_stateless_client.py | 32 ++++++++++------------ 1 file changed, 14 insertions(+), 18 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index faccf168..7c373e42 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -82,7 +82,7 @@ class CTRexStatelessClient(object): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications + port_ids = set(port_id) # convert to set to avoid duplications commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) @@ -159,7 +159,7 @@ class CTRexStatelessClient(object): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications + port_ids = set(port_id) # convert to set to avoid duplications commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) @@ -178,7 +178,7 @@ class CTRexStatelessClient(object): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications + port_ids = set(port_id) # convert to set to avoid duplications commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) @@ -200,7 +200,7 @@ class CTRexStatelessClient(object): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications + port_ids = set(port_id) # convert to set to avoid duplications commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) @@ -216,7 +216,7 @@ class CTRexStatelessClient(object): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications + port_ids = set(port_id) # convert to set to avoid duplications commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) @@ -263,12 +263,11 @@ class CTRexStatelessClient(object): def _handle_start_traffic_response(self, request, response): if response[0]: - self._active_ports.add(request.get("port_id")) + self._active_ports.add(request.get("port_id")) def _handle_stop_traffic_response(self, request, response): if response[0]: - self._active_ports.remove(request.get("port_id")) - + self._active_ports.remove(request.get("port_id")) def _is_ports_valid(self, port_id): if isinstance(port_id, list) or isinstance(port_id, set): @@ -289,8 +288,6 @@ class CTRexStatelessClient(object): continue # TODO: mark in this case somehow the bad result - - # ------ private classes ------ # class CTxLink(object): """describes the connectivity of the stateless client method""" @@ -316,8 +313,8 @@ class CTRexStatelessClient(object): if self.virtual: self._prompt_virtual_tx_msg() print [msg - for id, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) - for command in batch_list]] + for _, msg in [self.rpc_link.create_jsonrpc_v2(command.method, command.params) + for command in batch_list]] else: batch = self.rpc_link.create_batch() for command in batch_list: @@ -326,9 +323,8 @@ class CTRexStatelessClient(object): return batch.invoke() def _prompt_virtual_tx_msg(self): - print "Transmitting virtually over tcp://{server}:{port}".format( - server=self.server, - port=self.port) + print "Transmitting virtually over tcp://{server}:{port}".format(server=self.server, + port=self.port) class CStream(object): @@ -351,7 +347,7 @@ class CStream(object): if x not in set_keys] for key in keys_to_set: default = self.DEFAULTS.get(key) - if type(default)==type: + if type(default) == type: setattr(self, key, default()) else: setattr(self, key, default) @@ -433,7 +429,7 @@ class CRxStats(object): def dump(self): return {k: v - for k,v in self._rx_dict.items() + for k, v in self._rx_dict.items() if v } @@ -460,7 +456,7 @@ class CTxMode(object): self._fields[attr] = type(self._fields.get(attr))(val) else: raise ValueError("The provided attribute ('{0}') is not a legal attribute in selected TX mode ('{1}')". - format(attr, self._tx_mode)) + format(attr, self._tx_mode)) def dump(self): dump = {"type": self._tx_mode} -- cgit From a9f60d36e81c25244dad8f4f4c985f1e8e368c7c Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Mon, 12 Oct 2015 00:27:49 +0300 Subject: Updated handlers of getter methods and stats (Global, port, stream). Also, set return values of RPC commands as namedtuples --- .../client/trex_stateless_client.py | 75 ++++++++++++++-------- 1 file changed, 50 insertions(+), 25 deletions(-) (limited to 'scripts/automation/trex_control_plane/client') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 7c373e42..334496d1 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -48,7 +48,7 @@ class CTRexStatelessClient(object): continue if bad_ids: # Some port IDs are not according to desires status - raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" \ + raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" "at allowed stated".format(func.__name__)) else: func(self, *args, **kwargs) @@ -78,7 +78,7 @@ class CTRexStatelessClient(object): return self.system_info.get("port_count") def acquire(self, port_id, force=False): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode @@ -98,7 +98,7 @@ class CTRexStatelessClient(object): @force_status(owned=True) def release(self, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode @@ -118,7 +118,7 @@ class CTRexStatelessClient(object): @force_status(owned=True) def add_stream(self, stream_id, stream_obj, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") assert isinstance(stream_obj, CStream) params = {"handler": self._conn_handler.get(port_id), @@ -129,7 +129,7 @@ class CTRexStatelessClient(object): @force_status(owned=True) def remove_stream(self, stream_id, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, @@ -138,7 +138,7 @@ class CTRexStatelessClient(object): @force_status(owned=True, active_and_owned=True) def get_stream_id_list(self, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} @@ -146,7 +146,7 @@ class CTRexStatelessClient(object): @force_status(owned=True, active_and_owned=True) def get_stream(self, stream_id, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, @@ -155,7 +155,7 @@ class CTRexStatelessClient(object): @force_status(owned=True) def start_traffic(self, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode @@ -174,7 +174,7 @@ class CTRexStatelessClient(object): @force_status(owned=False, active_and_owned=True) def stop_traffic(self, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode @@ -192,11 +192,13 @@ class CTRexStatelessClient(object): return def get_global_stats(self): - return self.transmit("get_global_stats") + command = self.RpcCmdData("get_global_stats", {}) + return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) + # return self.transmit("get_global_stats") @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode @@ -204,15 +206,17 @@ class CTRexStatelessClient(object): commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) - # TODO: further processing here + if rc: + self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response) else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - return self.transmit("get_port_stats", params) + command = self.RpcCmdData("get_port_stats", params) + return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params)) @force_status(owned=True, active_and_owned=True) def get_stream_stats(self, port_id=None): - if not CTRexStatelessClient._is_ports_valid(port_id): + if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode @@ -220,11 +224,13 @@ class CTRexStatelessClient(object): commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) - # TODO: further processing here + if rc: + self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response) else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - return self.transmit("get_stream_stats", params) + command = self.RpcCmdData("get_stream_stats", params) + return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params)) # ----- internal methods ----- # def transmit(self, method_name, params={}): @@ -247,32 +253,50 @@ class CTRexStatelessClient(object): @staticmethod def default_success_test(result_obj): - if result_obj[0]: + if result_obj.success: return True else: return False # ----- handler internal methods ----- # def _handle_acquire_response(self, request, response): - if response[0]: - self._conn_handler[request.get("port_id")] = response[1] + if response.success: + self._conn_handler[request.get("port_id")] = response.data def _handle_release_response(self, request, response): - if response[0]: + if response.success: del self._conn_handler[request.get("port_id")] def _handle_start_traffic_response(self, request, response): - if response[0]: + if response.success: self._active_ports.add(request.get("port_id")) def _handle_stop_traffic_response(self, request, response): - if response[0]: + if response.success: self._active_ports.remove(request.get("port_id")) + def _handle_get_global_stats_response(self, request, response): + if response.success: + return CGlobalStats(**response.success) + else: + return False + + def _handle_get_port_stats_response(self, request, response): + if response.success: + return CPortStats(**response.success) + else: + return False + + def _handle_get_stream_stats_response(self, request, response): + if response.success: + return CStreamStats(**response.success) + else: + return False + def _is_ports_valid(self, port_id): if isinstance(port_id, list) or isinstance(port_id, set): # check each item of the sequence - return all([CTRexStatelessClient._is_ports_valid(port) + return all([self._is_ports_valid(port) for port in port_id]) elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()): return True @@ -281,7 +305,8 @@ class CTRexStatelessClient(object): def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): for i, response in enumerate(resp_list): - if success_test(): + # testing each result with success test so that a conclusion report could be deployed in future. + if success_test(response): # run handler method with its params handler_func(req_list[i], response) else: @@ -303,7 +328,7 @@ class CTRexStatelessClient(object): def transmit(self, method_name, params={}): if self.virtual: self._prompt_virtual_tx_msg() - id, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) + _, msg = self.rpc_link.create_jsonrpc_v2(method_name, params) print msg return else: -- cgit