diff options
4 files changed, 521 insertions, 664 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 168853b3..5a7b1873 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -24,148 +24,331 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'] msg=self.msg, stat="success" if self.success else "fail") -# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg']) +# simple class to represent complex return value +class RC: + + def __init__ (self, rc, data): + self.rc = rc + self.data = data + + def good (self): + return self.rc + + def bad (self): + return not self.rc + + def data (self): + if self.good(): + return self.data + else: + return "" + + def err (self): + if self.bad(): + return self.data + else: + return "" + +RC_OK = RC(True, "") +def RC_ERR (err): + return RC(False, err) + +class RC_LIST: + def __init__ (self): + self.rc_list = [] + + def add (self, rc): + self.rc_list.append(rc) + + def good(self): + return all([x.good() for x in self.rc_list]) + + def bad (self): + not self.good() + + def data (self): + return [x.data() for x in self.rc_list] + + def err (self): + return [x.err() for x in self.rc_list] + + +# describes a single port +class Port: + + STATE_DOWN = 0 + STATE_IDLE = 1 + STATE_STREAMS = 2 + STATE_TX = 3 + STATE_PAUSE = 4 + + def __init__ (self, port_id, user, transmit): + self.port_id = port_id + self.state = self.STATE_IDLE + self.handler = None + self.transmit = transmit + self.user = user + + self.streams = {} + + def err (self, msg): + return RC_ERR("port {0} : {1}".format(self.port_id, msg)) + + # take the port + def acquire (self, force = False): + params = {"port_id": self.port_id, + "user": self.user, + "force": force} + + command = RpcCmdData("acquire", params) + rc = self.transmit(command.method, command.params) + if rc.success: + self.handler = rc.data + return RC_OK + else: + return RC_ERR(rc.data) + + + # release the port + def release (self): + params = {"port_id": self.port_id, + "handler": self.handler} + + command = RpcCmdData("release", params) + rc = self.transmit(command.method, command.params) + if rc.success: + self.handler = rc.data + return RC_OK + else: + return RC_ERR(rc.data) + + def is_acquired (self): + return (self.handler != None) + + def is_active (self): + return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) + + def sync (self, sync_data): + + self.handler = sync_data['handler'] + + if sync_data['state'] == "DOWN": + self.state = self.STATE_DOWN + elif sync_data['state'] == "IDLE": + self.state = self.STATE_IDLE + elif sync_data['state'] == "STREAMS": + self.state = self.STATE_STREAMS + elif sync_data['state'] == "TX": + self.state = self.STATE_TX + elif sync_data['state'] == "PAUSE": + self.state = self.STATE_PAUSE + else: + raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) + + return RC_OK + + + # return TRUE if write commands + def is_port_writeable (self): + # operations on port can be done on state idle or state sreams + return ((self.state == STATE_IDLE) or (self.state == STATE_STREAMS)) + + # add stream to the port + def add_stream (self, stream_id, stream_obj): + + if not self.is_port_writeable(): + return self.err("Please stop port before attempting to add streams") + + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id, + "stream": stream_obj.dump()} + + rc, data = self.transmit("add_stream", params) + if not rc: + return self.err(data) + + # add the stream + self.streams[stream_id] = stream_obj + + # the only valid state now + self.state = self.STATE_STREAMS + + return RC_OK + + # remove stream from port + def remove_stream (self, stream_id): + + if not stream_id in self.streams: + return self.err("stream {0} does not exists".format(stream_id)) + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id} + + + rc, data = self.transmit("remove_stream", params) + if not rc: + return self.err(data) + + self.streams[stream_id] = None + + return RC_OK + + # remove all the streams + def remove_all_streams (self): + for stream_id in self.streams.keys(): + rc = self.remove_stream(stream_id) + if rc.bad(): + return rc + + return RC_OK + + # start traffic + def start (self, mul): + if self.state == self.STATE_DOWN: + return self.err("Unable to start traffic - port is down") + + if self.state == self.STATE_IDLE: + return self.err("Unable to start traffic - no streams attached to port") + + if self.state == self.STATE_TX: + return self.err("Unable to start traffic - port is already transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul} + + rc, data = self.transmit("remove_stream", params) + if not rc: + return self.err(data) + + self.state = self.STATE_TX + + return RC_OK + + def stop (self): + if (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): + return self.err("Unable to stop traffic - port is down") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("stop_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STREAMS + + return RC_OK + class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" - def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False): + def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False): super(CTRexStatelessClient, self).__init__() self.user = username self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) self.verbose = False self._conn_handler = {} self._active_ports = set() - self._stats = CTRexStatsManager("port", "stream") self._system_info = None self._server_version = None self.__err_log = None self._async_client = CTRexAsyncClient(async_port) + self.connected = False - # ----- decorator methods ----- # - def acquired(func): - def wrapper_f(self, *args, **kwargs): - # print func.__name__ - # print args - # print kwargs - port_ids = kwargs.get("port_id") - # if not port_ids: - # # print "FROM ARGS!" - # # print args - # port_ids = args[0] - if isinstance(port_ids, int): - # make sure port_ids is a list - port_ids = [port_ids] - bad_ids = set() - # print "=============" - # print port_ids - for port_id in port_ids: - port_owned = self._conn_handler.get(port_id) - if not port_owned: - bad_ids.add(port_id) - # elif active_and_owned: # stronger condition than just owned, hence gets precedence - # if port_owned and port_id in self._active_ports: - # continue - # else: - # bad_ids.add(port_id) - else: - continue - if bad_ids: - # Some port IDs are not according to desires status - raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} aren't " - "acquired".format(func.__name__, list(bad_ids))) - else: - return func(self, *args, **kwargs) - return wrapper_f - - def force_status(owned=True, active_and_owned=False): - def wrapper(func): - def wrapper_f(self, *args, **kwargs): - # print args - # print kwargs - port_ids = kwargs.get("port_id") - if not port_ids: - #print "FROM ARGS!" - #print args - port_ids = args[0] - if isinstance(port_ids, int): - # make sure port_ids is a list - port_ids = [port_ids] - bad_ids = set() - # print "=============" - # print port_ids - for port_id in port_ids: - port_owned = self._conn_handler.get(port_id) - if owned and not port_owned: - bad_ids.add(port_id) - elif active_and_owned: # stronger condition than just owned, hence gets precedence - if port_owned and port_id in self._active_ports: - continue - else: - bad_ids.add(port_id) - else: - continue - if bad_ids: - # Some port IDs are not according to desires status - raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not " - "at allowed states".format(func.__name__, list(bad_ids))) - else: - return func(self, *args, **kwargs) - return wrapper_f - return wrapper - - @property - def system_info(self): - if not self._system_info: - rc, info = self.get_system_info() - if rc: - self._system_info = info - else: - self.__err_log = info - return self._system_info if self._system_info else "Unknown" + ############# helper functions section ############## - @property - def server_version(self): - if not self._server_version: - rc, ver_info = self.get_version() - if rc: - self._server_version = ver_info - else: - self.__err_log = ver_info - return self._server_version if self._server_version else "Unknown" + def __validate_port_list(self, port_id): + if isinstance(port_id, list) or isinstance(port_id, set): + # check each item of the sequence + return all([self._is_ports_valid(port) + for port in port_id]) + elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()): + return True + else: + return False + + def __ports (self, port_id_list): + if port_id_list == None: + return range(0, self.get_port_count()) + + for port_id in port_id_list: + if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): + raise ValueError("bad port id {0}".format(port_id)) + + return [port_id_list] if isinstance(port_id_list, int) else port_id_list + + ############ boot up section ################ - def is_connected(self): - return self.comm_link.is_connected + # connection sequence + def connect (self): - # ----- user-access methods ----- # - def connect(self): - rc, err = self.comm_link.connect() + self.connected = False + + # connect + rc, data = self.comm_link.connect() if not rc: - return rc, err - return self._init_sync() + return RC_ERR(data) - def get_stats_async (self): - return self._async_client.get_stats() - def get_connection_port (self): - return self.comm_link.port + # cache system info + rc, data = self.transmit("get_system_info") + if not rc: + return RC_ERR(data) + + self.system_info = data + + # cache supported cmds + rc, data = self.transmit("get_supported_cmds") + if not rc: + return RC_ERR(data) + + self.supported_cmds = data + + # create ports + self.ports = [] + for port_id in xrange(0, self.get_port_count()): + self.ports.append(Port(port_id, self.user, self.transmit)) + + # acquire all ports + rc = self.acquire() + if rc.bad(): + return rc + + rc = self.sync_with_server() + if rc.bad(): + return rc + + self.connected = True + + return RC_OK + + def is_connected (self): + return self.connected + def disconnect(self): - return self.comm_link.disconnect() + self.connected = False + self.comm_link.disconnect() - def ping(self): - return self.transmit("ping") + + ########### cached queries (no server traffic) ########### def get_supported_cmds(self): - return self.transmit("get_supported_cmds") + return self.supported_cmds def get_version(self): - return self.transmit("get_version") + return self.server_version def get_system_info(self): - return self.transmit("get_system_info") + return self.system_info def get_port_count(self): return self.system_info.get("port_count") @@ -177,205 +360,167 @@ class CTRexStatelessClient(object): else: return port_ids - def sync_user(self, sync_streams=False): - return self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) + def get_stats_async (self): + return self._async_client.get_stats() + + def get_connection_port (self): + return self.comm_link.port def get_acquired_ports(self): - return self._conn_handler.keys() + return [port for port in self.ports if port.is_acquired()] + def get_active_ports(self): - return list(self._active_ports) + return [port for port in self.ports if port.is_active()] def set_verbose(self, mode): self.comm_link.set_verbose(mode) self.verbose = mode - def acquire(self, port_id, force=False): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_acquire_response) - else: - params = {"port_id": port_id, - "user": self.user, - "force": force} - command = RpcCmdData("acquire", params) - return self._handle_acquire_response(command, - self.transmit(command.method, command.params), - self.default_success_test) - - @force_status(owned=True) - def release(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_release_response, - success_test=self.ack_success_test) - else: - self._conn_handler.pop(port_id) - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = RpcCmdData("release", params) - return self._handle_release_response(command, - self.transmit(command.method, command.params), - self.ack_success_test) + ############# server actions ################ - @acquired - def add_stream(self, stream_id, stream_obj, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") + # ping server + def ping(self): + rc, info = self.transmit("ping") + return RC(rc, info) + + + def sync_with_server(self, sync_streams=False): + rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) + if not rc: + return RC_ERR(data) + + for port_info in data: + + rc = self.ports[port_info['port_id']].sync(port_info) + if rc.bad(): + return rc + + return RC_OK + + + + ########## port commands ############## + # acquire ports, if port_list is none - get all + def acquire (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + + rc_list = RC_LIST() + + for port_id in port_id_list: + rc = self.ports[port_id].acquire(force) + rc_list.add(rc) + + return rc_list + + # release ports + def release (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc_list = RC_LIST() + + for port_id in port_id_list: + rc, msg = self.ports[port_id].release(force) + rc_list.add(rc) + + return rc_list + + + def add_stream(self, stream_id, stream_obj, port_id_list = None): assert isinstance(stream_obj, CStream) - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id, - "stream": stream_obj.dump()} - return self.transmit("add_stream", params) - @acquired - def add_stream_pack(self, stream_pack_list, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") + port_id_list = self.__ports(port_id_list) + + rc_list = RC_LIST() + + for port_id in port_id_list: + rc = self.ports[port_id].add_stream(stream_id, stream_obj) + rc_list.add(rc) + + return rc_list + + + def add_stream_pack(self, stream_pack_list, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc_list = RC_LIST() - # since almost every run contains more than one transaction with server, handle all as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [] for stream_pack in stream_pack_list: - commands.extend([RpcCmdData("add_stream", {"port_id": p_id, - "handler": self._conn_handler.get(p_id), - "stream_id": stream_pack.stream_id, - "stream": stream_pack.stream} - ) - for p_id in port_ids] - ) - res_ok, resp_list = self.transmit_batch(commands) - if not res_ok: - return res_ok, resp_list - - return self._process_batch_result(commands, resp_list, self._handle_add_stream_response, - success_test=self.ack_success_test) - - @force_status(owned=True) - def remove_stream(self, stream_id, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id} - return self.transmit("remove_stream", params) + rc = self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list) + rc_list.add(rc) + + return rc_list - def remove_all_streams(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response, - success_test=self.ack_success_test) - else: - params = {"port_id": port_id, - "handler": self._conn_handler.get(port_id)} - command = RpcCmdData("remove_all_streams", params) - return self._handle_remove_streams_response(command, - self.transmit(command.method, command.params), - self.ack_success_test) - pass - @force_status(owned=True)#, active_and_owned=True) - def get_all_streams(self, port_id, get_pkt = False): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "get_pkt": get_pkt} - return self.transmit("get_all_streams", params) + def remove_stream(self, stream_id, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc_list = RC_LIST() + + for port_id in port_id_list: + rc = self.ports[port_id].remove_stream(stream_id) + rc_list.add(rc) + + return rc_list + - @force_status(owned=True)#, active_and_owned=True) - def get_stream_id_list(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("get_stream_list", params) - @force_status(owned=True, active_and_owned=True) + def remove_all_streams(self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc_list = RC_LIST() + + for port_id in port_id_list: + rc = self.ports[port_id].remove_all_streams() + rc_list.add(rc) + + return rc_list + + def get_stream(self, stream_id, port_id, get_pkt = False): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id, - "get_pkt": get_pkt} - return self.transmit("get_stream_list", params) - def start_traffic(self, multiplier, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), - "port_id": p_id, - "mul": multiplier}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response, - success_test=self.ack_success_test) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "mul": multiplier} - command = RpcCmdData("start_traffic", params) - return self._handle_start_traffic_response(command, - self.transmit(command.method, command.params), - self.ack_success_test) - - def stop_traffic(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - if not port_ids: - # don't invoke if port ids is empty - return True, [] - commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response, - success_test=self.ack_success_test) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = RpcCmdData("stop_traffic", params) - return self._handle_stop_traffic_response(command, - self.transmit(command.method, command.params), - self.ack_success_test) + return self.ports[port_id].get_stream(stream_id) + + + def get_all_streams(self, port_id, get_pkt = False): + + return self.ports[port_id].get_all_streams() + + + def get_stream_id_list(self, port_id): + + return self.ports[port_id].get_stream_id_list() + + + def start_traffic (self, multiplier, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc_list = RC_LIST() + + for port_id in port_id_list: + rc = self.ports[port_id].start(multiplier) + rc_list.add(rc) + + return rc_list + + + + def stop_traffic (self, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc_list = RC_LIST() + + for port_id in port_id_list: + rc = self.ports[port_id].stop() + rc_list.add(rc) + + return rc_list -# def get_global_stats(self): -# command = RpcCmdData("get_global_stats", {}) -# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) -# # return self.transmit("get_global_stats") - @force_status(owned=True, active_and_owned=True) def get_port_stats(self, port_id=None): if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") @@ -393,7 +538,6 @@ class CTRexStatelessClient(object): command = RpcCmdData("get_port_stats", params) return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params)) - @force_status(owned=True, active_and_owned=True) def get_stream_stats(self, port_id=None): if not self._is_ports_valid(port_id): raise ValueError("Provided illegal port id input") @@ -411,26 +555,6 @@ class CTRexStatelessClient(object): command = RpcCmdData("get_stream_stats", params) return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params)) - # ----- internal methods ----- # - def _init_sync(self): - # get server version and system info - err = False - if self.server_version == "Unknown" or self.system_info == "Unknown": - self.disconnect() - return False, self.__err_log - # sync with previous session - res_ok, port_info = self.sync_user() - if not res_ok: - self.disconnect() - return False, port_info - else: - # handle sync data - for port in port_info: - self._conn_handler[port.get("port_id")] = port.get("handler") - if port.get("state") == "transmitting": - # port is active - self._active_ports.add(port.get("port_id")) - return True, "" def transmit(self, method_name, params={}): @@ -439,17 +563,6 @@ class CTRexStatelessClient(object): def transmit_batch(self, batch_list): return self.comm_link.transmit_batch(batch_list) - @staticmethod - def _object_decoder(obj_type, obj_data): - if obj_type == "global": - return CGlobalStats(**obj_data) - elif obj_type == "port": - return CPortStats(**obj_data) - elif obj_type == "stream": - return CStreamStats(**obj_data) - else: - # Do not serialize the data into class - return obj_data @staticmethod def default_success_test(result_obj): @@ -474,35 +587,37 @@ class CTRexStatelessClient(object): # def cmd_reset (self, annotate_func): - ports = self.get_port_ids() # sync with the server - rc, log = self._init_sync() - annotate_func("Syncing with the server:", rc, log) - if not rc: - return False - + rc = self.sync_with_server() + annotate_func("Syncing with the server:", rc.good(), rc.err()) + if rc.bad(): + return rc # force acquire all ports - rc, log = self.acquire(ports, force = True) - annotate_func("Force acquiring all ports:", rc, log) - if not rc: - return False + rc = self.acquire(force = True) + annotate_func("Force acquiring all ports:", rc.good(), rc.err()) + if rc.bad(): + return rc - # force stop - rc, log = self.stop_traffic(ports) - annotate_func("Stop traffic on all ports:", rc, log) - if not rc: - return False + + # force stop all ports + port_id_list = self.get_active_ports() + rc = self.stop_traffic(port_id_list) + annotate_func("Stop traffic on all ports:", rc.good(), rc.err()) + if rc.bad(): + return rc + + return # remove all streams - rc, log = self.remove_all_streams(ports) - annotate_func("Removing all streams from all ports:", rc, log) - if not rc: - return False + rc = self.remove_all_streams(ports) + annotate_func("Removing all streams from all ports:", rc.good(), rc.err()) + if rc.bad(): + return rc # TODO: clear stats - return True + return RC_OK # stop cmd @@ -511,7 +626,7 @@ class CTRexStatelessClient(object): # find the relveant ports active_ports = set(self.get_active_ports()).intersection(ports) if not active_ports: - annotate_func("No active traffic on porivded ports") + annotate_func("No active traffic on porvided ports") return True rc, log = self.stop_traffic(active_ports) @@ -524,7 +639,7 @@ class CTRexStatelessClient(object): # start cmd def cmd_start (self, ports, stream_list, mult, force, annotate_func): - if force: + if (force and set(self.get_active_ports()).intersection(ports)): rc = self.cmd_stop(ports, annotate_func) if not rc: return False @@ -626,15 +741,6 @@ class CTRexStatelessClient(object): else: return False - def _is_ports_valid(self, port_id): - if isinstance(port_id, list) or isinstance(port_id, set): - # check each item of the sequence - return all([self._is_ports_valid(port) - for port in port_id]) - elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()): - return True - else: - return False def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): res_ok = True diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index 58491aba..077c82ad 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -110,7 +110,7 @@ class JsonRpcClient(object): return id, msg - def invoke_rpc_method (self, method_name, params = {}, block = False): + def invoke_rpc_method (self, method_name, params = {}, block = True): if not self.connected: return False, "Not connected to server" @@ -120,7 +120,7 @@ class JsonRpcClient(object): # low level send of string message - def send_raw_msg (self, msg, block = False): + def send_raw_msg (self, msg, block = True): self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n") if block: @@ -248,272 +248,3 @@ class JsonRpcClient(object): if hasattr(self, "context"): self.context.destroy(linger=0) -# MOVE THIS TO DAN'S FILE -class TrexStatelessClient(JsonRpcClient): - - def __init__ (self, server, port, user): - - super(TrexStatelessClient, self).__init__(server, port) - - self.user = user - self.port_handlers = {} - - self.supported_cmds = [] - self.system_info = None - self.server_version = None - - - def whoami (self): - return self.user - - def ping_rpc_server(self): - - return self.invoke_rpc_method("ping", block = False) - - def get_rpc_server_version (self): - return self.server_version - - def get_system_info (self): - if not self.system_info: - return {} - - return self.system_info - - def get_supported_cmds(self): - if not self.supported_cmds: - return {} - - return self.supported_cmds - - def get_port_count (self): - if not self.system_info: - return 0 - - return self.system_info["port_count"] - - # sync the client with all the server required data - def sync (self): - - # get server version - rc, msg = self.invoke_rpc_method("get_version") - if not rc: - self.disconnect() - return rc, msg - - self.server_version = msg - - # get supported commands - rc, msg = self.invoke_rpc_method("get_supported_cmds") - if not rc: - self.disconnect() - return rc, msg - - self.supported_cmds = [str(x) for x in msg if x] - - # get system info - rc, msg = self.invoke_rpc_method("get_system_info") - if not rc: - self.disconnect() - return rc, msg - - self.system_info = msg - - return True, "" - - def connect (self): - rc, err = super(TrexStatelessClient, self).connect() - if not rc: - return rc, err - - return self.sync() - - - # take ownership over ports - def take_ownership (self, port_id_array, force = False): - if not self.connected: - return False, "Not connected to server" - - batch = self.create_batch() - - for port_id in port_id_array: - batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force}) - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - for i, rc in enumerate(resp_list): - if rc[0]: - self.port_handlers[port_id_array[i]] = rc[1] - - return True, resp_list - - - def release_ports (self, port_id_array): - batch = self.create_batch() - - for port_id in port_id_array: - - # let the server handle un-acquired errors - if self.port_handlers.get(port_id): - handler = self.port_handlers[port_id] - else: - handler = "" - - batch.add("release", params = {"port_id":port_id, "handler":handler}) - - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - for i, rc in enumerate(resp_list): - if rc[0]: - self.port_handlers.pop(port_id_array[i]) - - return True, resp_list - - def get_owned_ports (self): - return self.port_handlers.keys() - - # fetch port stats - def get_port_stats (self, port_id_array): - if not self.connected: - return False, "Not connected to server" - - batch = self.create_batch() - - # empty list means all - if port_id_array == []: - port_id_array = list([x for x in xrange(0, self.system_info["port_count"])]) - - for port_id in port_id_array: - - # let the server handle un-acquired errors - if self.port_handlers.get(port_id): - handler = self.port_handlers[port_id] - else: - handler = "" - - batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler}) - - - rc, resp_list = batch.invoke() - - return rc, resp_list - - # snapshot will take a snapshot of all your owned ports for streams and etc. - def snapshot(self): - - - if len(self.get_owned_ports()) == 0: - return {} - - snap = {} - - batch = self.create_batch() - - for port_id in self.get_owned_ports(): - - batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) - batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - # split the list to 2s - index = 0 - for port_id in self.get_owned_ports(): - if not resp_list[index] or not resp_list[index + 1]: - snap[port_id] = None - continue - - # fetch the first two - stats = resp_list[index][1] - stream_list = resp_list[index + 1][1] - - port = {} - port['status'] = stats['status'] - port['stream_list'] = [] - - # get all the streams - if len(stream_list) > 0: - batch = self.create_batch() - for stream_id in stream_list: - batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]}) - - rc, stream_resp_list = batch.invoke() - if not rc: - port = {} - - port['streams'] = {} - for i, resp in enumerate(stream_resp_list): - if resp[0]: - port['streams'][stream_list[i]] = resp[1] - - snap[port_id] = port - - # move to next one - index += 2 - - - return snap - - # add stream - # def add_stream (self, port_id, stream_id, isg, next_stream_id, packet, vm=[]): - # if not port_id in self.get_owned_ports(): - # return False, "Port {0} is not owned... please take ownership before adding streams".format(port_id) - # - # handler = self.port_handlers[port_id] - # - # stream = {} - # stream['enabled'] = True - # stream['self_start'] = True - # stream['isg'] = isg - # stream['next_stream_id'] = next_stream_id - # stream['packet'] = {} - # stream['packet']['binary'] = packet - # stream['packet']['meta'] = "" - # stream['vm'] = vm - # stream['rx_stats'] = {} - # stream['rx_stats']['enabled'] = False - # - # stream['mode'] = {} - # stream['mode']['type'] = 'continuous' - # stream['mode']['pps'] = 10.0 - # - # params = {} - # params['handler'] = handler - # params['stream'] = stream - # params['port_id'] = port_id - # params['stream_id'] = stream_id - # - # print params - # return self.invoke_rpc_method('add_stream', params = params) - - def add_stream(self, port_id_array, stream_pack_list): - batch = self.create_batch() - - for port_id in port_id_array: - for stream_pack in stream_pack_list: - params = {"port_id": port_id, - "handler": self.port_handlers[port_id], - "stream_id": stream_pack.stream_id, - "stream": stream_pack.stream} - batch.add("add_stream", params=params) - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - for i, rc in enumerate(resp_list): - if rc[0]: - print "Stream {0} - {1}".format(i, rc[1]) - # self.port_handlers[port_id_array[i]] = rc[1] - - return True, resp_list - - # return self.invoke_rpc_method('add_stream', params = params) - -if __name__ == "__main__": - pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 06ae762a..2be643ab 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -160,7 +160,51 @@ class TRexConsole(cmd.Cmd): for x in os.listdir(path) if x.startswith(start_string)] + # annotation method + @staticmethod + def annotate (desc, rc = None, err_log = None, ext_err_msg = None): + print format_text('\n{:<40}'.format(desc), 'bold'), + if rc == None: + print "\n" + return + + if rc == False: + # do we have a complex log object ? + if isinstance(err_log, list): + print "" + for func in err_log: + if func: + print func + print "" + + elif isinstance(err_log, str): + print "\n" + err_log + "\n" + + print format_text("[FAILED]\n", 'red', 'bold') + if ext_err_msg: + print format_text(ext_err_msg + "\n", 'blue', 'bold') + + return False + + else: + print format_text("[SUCCESS]\n", 'green', 'bold') + return True + + ####################### shell commands ####################### + def do_ping (self, line): + '''Ping the server\n''' + + rc = self.stateless_client.ping() + if rc.good(): + print format_text("[SUCCESS]\n", 'green', 'bold') + else: + print "\n*** " + rc.err() + "\n" + print format_text("[FAILED]\n", 'red', 'bold') + return + + def do_test (self, line): + print self.stateless_client.get_acquired_ports() # set verbose on / off def do_verbose(self, line): @@ -171,65 +215,41 @@ class TRexConsole(cmd.Cmd): elif line == "on": self.verbose = True self.stateless_client.set_verbose(True) - print green("\nverbose set to on\n") + print format_text("\nverbose set to on\n", 'green', 'bold') elif line == "off": self.verbose = False self.stateless_client.set_verbose(False) - print green("\nverbose set to off\n") + print format_text("\nverbose set to off\n", 'green', 'bold') else: - print magenta("\nplease specify 'on' or 'off'\n") + print format_text("\nplease specify 'on' or 'off'\n", 'bold') + ############### connect def do_connect (self, line): '''Connects to the server\n''' - res_ok, msg = self.stateless_client.connect() - if res_ok: + rc = self.stateless_client.connect() + if rc.good(): print format_text("[SUCCESS]\n", 'green', 'bold') else: - print "\n*** " + msg + "\n" + print "\n*** " + rc.err() + "\n" print format_text("[FAILED]\n", 'red', 'bold') return - self.supported_rpc = self.stateless_client.get_supported_cmds().data - if self.acquire_all_ports: - res_ok, log = self.stateless_client.acquire(self.stateless_client.get_port_ids()) - if not res_ok: - print "\n*** Failed to acquire all ports... exiting...""" + def do_disconnect (self, line): + '''Disconnect from the server\n''' - @staticmethod - def annotate (desc, rc = None, err_log = None, ext_err_msg = None): - print format_text('\n{:<40}'.format(desc), 'bold'), - if rc == None: - print "\n" + if not self.stateless_client.is_connected(): + print "Not connected to server\n" return - if rc == False: - # do we have a complex log object ? - if isinstance(err_log, list): - print "" - for func in err_log: - if func: - print func - print "" - - elif isinstance(err_log, str): - print "\n" + err_log + "\n" - - print format_text("[FAILED]\n", 'red', 'bold') - if ext_err_msg: - print format_text(ext_err_msg + "\n", 'blue', 'bold') - - return False - - else: - print format_text("[SUCCESS]\n", 'green', 'bold') - return True - + self.stateless_client.disconnect() + print format_text("[SUCCESS]\n", 'green', 'bold') + ############### start def complete_start(self, text, line, begidx, endidx): diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 907b9cf4..7f2382d3 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -186,22 +186,22 @@ TrexStatelessPort::get_state_as_string() const { switch (get_state()) { case PORT_STATE_DOWN: - return "down"; + return "DOWN"; case PORT_STATE_IDLE: - return "no streams"; + return "IDLE"; case PORT_STATE_STREAMS: - return "with streams, idle"; + return "STREAMS"; case PORT_STATE_TX: - return "transmitting"; + return "TX"; case PORT_STATE_PAUSE: - return "paused"; + return "PAUSE"; } - return "unknown"; + return "UNKNOWN"; } void |