From a39e4416cd78fc3b147695465c4de1c896b3face Mon Sep 17 00:00:00 2001 From: Dan Klein Date: Tue, 27 Oct 2015 10:00:18 +0200 Subject: more hltapi progress connect working --- .../client/trex_stateless_client.py | 185 ++++++++++++++++----- 1 file changed, 144 insertions(+), 41 deletions(-) (limited to 'scripts/automation/trex_control_plane/client/trex_stateless_client.py') diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index b976db66..97d3ec0a 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -13,10 +13,19 @@ from common.trex_stats import * from common.trex_streams import * from collections import namedtuple +RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) + +class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'])): + __slots__ = () + def __str__(self): + return "{id:^3} - {msg} ({stat})".format(id=self.id, + msg=self.msg, + stat="success" if self.success else "fail") + +# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg']) class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" - RpcCmdData = namedtuple('RpcCmdData', ['method', 'params']) def __init__(self, username, server="localhost", port=5050, virtual=False): super(CTRexStatelessClient, self).__init__() @@ -26,48 +35,69 @@ class CTRexStatelessClient(object): self._active_ports = set() self._stats = CTRexStatsManager("port", "stream") self._system_info = None + self._server_version = None + self.__err_log = None # ----- decorator methods ----- # def force_status(owned=True, active_and_owned=False): def wrapper(func): def wrapper_f(self, *args, **kwargs): port_ids = kwargs.get("port_id") + if not port_ids: + port_ids = args[0] if isinstance(port_ids, int): # make sure port_ids is a list port_ids = [port_ids] bad_ids = set() for port_id in port_ids: - port_owned = self._conn_handler.get(kwargs.get(port_id)) + port_owned = self._conn_handler.get(port_id) if owned and not port_owned: - bad_ids.add(port_ids) + bad_ids.add(port_id) elif active_and_owned: # stronger condition than just owned, hence gets precedence if port_owned and port_id in self._active_ports: continue else: - bad_ids.add(port_ids) + bad_ids.add(port_id) else: continue if bad_ids: # Some port IDs are not according to desires status raise RuntimeError("The requested method ('{0}') cannot be invoked since port IDs {1} are not" - "at allowed stated".format(func.__name__)) + "at allowed stated".format(func.__name__, list(bad_ids))) else: - func(self, *args, **kwargs) + return func(self, *args, **kwargs) return wrapper_f return wrapper @property def system_info(self): if not self._system_info: - self._system_info = self.get_system_info() - return self._system_info + rc, info = self.get_system_info() + if rc: + self._system_info = info + else: + self.__err_log = info + return self._system_info if self._system_info else "Unknown" + + @property + def server_version(self): + if not self._server_version: + rc, ver_info = self.get_version() + if rc: + self._server_version = ver_info + else: + self.__err_log = ver_info + return self._server_version if self._server_version else "Unknown" # ----- user-access methods ----- # def connect(self): - self.comm_link.connect() + rc, err = self.comm_link.connect() + if not rc: + return rc, err + return self._init_sync() def disconnect(self): - self.comm_link.disconnect() + return self.comm_link.disconnect() def ping(self): return self.transmit("ping") @@ -90,18 +120,19 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) + commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: - self._process_batch_result(commands, resp_list, self._handle_acquire_response) + return self._process_batch_result(commands, resp_list, self._handle_acquire_response) else: params = {"port_id": port_id, "user": self.user, "force": force} - command = self.RpcCmdData("acquire", params) - self._handle_acquire_response(command, self.transmit(command.method, command.params)) - return self._conn_handler.get(port_id) + command = RpcCmdData("acquire", params) + return self._handle_acquire_response(command, + self.transmit(command.method, command.params), + self.default_success_test) @force_status(owned=True) def release(self, port_id=None): @@ -110,7 +141,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -119,7 +150,7 @@ class CTRexStatelessClient(object): self._conn_handler.pop(port_id) params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("release", params) + command = RpcCmdData("release", params) self._handle_release_response(command, self.transmit(command.method, command.params)) return @@ -143,6 +174,28 @@ class CTRexStatelessClient(object): "stream_id": stream_id} return self.transmit("remove_stream", params) + @force_status(owned=True) + def remove_all_streams(self, port_id=None): + if not self._is_ports_valid(port_id): + raise ValueError("Provided illegal port id input") + if isinstance(port_id, list) or isinstance(port_id, set): + # handle as batch mode + port_ids = set(port_id) # convert to set to avoid duplications + commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)}) + for p_id in port_ids] + rc, resp_list = self.transmit_batch(commands) + if rc: + return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response, + success_test=self.ack_success_test) + else: + params = {"port_id": port_id, + "handler": self._conn_handler.get(port_id)} + command = RpcCmdData("remove_all_streams", params) + return self._handle_remove_streams_response(command, + self.transmit(command.method, command.params), + self.ack_success_test) + pass + @force_status(owned=True, active_and_owned=True) def get_stream_id_list(self, port_id=None): if not self._is_ports_valid(port_id): @@ -167,7 +220,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -175,7 +228,7 @@ class CTRexStatelessClient(object): else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("start_traffic", params) + command = RpcCmdData("start_traffic", params) self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) return @@ -186,7 +239,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -194,12 +247,12 @@ class CTRexStatelessClient(object): else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("stop_traffic", params) + command = RpcCmdData("stop_traffic", params) self._handle_start_traffic_response(command, self.transmit(command.method, command.params)) return def get_global_stats(self): - command = self.RpcCmdData("get_global_stats", {}) + command = RpcCmdData("get_global_stats", {}) return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) # return self.transmit("get_global_stats") @@ -210,7 +263,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -218,7 +271,7 @@ class CTRexStatelessClient(object): else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("get_port_stats", params) + command = RpcCmdData("get_port_stats", params) return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params)) @force_status(owned=True, active_and_owned=True) @@ -228,7 +281,7 @@ class CTRexStatelessClient(object): if isinstance(port_id, list) or isinstance(port_id, set): # handle as batch mode port_ids = set(port_id) # convert to set to avoid duplications - commands = [self.RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) + commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) for p_id in port_ids] rc, resp_list = self.transmit_batch(commands) if rc: @@ -236,10 +289,18 @@ class CTRexStatelessClient(object): else: params = {"handler": self._conn_handler.get(port_id), "port_id": port_id} - command = self.RpcCmdData("get_stream_stats", params) + command = RpcCmdData("get_stream_stats", params) return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params)) # ----- internal methods ----- # + def _init_sync(self): + # get server version and system info + err = False + if self.server_version == "Unknown" or self.system_info == "Unknown": + self.disconnect() + return False, self.__err_log + return True, "" + def transmit(self, method_name, params={}): return self.comm_link.transmit(method_name, params) @@ -265,16 +326,46 @@ class CTRexStatelessClient(object): else: return False + @staticmethod + def ack_success_test(result_obj): + if result_obj.success and result_obj.data == "ACK": + return True + else: + return False + + # ----- handler internal methods ----- # - def _handle_acquire_response(self, request, response): - if response.success: - self._conn_handler[request.get("port_id")] = response.data + def _handle_general_response(self, request, response, msg, success_test=None): + port_id = request.params.get("port_id") + if not success_test: + success_test = self.default_success_test + if success_test(response): + self._conn_handler[port_id] = response.data + return RpcResponseStatus(True, port_id, msg) + else: + return RpcResponseStatus(False, port_id, response.data) + - def _handle_release_response(self, request, response): + def _handle_acquire_response(self, request, response, success_test): + port_id = request.params.get("port_id") + if success_test(response): + self._conn_handler[port_id] = response.data + return RpcResponseStatus(True, port_id, "Acquired") + else: + return RpcResponseStatus(False, port_id, response.data) + + def _handle_remove_streams_response(self, request, response, success_test): + port_id = request.params.get("port_id") + if success_test(response): + return RpcResponseStatus(True, port_id, "Removed") + else: + return RpcResponseStatus(False, port_id, response.data) + + def _handle_release_response(self, request, response, success_test): if response.success: del self._conn_handler[request.get("port_id")] - def _handle_start_traffic_response(self, request, response): + def _handle_start_traffic_response(self, request, response, success_test): if response.success: self._active_ports.add(request.get("port_id")) @@ -282,19 +373,19 @@ class CTRexStatelessClient(object): if response.success: self._active_ports.remove(request.get("port_id")) - def _handle_get_global_stats_response(self, request, response): + def _handle_get_global_stats_response(self, request, response, success_test): if response.success: return CGlobalStats(**response.success) else: return False - def _handle_get_port_stats_response(self, request, response): + def _handle_get_port_stats_response(self, request, response, success_test): if response.success: return CPortStats(**response.success) else: return False - def _handle_get_stream_stats_response(self, request, response): + def _handle_get_stream_stats_response(self, request, response, success_test): if response.success: return CStreamStats(**response.success) else: @@ -311,13 +402,21 @@ class CTRexStatelessClient(object): return False def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): + res_ok = True + responses = [] + if isinstance(success_test, staticmethod): + success_test = success_test.__func__ for i, response in enumerate(resp_list): - # testing each result with success test so that a conclusion report could be deployed in future. - if success_test(response): - # run handler method with its params - handler_func(req_list[i], response) - else: - continue # TODO: mark in this case somehow the bad result + # run handler method with its params + processed_response = handler_func(req_list[i], response, success_test) + responses.append(processed_response) + if not processed_response.success: + res_ok = False + # else: + # res_ok = False # TODO: mark in this case somehow the bad result + # print res_ok + # print responses + return res_ok, responses # ------ private classes ------ # @@ -332,7 +431,11 @@ class CTRexStatelessClient(object): def connect(self): if not self.virtual: - self.rpc_link.connect() + return self.rpc_link.connect() + + def disconnect(self): + if not self.virtual: + self.rpc_link.disconnect() def transmit(self, method_name, params={}): if self.virtual: -- cgit 1.2.3-korg