diff options
author | Dan Klein <danklein10@gmail.com> | 2015-10-07 13:47:18 +0300 |
---|---|---|
committer | Dan Klein <danklein10@gmail.com> | 2015-10-07 13:47:18 +0300 |
commit | 4f286bfefa6bbb0be4cdcf1fb004c82fc334c21f (patch) | |
tree | a01524843895e9f272974835d5d20390fae40170 /scripts | |
parent | bafc3ec4b2686cdec4ac1c33f69f7607f368d4ce (diff) |
progress in TRexStatelessClient module
mainly at batching support
Diffstat (limited to 'scripts')
-rwxr-xr-x | scripts/automation/trex_control_plane/client/trex_stateless_client.py | 202 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py | 6 |
2 files changed, 139 insertions, 69 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 3a48c612..412bdc09 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -17,17 +17,18 @@ class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) - def __init__(self, server="localhost", port=5050, virtual=False): + def __init__(self, username, server="localhost", port=5050, virtual=False): super(CTRexStatelessClient, self).__init__() + self.user = username self.tx_link = CTRexStatelessClient.CTxLink(server, port, virtual) self._conn_handler = {} self._active_ports = set() self._port_stats = CTRexStatsManager() self._stream_stats = CTRexStatsManager() - + self._system_info = None # ----- decorator methods ----- # - def force_status(owned=True, active=False): + def force_status(owned=True, active_and_owned=False): def wrapper(func): def wrapper_f(self, *args, **kwargs): port_ids = kwargs.get("port_id") @@ -36,52 +37,79 @@ class CTRexStatelessClient(object): port_ids = [port_ids] bad_ids = set() for port_id in port_ids: - if not self._conn_handler.get(kwargs.get(port_id)): + port_owned = self._conn_handler.get(kwargs.get(port_id)) + if owned and not port_owned: bad_ids.add(port_ids) + elif active_and_owned: # stronger condition than just owned, hence gets precedence + if port_owned and port_id in self._active_ports: + continue + else: + bad_ids.add(port_ids) + else: + continue if bad_ids: # Some port IDs are not according to desires status - own_str = "owned" if owned else "not-owned" - act_str = "active" if active else "non-active" - raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not both" \ - "{2} and {3}".format(func.__name__, - bad_ids, - own_str, - act_str)) + raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" \ + "at allowed stated".format(func.__name__)) else: func(self, *args, **kwargs) return wrapper_f return wrapper - # def owned(func): - # def wrapper(self, *args, **kwargs): - # if self._conn_handler.get(kwargs.get("port_id")): - # return func(self, *args, **kwargs) - # else: - # raise RuntimeError("The requested method ('{0}') cannot be invoked unless the desired port is owned". - # format(func.__name__)) - # return wrapper + @property + def system_info(self): + if not self._system_info: + self._system_info = self.get_system_info() + return self._system_info # ----- user-access methods ----- # - def acquire(self, port_id, username, force=False): + def ping(self): + return self.transmit("ping") + + def get_supported_cmds(self): + return self.transmit("get_supported_cmds") + + def get_version(self): + return self.transmit("get_version") + + def get_system_info(self): + return self.transmit("get_system_info") + + def get_port_count(self): + return self.system_info.get("port_count") + + def acquire(self, port_id, force=False): + if not CTRexStatelessClient._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("acquire", {"port_id":p_id, "user":username, "force":force}) + commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here else: params = {"port_id": port_id, - "user": username, + "user": self.user, "force": force} self._conn_handler[port_id] = self.transmit("acquire", params) return self._conn_handler[port_id] @force_status(owned=True) def release(self, port_id=None): - self._conn_handler.pop(port_id) - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("release", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + self._conn_handler.pop(port_id) + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("release", params) @force_status(owned=True) def add_stream(self, stream_id, stream_obj, port_id=None): @@ -99,45 +127,77 @@ class CTRexStatelessClient(object): "stream_id": stream_id} return self.transmit("remove_stream", params) - @force_status(owned=True,active=) + @force_status(owned=True, active_and_owned=True) def get_stream_id_list(self, port_id=None): params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} return self.transmit("get_stream_list", params) - @owned + @force_status(owned=True, active_and_owned=True) def get_stream(self, stream_id, port_id=None): params = {"handler": self._conn_handler.get(port_id), "port_id": port_id, "stream_id": stream_id} return self.transmit("get_stream_list", params) - @owned + @force_status(owned=True) def start_traffic(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("start_traffic", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("start_traffic", params) - @owned + @force_status(owned=False, active_and_owned=True) def stop_traffic(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("stop_traffic", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("stop_traffic", params) def get_global_stats(self): return self.transmit("get_global_stats") - @owned + @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed - "port_id": port_id} - return self.transmit("get_port_stats", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("get_port_stats", params) - @owned + @force_status(owned=True, active_and_owned=True) def get_stream_stats(self, port_id=None): - params = {"handler": self._conn_handler.get(port_id), # TODO: verify if needed - "port_id": port_id} - return self.transmit("get_stream_stats", params) + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + # TODO: further processing here + else: + params = {"handler": self._conn_handler.get(port_id), + "port_id": port_id} + return self.transmit("get_stream_stats", params) # ----- internal methods ----- # def transmit(self, method_name, params={}): @@ -148,18 +208,25 @@ class CTRexStatelessClient(object): @staticmethod def _object_decoder(obj_type, obj_data): - if obj_type=="global": + if obj_type == "global": return CGlobalStats(**obj_data) - elif obj_type=="port": + elif obj_type == "port": return CPortStats(**obj_data) - elif obj_type=="stream": + elif obj_type == "stream": return CStreamStats(**obj_data) else: # Do not serialize the data into class return obj_data - - + def _is_ports_valid(self, port_id): + if isinstance(port_id, list) or isinstance(port_id, set): + # check each item of the sequence + return all([CTRexStatelessClient._is_ports_valid(port) + for port in port_id]) + elif (isinstance(port_id, int)) and (port_id > 0) and (port_id <= self.get_port_count()): + return True + else: + return False # ------ private classes ------ # @@ -196,12 +263,12 @@ class CTRexStatelessClient(object): # invoke the batch return batch.invoke() - def _prompt_virtual_tx_msg(self): print "Transmitting virtually over tcp://{server}:{port}".format( server=self.server, port=self.port) + class CStream(object): """docstring for CStream""" DEFAULTS = {"rx_stats": CRxStats, @@ -217,7 +284,9 @@ class CStream(object): setattr(self, k, v) # set default values to unset attributes, according to DEFAULTS dict set_keys = set(kwargs.keys()) - keys_to_set = [x for x in self.DEFAULTS if x not in set_keys] + keys_to_set = [x + for x in self.DEFAULTS + if x not in set_keys] for key in keys_to_set: default = self.DEFAULTS.get(key) if type(default)==type: @@ -260,21 +329,21 @@ class CStream(object): def dump(self): pass - return {"enabled":self.enabled, - "self_start":self.self_start, - "isg":self.isg, - "next_stream":self.next_stream, - "packet":self.packet.dump_pkt(), - "mode":self.mode.dump(), - "vm":self.packet.get_vm_data(), - "rx_stats":self.rx_stats.dump()} + return {"enabled": self.enabled, + "self_start": self.self_start, + "isg": self.isg, + "next_stream": self.next_stream, + "packet": self.packet.dump_pkt(), + "mode": self.mode.dump(), + "vm": self.packet.get_vm_data(), + "rx_stats": self.rx_stats.dump()} class CRxStats(object): def __init__(self, enabled=False, seq_enabled=False, latency_enabled=False): - self._rx_dict = {"enabled" : enabled, - "seq_enabled" : seq_enabled, - "latency_enabled" : latency_enabled} + self._rx_dict = {"enabled": enabled, + "seq_enabled": seq_enabled, + "latency_enabled": latency_enabled} @property def enabled(self): @@ -301,11 +370,12 @@ class CRxStats(object): self._rx_dict['latency_enabled'] = bool(bool_value) def dump(self): - return {k:v + return {k: v for k,v in self._rx_dict.items() if v } + class CTxMode(object): """docstring for CTxMode""" def __init__(self, tx_mode, pps): @@ -313,7 +383,7 @@ class CTxMode(object): if tx_mode not in ["continuous", "single_burst", "multi_burst"]: raise ValueError("Unknown TX mode ('{0}')has been initialized.".format(tx_mode)) self._tx_mode = tx_mode - self._fields = {'pps':float(pps)} + self._fields = {'pps': float(pps)} if tx_mode == "single_burst": self._fields['total_pkts'] = 0 elif tx_mode == "multi_burst": @@ -331,8 +401,8 @@ class CTxMode(object): format(attr, self._tx_mode)) def dump(self): - dump = {"type":self._tx_mode} - dump.update({k:v + dump = {"type": self._tx_mode} + dump.update({k: v for k, v in self._fields.items() }) return dump diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index db5ddc51..b8b1734d 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -171,7 +171,7 @@ class JsonRpcClient(object): def process_single_response (self, response_json): if (response_json.get("jsonrpc") != "2.0"): - return False, "Malfromed Response ({0})".format(str(response)) + return False, "Malformed Response ({0})".format(str(response)) # error reported by server if ("error" in response_json): @@ -182,7 +182,7 @@ class JsonRpcClient(object): # if no error there should be a result if ("result" not in response_json): - return False, "Malfromed Response ({0})".format(str(response)) + return False, "Malformed Response ({0})".format(str(response)) return True, response_json["result"] @@ -191,7 +191,7 @@ class JsonRpcClient(object): def set_verbose(self, mode): self.verbose = mode - def disconnect (self): + def disconnect(self): if self.connected: self.socket.close(linger = 0) self.context.destroy(linger = 0) |