diff options
author | imarom <imarom@cisco.com> | 2015-11-12 18:28:21 +0200 |
---|---|---|
committer | imarom <imarom@cisco.com> | 2015-11-12 18:28:21 +0200 |
commit | 513581840e5787e73161de049aa59552f23e719d (patch) | |
tree | ebb128f76c53e353c46b0af3f0a183280257a863 /scripts/automation | |
parent | 78c6593c5a2d3d2242be7fc659d15eac6b869358 (diff) |
modifying stateless client to a simpler lightweight module
Diffstat (limited to 'scripts/automation')
3 files changed, 144 insertions, 272 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 5a7b1873..93b36f82 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -12,6 +12,7 @@ import json from common.trex_stats import * from common.trex_streams import * from collections import namedtuple +from common.text_opts import * from trex_async_client import CTRexAsyncClient @@ -27,50 +28,48 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'] # simple class to represent complex return value class RC: - def __init__ (self, rc, data): - self.rc = rc - self.data = data + def __init__ (self, rc = None, data = None): + self.rc_list = [] + + if (rc != None) and (data != None): + tuple_rc = namedtuple('RC', ['rc', 'data']) + self.rc_list.append(tuple_rc(rc, data)) + + def add (self, rc): + self.rc_list += rc.rc_list def good (self): - return self.rc + return all([x.rc for x in self.rc_list]) def bad (self): - return not self.rc + return not self.good() def data (self): - if self.good(): - return self.data - else: - return "" + return all([x.data if x.rc else "" for x in self.rc_list]) def err (self): - if self.bad(): - return self.data - else: - return "" + return all([x.data if not x.rc else "" for x in self.rc_list]) -RC_OK = RC(True, "") -def RC_ERR (err): - return RC(False, err) + def annotate (self, desc): + print format_text('\n{:<40}'.format(desc), 'bold'), -class RC_LIST: - def __init__ (self): - self.rc_list = [] + if self.bad(): + # print all the errors + for x in self.rc_list: + if not x.rc: + print format_text("\n{0}".format(x.data), 'bold') - def add (self, rc): - self.rc_list.append(rc) + print format_text("[FAILED]\n", 'red', 'bold') - def good(self): - return all([x.good() for x in self.rc_list]) - def bad (self): - not self.good() + else: + print format_text("[SUCCESS]\n", 'green', 'bold') - def data (self): - return [x.data() for x in self.rc_list] - def err (self): - return [x.err() for x in self.rc_list] +def RC_OK(): + return RC(True, "") +def RC_ERR (err): + return RC(False, err) # describes a single port @@ -104,7 +103,7 @@ class Port: rc = self.transmit(command.method, command.params) if rc.success: self.handler = rc.data - return RC_OK + return RC_OK() else: return RC_ERR(rc.data) @@ -118,7 +117,7 @@ class Port: rc = self.transmit(command.method, command.params) if rc.success: self.handler = rc.data - return RC_OK + return RC_OK() else: return RC_ERR(rc.data) @@ -145,13 +144,13 @@ class Port: else: raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) - return RC_OK + return RC_OK() # return TRUE if write commands def is_port_writeable (self): # operations on port can be done on state idle or state sreams - return ((self.state == STATE_IDLE) or (self.state == STATE_STREAMS)) + return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) # add stream to the port def add_stream (self, stream_id, stream_obj): @@ -163,11 +162,12 @@ class Port: params = {"handler": self.handler, "port_id": self.port_id, "stream_id": stream_id, - "stream": stream_obj.dump()} + "stream": stream_obj} rc, data = self.transmit("add_stream", params) if not rc: - return self.err(data) + r = self.err(data) + print r.good() # add the stream self.streams[stream_id] = stream_obj @@ -175,7 +175,7 @@ class Port: # the only valid state now self.state = self.STATE_STREAMS - return RC_OK + return RC_OK() # remove stream from port def remove_stream (self, stream_id): @@ -194,16 +194,21 @@ class Port: self.streams[stream_id] = None - return RC_OK + return RC_OK() # remove all the streams def remove_all_streams (self): - for stream_id in self.streams.keys(): - rc = self.remove_stream(stream_id) - if rc.bad(): - return rc - return RC_OK + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("remove_all_streams", params) + if not rc: + return self.err(data) + + self.streams = {} + + return RC_OK() # start traffic def start (self, mul): @@ -220,17 +225,20 @@ class Port: "port_id": self.port_id, "mul": mul} - rc, data = self.transmit("remove_stream", params) + rc, data = self.transmit("start_traffic", params) if not rc: return self.err(data) self.state = self.STATE_TX - return RC_OK + return RC_OK() + + # stop traffic + # with force ignores the cached state and sends the command + def stop (self, force = False): - def stop (self): - if (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): - return self.err("Unable to stop traffic - port is down") + if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): + return self.err("port is not transmitting") params = {"handler": self.handler, "port_id": self.port_id} @@ -240,9 +248,10 @@ class Port: return self.err(data) # only valid state after stop - self.state = self.STREAMS + self.state = self.STATE_STREAMS + + return RC_OK() - return RC_OK class CTRexStatelessClient(object): @@ -265,7 +274,7 @@ class CTRexStatelessClient(object): ############# helper functions section ############## - def __validate_port_list(self, port_id): + def validate_port_list(self, port_id): if isinstance(port_id, list) or isinstance(port_id, set): # check each item of the sequence return all([self._is_ports_valid(port) @@ -275,15 +284,25 @@ class CTRexStatelessClient(object): else: return False + # some preprocessing for port argument def __ports (self, port_id_list): + + # none means all if port_id_list == None: return range(0, self.get_port_count()) + # always list + if isinstance(port_id_list, int): + port_id_list = [port_id_list] + + if not isinstance(port_id_list, list): + raise ValueError("bad port id list: {0}".format(port_id_list)) + for port_id in port_id_list: if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): raise ValueError("bad port id {0}".format(port_id)) - return [port_id_list] if isinstance(port_id_list, int) else port_id_list + return port_id_list ############ boot up section ################ @@ -297,7 +316,6 @@ class CTRexStatelessClient(object): if not rc: return RC_ERR(data) - # cache system info rc, data = self.transmit("get_system_info") if not rc: @@ -328,7 +346,7 @@ class CTRexStatelessClient(object): self.connected = True - return RC_OK + return RC_OK() def is_connected (self): return self.connected @@ -367,11 +385,11 @@ class CTRexStatelessClient(object): return self.comm_link.port def get_acquired_ports(self): - return [port for port in self.ports if port.is_acquired()] + return [port.port_id for port in self.ports if port.is_acquired()] def get_active_ports(self): - return [port for port in self.ports if port.is_active()] + return [port.port_id for port in self.ports if port.is_active()] def set_verbose(self, mode): self.comm_link.set_verbose(mode) @@ -396,7 +414,7 @@ class CTRexStatelessClient(object): if rc.bad(): return rc - return RC_OK + return RC_OK() @@ -405,78 +423,71 @@ class CTRexStatelessClient(object): def acquire (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) - rc_list = RC_LIST() + rc = RC() for port_id in port_id_list: - rc = self.ports[port_id].acquire(force) - rc_list.add(rc) - - return rc_list + rc.add(self.ports[port_id].acquire(force)) + + return rc # release ports def release (self, port_id_list = None): port_id_list = self.__ports(port_id_list) - rc_list = RC_LIST() + rc = RC() for port_id in port_id_list: - rc, msg = self.ports[port_id].release(force) - rc_list.add(rc) + rc.add(self.ports[port_id].release(force)) - return rc_list + return rc def add_stream(self, stream_id, stream_obj, port_id_list = None): - assert isinstance(stream_obj, CStream) port_id_list = self.__ports(port_id_list) - rc_list = RC_LIST() + rc = RC() for port_id in port_id_list: - rc = self.ports[port_id].add_stream(stream_id, stream_obj) - rc_list.add(rc) + rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) - return rc_list + return rc def add_stream_pack(self, stream_pack_list, port_id_list = None): port_id_list = self.__ports(port_id_list) - rc_list = RC_LIST() + rc = RC() for stream_pack in stream_pack_list: - rc = self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list) - rc_list.add(rc) + rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)) - return rc_list + return rc def remove_stream(self, stream_id, port_id_list = None): port_id_list = self.__ports(port_id_list) - rc_list = RC_LIST() + rc = RC() for port_id in port_id_list: - rc = self.ports[port_id].remove_stream(stream_id) - rc_list.add(rc) + rc.add(self.ports[port_id].remove_stream(stream_id)) - return rc_list + return rc def remove_all_streams(self, port_id_list = None): port_id_list = self.__ports(port_id_list) - rc_list = RC_LIST() + rc = RC() for port_id in port_id_list: - rc = self.ports[port_id].remove_all_streams() - rc_list.add(rc) + rc.add(self.ports[port_id].remove_all_streams()) - return rc_list + return rc def get_stream(self, stream_id, port_id, get_pkt = False): @@ -498,85 +509,36 @@ class CTRexStatelessClient(object): port_id_list = self.__ports(port_id_list) - rc_list = RC_LIST() + rc = RC() for port_id in port_id_list: - rc = self.ports[port_id].start(multiplier) - rc_list.add(rc) + rc.add(self.ports[port_id].start(multiplier)) - return rc_list + return rc - def stop_traffic (self, port_id_list = None): + def stop_traffic (self, port_id_list = None, force = False): port_id_list = self.__ports(port_id_list) - - rc_list = RC_LIST() + rc = RC() for port_id in port_id_list: - rc = self.ports[port_id].stop() - rc_list.add(rc) + rc.add(self.ports[port_id].stop(force)) - return rc_list + return rc def get_port_stats(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = RpcCmdData("get_port_stats", params) - return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params)) + pass def get_stream_stats(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = RpcCmdData("get_stream_stats", params) - return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params)) - + pass def transmit(self, method_name, params={}): return self.comm_link.transmit(method_name, params) - def transmit_batch(self, batch_list): - return self.comm_link.transmit_batch(batch_list) - - - @staticmethod - def default_success_test(result_obj): - if result_obj.success: - return True - else: - return False - - @staticmethod - def ack_success_test(result_obj): - if result_obj.success and result_obj.data == "ACK": - return True - else: - return False ######################### Console (high level) API ######################### @@ -585,181 +547,91 @@ class CTRexStatelessClient(object): # acquire, stop, remove streams and clear stats # # - def cmd_reset (self, annotate_func): + def cmd_reset (self): # sync with the server rc = self.sync_with_server() - annotate_func("Syncing with the server:", rc.good(), rc.err()) + rc.annotate("Syncing with the server:") if rc.bad(): return rc - # force acquire all ports rc = self.acquire(force = True) - annotate_func("Force acquiring all ports:", rc.good(), rc.err()) + rc.annotate("Force acquiring all ports:") if rc.bad(): return rc # force stop all ports - port_id_list = self.get_active_ports() - rc = self.stop_traffic(port_id_list) - annotate_func("Stop traffic on all ports:", rc.good(), rc.err()) + rc = self.stop_traffic(self.get_port_ids(), True) + rc.annotate("Stop traffic on all ports:") if rc.bad(): return rc - return # remove all streams - rc = self.remove_all_streams(ports) - annotate_func("Removing all streams from all ports:", rc.good(), rc.err()) + rc = self.remove_all_streams(self.get_port_ids()) + rc.annotate("Removing all streams from all ports:") if rc.bad(): return rc # TODO: clear stats - return RC_OK + return RC_OK() # stop cmd - def cmd_stop (self, ports, annotate_func): + def cmd_stop (self, port_id_list): # find the relveant ports - active_ports = set(self.get_active_ports()).intersection(ports) + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + if not active_ports: - annotate_func("No active traffic on porvided ports") + print format_text("No active traffic on porvided ports", 'bold') return True - rc, log = self.stop_traffic(active_ports) - annotate_func("Stopping traffic on ports {0}:".format([port for port in active_ports]), rc, log) - if not rc: + rc = self.stop_traffic(active_ports) + rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list)) + if rc.bad(): return False return True # start cmd - def cmd_start (self, ports, stream_list, mult, force, annotate_func): + def cmd_start (self, port_id_list, stream_list, mult, force): - if (force and set(self.get_active_ports()).intersection(ports)): - rc = self.cmd_stop(ports, annotate_func) - if not rc: - return False + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - rc, log = self.remove_all_streams(ports) - annotate_func("Removing all streams from ports {0}:".format([port for port in ports]), rc, log, - "Please either retry with --force or stop traffic") - if not rc: - return False + if active_ports: + if not force: + print format_text("Port(s) {0} are active - please stop them or add '--force'".format(active_ports), 'bold') + return False + else: + rc = self.cmd_stop(active_ports) + if not rc: + return False - rc, log = self.add_stream_pack(stream_list.compiled, port_id= ports) - annotate_func("Attaching streams to port {0}:".format([port for port in ports]), rc, log) - if not rc: - return False - # finally, start the traffic - rc, log = self.start_traffic(mult, ports) - annotate_func("Starting traffic on ports {0}:".format([port for port in ports]), rc, log) - if not rc: + rc = self.remove_all_streams(port_id_list) + rc.annotate("Removing all streams from ports {0}:".format(port_id_list)) + if rc.bad(): return False - return True - - # ----- handler internal methods ----- # - def _handle_general_response(self, request, response, msg, success_test=None): - port_id = request.params.get("port_id") - if not success_test: - success_test = self.default_success_test - if success_test(response): - self._conn_handler[port_id] = response.data - return RpcResponseStatus(True, port_id, msg) - else: - return RpcResponseStatus(False, port_id, response.data) - - def _handle_acquire_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - self._conn_handler[port_id] = response.data - return RpcResponseStatus(True, port_id, "Acquired") - else: - return RpcResponseStatus(False, port_id, response.data) - - def _handle_add_stream_response(self, request, response, success_test): - port_id = request.params.get("port_id") - stream_id = request.params.get("stream_id") - if success_test(response): - return RpcResponseStatus(True, port_id, "Stream {0} added".format(stream_id)) - else: - return RpcResponseStatus(False, port_id, response.data) - - def _handle_remove_streams_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - return RpcResponseStatus(True, port_id, "Removed") - else: - return RpcResponseStatus(False, port_id, response.data) - - def _handle_release_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - del self._conn_handler[port_id] - return RpcResponseStatus(True, port_id, "Released") - else: - return RpcResponseStatus(False, port_id, response.data) - - def _handle_start_traffic_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - self._active_ports.add(port_id) - return RpcResponseStatus(True, port_id, "Traffic started") - else: - return RpcResponseStatus(False, port_id, response.data) - - def _handle_stop_traffic_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - if port_id in self._active_ports: - self._active_ports.remove(port_id) - return RpcResponseStatus(True, port_id, "Traffic stopped") - else: - return RpcResponseStatus(False, port_id, response.data) - - def _handle_get_global_stats_response(self, request, response, success_test): - if response.success: - return CGlobalStats(**response.success) - else: + rc = self.add_stream_pack(stream_list.compiled, port_id_list) + rc.annotate("Attaching streams to port {0}:".format(port_id_list)) + if rc.bad(): return False - def _handle_get_port_stats_response(self, request, response, success_test): - if response.success: - return CPortStats(**response.success) - else: - return False - def _handle_get_stream_stats_response(self, request, response, success_test): - if response.success: - return CStreamStats(**response.success) - else: + # finally, start the traffic + rc = self.start_traffic(mult, port_id_list) + rc.annotate("Starting traffic on ports {0}:".format(port_id_list)) + if rc.bad(): return False + return True - def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): - res_ok = True - responses = [] - if isinstance(success_test, staticmethod): - success_test = success_test.__func__ - for i, response in enumerate(resp_list): - # run handler method with its params - processed_response = handler_func(req_list[i], response, success_test) - responses.append(processed_response) - if not processed_response.success: - res_ok = False - # else: - # res_ok = False # TODO: mark in this case somehow the bad result - # print res_ok - # print responses - return res_ok, responses - - + # ------ private classes ------ # class CCommLink(object): """describes the connectivity of the stateless client method""" diff --git a/scripts/automation/trex_control_plane/console/parsing_opts.py b/scripts/automation/trex_control_plane/console/parsing_opts.py index f983d837..252d33bf 100755 --- a/scripts/automation/trex_control_plane/console/parsing_opts.py +++ b/scripts/automation/trex_control_plane/console/parsing_opts.py @@ -139,8 +139,8 @@ class CCmdArgParser(argparse.ArgumentParser): opts.ports = self.stateless_client.get_port_ids() for port in opts.ports: - if not self.stateless_client._is_ports_valid(port): - self.error("port id {0} is not a valid\n".format(port)) + if not self.stateless_client.validate_port_list(port): + self.error("port id {0} is not a valid port id\n".format(port)) return opts diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 2be643ab..5ba82dcb 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -296,7 +296,7 @@ class TRexConsole(cmd.Cmd): return - self.stateless_client.cmd_start(opts.ports, stream_list, opts.mult, opts.force, self.annotate) + self.stateless_client.cmd_start(opts.ports, stream_list, opts.mult, opts.force) return @@ -315,7 +315,7 @@ class TRexConsole(cmd.Cmd): if opts is None: return - self.stateless_client.cmd_stop(opts.ports, self.annotate) + self.stateless_client.cmd_stop(opts.ports) return def help_stop(self): @@ -324,7 +324,7 @@ class TRexConsole(cmd.Cmd): ########## reset def do_reset (self, line): '''force stop all ports\n''' - self.stateless_client.cmd_reset(self.annotate) + self.stateless_client.cmd_reset() # tui |