diff options
5 files changed, 817 insertions, 949 deletions
diff --git a/scripts/automation/trex_control_plane/client/trex_stateless_client.py b/scripts/automation/trex_control_plane/client/trex_stateless_client.py index 168853b3..0df2ac5d 100755 --- a/scripts/automation/trex_control_plane/client/trex_stateless_client.py +++ b/scripts/automation/trex_control_plane/client/trex_stateless_client.py @@ -6,12 +6,16 @@ try: except ImportError: # support import for Python 3 import client.outer_packages + from client_utils.jsonrpc_client import JsonRpcClient, BatchMessage from client_utils.packet_builder import CTRexPktBuilder import json from common.trex_stats import * from common.trex_streams import * from collections import namedtuple +from common.text_opts import * +import parsing_opts +import time from trex_async_client import CTRexAsyncClient @@ -24,148 +28,417 @@ class RpcResponseStatus(namedtuple('RpcResponseStatus', ['success', 'id', 'msg'] msg=self.msg, stat="success" if self.success else "fail") -# RpcResponseStatus = namedtuple('RpcResponseStatus', ['success', 'id', 'msg']) +# simple class to represent complex return value +class RC: + + def __init__ (self, rc = None, data = None): + self.rc_list = [] + + if (rc != None) and (data != None): + tuple_rc = namedtuple('RC', ['rc', 'data']) + self.rc_list.append(tuple_rc(rc, data)) + + def add (self, rc): + self.rc_list += rc.rc_list + + def good (self): + return all([x.rc for x in self.rc_list]) + + def bad (self): + return not self.good() + + def data (self): + return all([x.data if x.rc else "" for x in self.rc_list]) + + def err (self): + return all([x.data if not x.rc else "" for x in self.rc_list]) + + def annotate (self, desc = None): + if desc: + print format_text('\n{:<40}'.format(desc), 'bold'), + + if self.bad(): + # print all the errors + print "" + for x in self.rc_list: + if not x.rc: + print format_text("\n{0}".format(x.data), 'bold') + + print "" + print format_text("[FAILED]\n", 'red', 'bold') + + + else: + print format_text("[SUCCESS]\n", 'green', 'bold') + + +def RC_OK(): + return RC(True, "") +def RC_ERR (err): + return RC(False, err) + + +LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) + +# describes a stream DB +class CStreamsDB(object): + + def __init__(self): + self.stream_packs = {} + + def load_yaml_file (self, filename): + + stream_pack_name = filename + if stream_pack_name in self.get_loaded_streams_names(): + self.remove_stream_packs(stream_pack_name) + + stream_list = CStreamList() + loaded_obj = stream_list.load_yaml(filename) + + try: + compiled_streams = stream_list.compile_streams() + rc = self.load_streams(stream_pack_name, + LoadedStreamList(loaded_obj, + [StreamPack(v.stream_id, v.stream.dump()) + for k, v in compiled_streams.items()])) + + except Exception as e: + return None + + return self.get_stream_pack(stream_pack_name) + + def load_streams(self, name, LoadedStreamList_obj): + if name in self.stream_packs: + return False + else: + self.stream_packs[name] = LoadedStreamList_obj + return True + + def remove_stream_packs(self, *names): + removed_streams = [] + for name in names: + removed = self.stream_packs.pop(name) + if removed: + removed_streams.append(name) + return removed_streams + + def clear(self): + self.stream_packs.clear() + + def get_loaded_streams_names(self): + return self.stream_packs.keys() + + def stream_pack_exists (self, name): + return name in self.get_loaded_streams_names() + + def get_stream_pack(self, name): + if not self.stream_pack_exists(name): + return None + else: + return self.stream_packs.get(name) + + +# describes a single port +class Port: + + STATE_DOWN = 0 + STATE_IDLE = 1 + STATE_STREAMS = 2 + STATE_TX = 3 + STATE_PAUSE = 4 + + def __init__ (self, port_id, user, transmit): + self.port_id = port_id + self.state = self.STATE_IDLE + self.handler = None + self.transmit = transmit + self.user = user + + self.streams = {} + + def err (self, msg): + return RC_ERR("port {0} : {1}".format(self.port_id, msg)) + + def ok (self): + return RC_OK() + + # take the port + def acquire (self, force = False): + params = {"port_id": self.port_id, + "user": self.user, + "force": force} + + command = RpcCmdData("acquire", params) + rc = self.transmit(command.method, command.params) + if rc.success: + self.handler = rc.data + return self.ok() + else: + return self.err(rc.data) + + + # release the port + def release (self): + params = {"port_id": self.port_id, + "handler": self.handler} + + command = RpcCmdData("release", params) + rc = self.transmit(command.method, command.params) + if rc.success: + self.handler = rc.data + return self.ok() + else: + return self.err(rc.data) + + def is_acquired (self): + return (self.handler != None) + + def is_active (self): + return (self.state == self.STATE_TX ) or (self.state == self.STATE_PAUSE) + + def sync (self, sync_data): + + self.handler = sync_data['handler'] + + if sync_data['state'] == "DOWN": + self.state = self.STATE_DOWN + elif sync_data['state'] == "IDLE": + self.state = self.STATE_IDLE + elif sync_data['state'] == "STREAMS": + self.state = self.STATE_STREAMS + elif sync_data['state'] == "TX": + self.state = self.STATE_TX + elif sync_data['state'] == "PAUSE": + self.state = self.STATE_PAUSE + else: + raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, sync_data['state'])) + + return self.ok() + + + # return TRUE if write commands + def is_port_writeable (self): + # operations on port can be done on state idle or state sreams + return ((self.state == self.STATE_IDLE) or (self.state == self.STATE_STREAMS)) + + # add stream to the port + def add_stream (self, stream_id, stream_obj): + + if not self.is_port_writeable(): + return self.err("Please stop port before attempting to add streams") + + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id, + "stream": stream_obj} + + rc, data = self.transmit("add_stream", params) + if not rc: + r = self.err(data) + print r.good() + + # add the stream + self.streams[stream_id] = stream_obj + + # the only valid state now + self.state = self.STATE_STREAMS + + return self.ok() + + # remove stream from port + def remove_stream (self, stream_id): + + if not stream_id in self.streams: + return self.err("stream {0} does not exists".format(stream_id)) + + params = {"handler": self.handler, + "port_id": self.port_id, + "stream_id": stream_id} + + + rc, data = self.transmit("remove_stream", params) + if not rc: + return self.err(data) + + self.streams[stream_id] = None + + return self.ok() + + # remove all the streams + def remove_all_streams (self): + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("remove_all_streams", params) + if not rc: + return self.err(data) + + self.streams = {} + + return self.ok() + + # start traffic + def start (self, mul): + if self.state == self.STATE_DOWN: + return self.err("Unable to start traffic - port is down") + + if self.state == self.STATE_IDLE: + return self.err("Unable to start traffic - no streams attached to port") + + if self.state == self.STATE_TX: + return self.err("Unable to start traffic - port is already transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id, + "mul": mul} + + rc, data = self.transmit("start_traffic", params) + if not rc: + return self.err(data) + + self.state = self.STATE_TX + + return self.ok() + + # stop traffic + # with force ignores the cached state and sends the command + def stop (self, force = False): + + if (not force) and (self.state != self.STATE_TX) and (self.state != self.STATE_PAUSE): + return self.err("port is not transmitting") + + params = {"handler": self.handler, + "port_id": self.port_id} + + rc, data = self.transmit("stop_traffic", params) + if not rc: + return self.err(data) + + # only valid state after stop + self.state = self.STATE_STREAMS + + return self.ok() + + class CTRexStatelessClient(object): """docstring for CTRexStatelessClient""" - def __init__(self, username, server="localhost", sync_port=5050, async_port = 4500, virtual=False): + def __init__(self, username, server="localhost", sync_port = 5050, async_port = 4500, virtual=False): super(CTRexStatelessClient, self).__init__() self.user = username self.comm_link = CTRexStatelessClient.CCommLink(server, sync_port, virtual) self.verbose = False self._conn_handler = {} self._active_ports = set() - self._stats = CTRexStatsManager("port", "stream") self._system_info = None self._server_version = None self.__err_log = None self._async_client = CTRexAsyncClient(async_port) + self.streams_db = CStreamsDB() - # ----- decorator methods ----- # - def acquired(func): - def wrapper_f(self, *args, **kwargs): - # print func.__name__ - # print args - # print kwargs - port_ids = kwargs.get("port_id") - # if not port_ids: - # # print "FROM ARGS!" - # # print args - # port_ids = args[0] - if isinstance(port_ids, int): - # make sure port_ids is a list - port_ids = [port_ids] - bad_ids = set() - # print "=============" - # print port_ids - for port_id in port_ids: - port_owned = self._conn_handler.get(port_id) - if not port_owned: - bad_ids.add(port_id) - # elif active_and_owned: # stronger condition than just owned, hence gets precedence - # if port_owned and port_id in self._active_ports: - # continue - # else: - # bad_ids.add(port_id) - else: - continue - if bad_ids: - # Some port IDs are not according to desires status - raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} aren't " - "acquired".format(func.__name__, list(bad_ids))) - else: - return func(self, *args, **kwargs) - return wrapper_f - - def force_status(owned=True, active_and_owned=False): - def wrapper(func): - def wrapper_f(self, *args, **kwargs): - # print args - # print kwargs - port_ids = kwargs.get("port_id") - if not port_ids: - #print "FROM ARGS!" - #print args - port_ids = args[0] - if isinstance(port_ids, int): - # make sure port_ids is a list - port_ids = [port_ids] - bad_ids = set() - # print "=============" - # print port_ids - for port_id in port_ids: - port_owned = self._conn_handler.get(port_id) - if owned and not port_owned: - bad_ids.add(port_id) - elif active_and_owned: # stronger condition than just owned, hence gets precedence - if port_owned and port_id in self._active_ports: - continue - else: - bad_ids.add(port_id) - else: - continue - if bad_ids: - # Some port IDs are not according to desires status - raise ValueError("The requested method ('{0}') cannot be invoked since port IDs {1} are not " - "at allowed states".format(func.__name__, list(bad_ids))) - else: - return func(self, *args, **kwargs) - return wrapper_f - return wrapper - - @property - def system_info(self): - if not self._system_info: - rc, info = self.get_system_info() - if rc: - self._system_info = info - else: - self.__err_log = info - return self._system_info if self._system_info else "Unknown" - - @property - def server_version(self): - if not self._server_version: - rc, ver_info = self.get_version() - if rc: - self._server_version = ver_info - else: - self.__err_log = ver_info - return self._server_version if self._server_version else "Unknown" + self.connected = False + + ############# helper functions section ############## + + def validate_port_list(self, port_id_list): + if not isinstance(port_id_list, list): + print type(port_id_list) + return False + + # check each item of the sequence + return all([ (port_id >= 0) and (port_id < self.get_port_count()) + for port_id in port_id_list ]) + + # some preprocessing for port argument + def __ports (self, port_id_list): - def is_connected(self): - return self.comm_link.is_connected + # none means all + if port_id_list == None: + return range(0, self.get_port_count()) - # ----- user-access methods ----- # - def connect(self): - rc, err = self.comm_link.connect() + # always list + if isinstance(port_id_list, int): + port_id_list = [port_id_list] + + if not isinstance(port_id_list, list): + raise ValueError("bad port id list: {0}".format(port_id_list)) + + for port_id in port_id_list: + if not isinstance(port_id, int) or (port_id < 0) or (port_id > self.get_port_count()): + raise ValueError("bad port id {0}".format(port_id)) + + return port_id_list + + ############ boot up section ################ + + # connection sequence + def connect (self): + + self.connected = False + + # connect + rc, data = self.comm_link.connect() if not rc: - return rc, err - return self._init_sync() + return RC_ERR(data) - def get_stats_async (self): - return self._async_client.get_stats() + # cache system info + rc, data = self.transmit("get_system_info") + if not rc: + return RC_ERR(data) + + self.system_info = data + + # cache supported cmds + rc, data = self.transmit("get_supported_cmds") + if not rc: + return RC_ERR(data) + + self.supported_cmds = data + + # create ports + self.ports = [] + for port_id in xrange(0, self.get_port_count()): + self.ports.append(Port(port_id, self.user, self.transmit)) + + # acquire all ports + rc = self.acquire() + if rc.bad(): + return rc + + rc = self.sync_with_server() + if rc.bad(): + return rc + + self.connected = True + + return RC_OK() + + def is_connected (self): + return self.connected - def get_connection_port (self): - return self.comm_link.port def disconnect(self): - return self.comm_link.disconnect() + self.connected = False + self.comm_link.disconnect() + return RC_OK() - def ping(self): - return self.transmit("ping") + + + ########### cached queries (no server traffic) ########### def get_supported_cmds(self): - return self.transmit("get_supported_cmds") + return self.supported_cmds def get_version(self): - return self.transmit("get_version") + return self.server_version def get_system_info(self): - return self.transmit("get_system_info") + return self.system_info def get_port_count(self): return self.system_info.get("port_count") @@ -177,483 +450,406 @@ class CTRexStatelessClient(object): else: return port_ids - def sync_user(self, sync_streams=False): - return self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) + def get_stats_async (self): + return self._async_client.get_stats() + + def get_connection_port (self): + return self.comm_link.port + + def get_connection_ip (self): + return self.comm_link.server def get_acquired_ports(self): - return self._conn_handler.keys() + return [port.port_id for port in self.ports if port.is_acquired()] + def get_active_ports(self): - return list(self._active_ports) + return [port.port_id for port in self.ports if port.is_active()] def set_verbose(self, mode): self.comm_link.set_verbose(mode) self.verbose = mode - def acquire(self, port_id, force=False): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("acquire", {"port_id": p_id, "user": self.user, "force": force}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_acquire_response) - else: - params = {"port_id": port_id, - "user": self.user, - "force": force} - command = RpcCmdData("acquire", params) - return self._handle_acquire_response(command, - self.transmit(command.method, command.params), - self.default_success_test) - - @force_status(owned=True) - def release(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("release", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_release_response, - success_test=self.ack_success_test) - else: - self._conn_handler.pop(port_id) - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = RpcCmdData("release", params) - return self._handle_release_response(command, - self.transmit(command.method, command.params), - self.ack_success_test) - - @acquired - def add_stream(self, stream_id, stream_obj, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - assert isinstance(stream_obj, CStream) - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id, - "stream": stream_obj.dump()} - return self.transmit("add_stream", params) + ############# server actions ################ + + # ping server + def ping(self): + rc, info = self.transmit("ping") + return RC(rc, info) + + + def sync_with_server(self, sync_streams=False): + rc, data = self.transmit("sync_user", {"user": self.user, "sync_streams": sync_streams}) + if not rc: + return RC_ERR(data) + + for port_info in data: + + rc = self.ports[port_info['port_id']].sync(port_info) + if rc.bad(): + return rc + + return RC_OK() + + + + ########## port commands ############## + # acquire ports, if port_list is none - get all + def acquire (self, port_id_list = None, force = False): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].acquire(force)) + + return rc + + # release ports + def release (self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].release(force)) + + return rc + + + def add_stream(self, stream_id, stream_obj, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc = RC() - @acquired - def add_stream_pack(self, stream_pack_list, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") + for port_id in port_id_list: + rc.add(self.ports[port_id].add_stream(stream_id, stream_obj)) + + return rc + + + def add_stream_pack(self, stream_pack_list, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc = RC() - # since almost every run contains more than one transaction with server, handle all as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [] for stream_pack in stream_pack_list: - commands.extend([RpcCmdData("add_stream", {"port_id": p_id, - "handler": self._conn_handler.get(p_id), - "stream_id": stream_pack.stream_id, - "stream": stream_pack.stream} - ) - for p_id in port_ids] - ) - res_ok, resp_list = self.transmit_batch(commands) - if not res_ok: - return res_ok, resp_list - - return self._process_batch_result(commands, resp_list, self._handle_add_stream_response, - success_test=self.ack_success_test) - - @force_status(owned=True) - def remove_stream(self, stream_id, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id} - return self.transmit("remove_stream", params) - - def remove_all_streams(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("remove_all_streams", {"port_id": p_id, "handler": self._conn_handler.get(p_id)}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_remove_streams_response, - success_test=self.ack_success_test) - else: - params = {"port_id": port_id, - "handler": self._conn_handler.get(port_id)} - command = RpcCmdData("remove_all_streams", params) - return self._handle_remove_streams_response(command, - self.transmit(command.method, command.params), - self.ack_success_test) - pass + rc.add(self.add_stream(stream_pack.stream_id, stream_pack.stream, port_id_list)) + + return rc - @force_status(owned=True)#, active_and_owned=True) - def get_all_streams(self, port_id, get_pkt = False): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "get_pkt": get_pkt} - return self.transmit("get_all_streams", params) - - @force_status(owned=True)#, active_and_owned=True) - def get_stream_id_list(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - return self.transmit("get_stream_list", params) - - @force_status(owned=True, active_and_owned=True) + + def remove_stream(self, stream_id, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_stream(stream_id)) + + return rc + + + + def remove_all_streams(self, port_id_list = None): + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].remove_all_streams()) + + return rc + + def get_stream(self, stream_id, port_id, get_pkt = False): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "stream_id": stream_id, - "get_pkt": get_pkt} - return self.transmit("get_stream_list", params) - - def start_traffic(self, multiplier, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("start_traffic", {"handler": self._conn_handler.get(p_id), - "port_id": p_id, - "mul": multiplier}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_start_traffic_response, - success_test=self.ack_success_test) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id, - "mul": multiplier} - command = RpcCmdData("start_traffic", params) - return self._handle_start_traffic_response(command, - self.transmit(command.method, command.params), - self.ack_success_test) - - def stop_traffic(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - if not port_ids: - # don't invoke if port ids is empty - return True, [] - commands = [RpcCmdData("stop_traffic", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - return self._process_batch_result(commands, resp_list, self._handle_stop_traffic_response, - success_test=self.ack_success_test) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = RpcCmdData("stop_traffic", params) - return self._handle_stop_traffic_response(command, - self.transmit(command.method, command.params), - self.ack_success_test) - -# def get_global_stats(self): -# command = RpcCmdData("get_global_stats", {}) -# return self._handle_get_global_stats_response(command, self.transmit(command.method, command.params)) -# # return self.transmit("get_global_stats") - - @force_status(owned=True, active_and_owned=True) + + return self.ports[port_id].get_stream(stream_id) + + + def get_all_streams(self, port_id, get_pkt = False): + + return self.ports[port_id].get_all_streams() + + + def get_stream_id_list(self, port_id): + + return self.ports[port_id].get_stream_id_list() + + + def start_traffic (self, multiplier, port_id_list = None): + + port_id_list = self.__ports(port_id_list) + + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].start(multiplier)) + + return rc + + + + def stop_traffic (self, port_id_list = None, force = False): + + port_id_list = self.__ports(port_id_list) + rc = RC() + + for port_id in port_id_list: + rc.add(self.ports[port_id].stop(force)) + + return rc + + def get_port_stats(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("get_port_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_get_port_stats_response) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = RpcCmdData("get_port_stats", params) - return self._handle_get_port_stats_response(command, self.transmit(command.method, command.params)) + pass - @force_status(owned=True, active_and_owned=True) def get_stream_stats(self, port_id=None): - if not self._is_ports_valid(port_id): - raise ValueError("Provided illegal port id input") - if isinstance(port_id, list) or isinstance(port_id, set): - # handle as batch mode - port_ids = set(port_id) # convert to set to avoid duplications - commands = [RpcCmdData("get_stream_stats", {"handler": self._conn_handler.get(p_id), "port_id": p_id}) - for p_id in port_ids] - rc, resp_list = self.transmit_batch(commands) - if rc: - self._process_batch_result(commands, resp_list, self._handle_get_stream_stats_response) - else: - params = {"handler": self._conn_handler.get(port_id), - "port_id": port_id} - command = RpcCmdData("get_stream_stats", params) - return self._handle_get_stream_stats_response(command, self.transmit(command.method, command.params)) - - # ----- internal methods ----- # - def _init_sync(self): - # get server version and system info - err = False - if self.server_version == "Unknown" or self.system_info == "Unknown": - self.disconnect() - return False, self.__err_log - # sync with previous session - res_ok, port_info = self.sync_user() - if not res_ok: - self.disconnect() - return False, port_info - else: - # handle sync data - for port in port_info: - self._conn_handler[port.get("port_id")] = port.get("handler") - if port.get("state") == "transmitting": - # port is active - self._active_ports.add(port.get("port_id")) - return True, "" + pass def transmit(self, method_name, params={}): return self.comm_link.transmit(method_name, params) - def transmit_batch(self, batch_list): - return self.comm_link.transmit_batch(batch_list) - - @staticmethod - def _object_decoder(obj_type, obj_data): - if obj_type == "global": - return CGlobalStats(**obj_data) - elif obj_type == "port": - return CPortStats(**obj_data) - elif obj_type == "stream": - return CStreamStats(**obj_data) - else: - # Do not serialize the data into class - return obj_data - @staticmethod - def default_success_test(result_obj): - if result_obj.success: - return True - else: - return False - @staticmethod - def ack_success_test(result_obj): - if result_obj.success and result_obj.data == "ACK": - return True - else: - return False + ######################### Console (high level) API ######################### + def cmd_ping (self): + rc = self.ping() + rc.annotate("Pinging the server on '{0}' port '{1}': ".format(self.get_connection_ip(), self.get_connection_port())) + return rc - ######################### Console (high level) API ######################### + def cmd_connect (self): + rc = self.connect() + rc.annotate() + return rc + + def cmd_disconnect (self): + rc = self.disconnect() + rc.annotate() + return rc # reset - # acquire, stop, remove streams and clear stats - # - # - def cmd_reset (self, annotate_func): + def cmd_reset (self): - ports = self.get_port_ids() # sync with the server - rc, log = self._init_sync() - annotate_func("Syncing with the server:", rc, log) - if not rc: - return False + rc = self.sync_with_server() + rc.annotate("Syncing with the server:") + if rc.bad(): + return rc + rc = self.acquire(force = True) + rc.annotate("Force acquiring all ports:") + if rc.bad(): + return rc - # force acquire all ports - rc, log = self.acquire(ports, force = True) - annotate_func("Force acquiring all ports:", rc, log) - if not rc: - return False - # force stop - rc, log = self.stop_traffic(ports) - annotate_func("Stop traffic on all ports:", rc, log) - if not rc: - return False + # force stop all ports + rc = self.stop_traffic(self.get_port_ids(), True) + rc.annotate("Stop traffic on all ports:") + if rc.bad(): + return rc + # remove all streams - rc, log = self.remove_all_streams(ports) - annotate_func("Removing all streams from all ports:", rc, log) - if not rc: - return False + rc = self.remove_all_streams(self.get_port_ids()) + rc.annotate("Removing all streams from all ports:") + if rc.bad(): + return rc # TODO: clear stats - return True + return RC_OK() # stop cmd - def cmd_stop (self, ports, annotate_func): + def cmd_stop (self, port_id_list): # find the relveant ports - active_ports = set(self.get_active_ports()).intersection(ports) + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) + if not active_ports: - annotate_func("No active traffic on porivded ports") - return True + msg = "No active traffic on porvided ports" + print format_text(msg, 'bold') + return RC_ERR(msg) - rc, log = self.stop_traffic(active_ports) - annotate_func("Stopping traffic on ports {0}:".format([port for port in active_ports]), rc, log) - if not rc: - return False + rc = self.stop_traffic(active_ports) + rc.annotate("Stopping traffic on port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc - return True + return RC_OK() # start cmd - def cmd_start (self, ports, stream_list, mult, force, annotate_func): + def cmd_start (self, port_id_list, stream_list, mult, force): - if force: - rc = self.cmd_stop(ports, annotate_func) - if not rc: - return False + active_ports = list(set(self.get_active_ports()).intersection(port_id_list)) - rc, log = self.remove_all_streams(ports) - annotate_func("Removing all streams from ports {0}:".format([port for port in ports]), rc, log, - "Please either retry with --force or stop traffic") - if not rc: - return False + if active_ports: + if not force: + msg = "Port(s) {0} are active - please stop them or add '--force'".format(active_ports) + print format_text(msg, 'bold') + return RC_ERR(msg) + else: + rc = self.cmd_stop(active_ports) + if not rc: + return rc - rc, log = self.add_stream_pack(stream_list.compiled, port_id= ports) - annotate_func("Attaching streams to port {0}:".format([port for port in ports]), rc, log) - if not rc: - return False - # finally, start the traffic - rc, log = self.start_traffic(mult, ports) - annotate_func("Starting traffic on ports {0}:".format([port for port in ports]), rc, log) - if not rc: - return False + rc = self.remove_all_streams(port_id_list) + rc.annotate("Removing all streams from port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc - return True - # ----- handler internal methods ----- # - def _handle_general_response(self, request, response, msg, success_test=None): - port_id = request.params.get("port_id") - if not success_test: - success_test = self.default_success_test - if success_test(response): - self._conn_handler[port_id] = response.data - return RpcResponseStatus(True, port_id, msg) - else: - return RpcResponseStatus(False, port_id, response.data) + rc = self.add_stream_pack(stream_list.compiled, port_id_list) + rc.annotate("Attaching streams to port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc - def _handle_acquire_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - self._conn_handler[port_id] = response.data - return RpcResponseStatus(True, port_id, "Acquired") - else: - return RpcResponseStatus(False, port_id, response.data) + # finally, start the traffic + rc = self.start_traffic(mult, port_id_list) + rc.annotate("Starting traffic on port(s) {0}:".format(port_id_list)) + if rc.bad(): + return rc + + return RC_OK() + + ############## High Level API With Parser ################ + def cmd_start_line (self, line): + '''Start selected traffic in specified ports on TRex\n''' + # define a parser + parser = parsing_opts.gen_parser(self, + "start", + self.cmd_start_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL, + parsing_opts.FORCE, + parsing_opts.STREAM_FROM_PATH_OR_FILE, + parsing_opts.DURATION, + parsing_opts.MULTIPLIER) + + opts = parser.parse_args(line.split()) + + if opts is None: + return RC_ERR("bad command line paramters") + + if opts.db: + stream_list = self.stream_db.get_stream_pack(opts.db) + rc = RC(stream_list != None) + rc.annotate("Load stream pack (from DB):") + if rc.bad(): + return RC_ERR("Failed to load stream pack") - def _handle_add_stream_response(self, request, response, success_test): - port_id = request.params.get("port_id") - stream_id = request.params.get("stream_id") - if success_test(response): - return RpcResponseStatus(True, port_id, "Stream {0} added".format(stream_id)) else: - return RpcResponseStatus(False, port_id, response.data) + # load streams from file + stream_list = self.streams_db.load_yaml_file(opts.file[0]) + rc = RC(stream_list != None) + rc.annotate("Load stream pack (from file):") + if stream_list == None: + return RC_ERR("Failed to load stream pack") - def _handle_remove_streams_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - return RpcResponseStatus(True, port_id, "Removed") - else: - return RpcResponseStatus(False, port_id, response.data) - def _handle_release_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - del self._conn_handler[port_id] - return RpcResponseStatus(True, port_id, "Released") - else: - return RpcResponseStatus(False, port_id, response.data) + return self.cmd_start(opts.ports, stream_list, opts.mult, opts.force) - def _handle_start_traffic_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - self._active_ports.add(port_id) - return RpcResponseStatus(True, port_id, "Traffic started") - else: - return RpcResponseStatus(False, port_id, response.data) - - def _handle_stop_traffic_response(self, request, response, success_test): - port_id = request.params.get("port_id") - if success_test(response): - if port_id in self._active_ports: - self._active_ports.remove(port_id) - return RpcResponseStatus(True, port_id, "Traffic stopped") - else: - return RpcResponseStatus(False, port_id, response.data) + def cmd_stop_line (self, line): + '''Stop active traffic in specified ports on TRex\n''' + parser = parsing_opts.gen_parser(self, + "stop", + self.cmd_stop_line.__doc__, + parsing_opts.PORT_LIST_WITH_ALL) - def _handle_get_global_stats_response(self, request, response, success_test): - if response.success: - return CGlobalStats(**response.success) - else: - return False + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") - def _handle_get_port_stats_response(self, request, response, success_test): - if response.success: - return CPortStats(**response.success) - else: - return False + return self.cmd_stop(opts.ports) - def _handle_get_stream_stats_response(self, request, response, success_test): - if response.success: - return CStreamStats(**response.success) - else: - return False - def _is_ports_valid(self, port_id): - if isinstance(port_id, list) or isinstance(port_id, set): - # check each item of the sequence - return all([self._is_ports_valid(port) - for port in port_id]) - elif (isinstance(port_id, int)) and (port_id >= 0) and (port_id <= self.get_port_count()): - return True - else: - return False + def cmd_reset_line (self, line): + return self.cmd_reset() + + + def cmd_exit_line (self, line): + print format_text("Exiting\n", 'bold') + # a way to exit + return RC_ERR("exit") + + + def cmd_wait_line (self, line): + '''wait for a period of time\n''' + + parser = parsing_opts.gen_parser(self, + "wait", + self.cmd_wait_line.__doc__, + parsing_opts.DURATION) + + opts = parser.parse_args(line.split()) + if opts is None: + return RC_ERR("bad command line paramters") + + delay_sec = opts.d if opts.d else 1 - def _process_batch_result(self, req_list, resp_list, handler_func=None, success_test=default_success_test): - res_ok = True - responses = [] - if isinstance(success_test, staticmethod): - success_test = success_test.__func__ - for i, response in enumerate(resp_list): - # run handler method with its params - processed_response = handler_func(req_list[i], response, success_test) - responses.append(processed_response) - if not processed_response.success: - res_ok = False - # else: - # res_ok = False # TODO: mark in this case somehow the bad result - # print res_ok - # print responses - return res_ok, responses + print format_text("Waiting for {0} seconds...\n".format(delay_sec), 'bold') + time.sleep(delay_sec) + return RC_OK() + # run a script of commands + def run_script_file (self, filename): + + print format_text("\nRunning script file '{0}'...".format(filename), 'bold') + + rc = self.cmd_connect() + if rc.bad(): + return + + with open(filename) as f: + script_lines = f.readlines() + + cmd_table = {} + + # register all the commands + cmd_table['start'] = self.cmd_start_line + cmd_table['stop'] = self.cmd_stop_line + cmd_table['reset'] = self.cmd_reset_line + cmd_table['wait'] = self.cmd_wait_line + cmd_table['exit'] = self.cmd_exit_line + + for index, line in enumerate(script_lines): + line = line.strip() + if line == "": + continue + if line.startswith("#"): + continue + + sp = line.split(' ', 1) + cmd = sp[0] + if len(sp) == 2: + args = sp[1] + else: + args = "" + + print format_text("Executing line {0} : '{1}'\n".format(index, line)) + + if not cmd in cmd_table: + print "\n*** Error at line {0} : '{1}'\n".format(index, line) + print format_text("unknown command '{0}'\n".format(cmd), 'bold') + return False + + rc = cmd_table[cmd](args) + if rc.bad(): + return False + + print format_text("\n[Done]", 'bold') + + return True + + ################################# # ------ private classes ------ # class CCommLink(object): """describes the connectivity of the stateless client method""" @@ -713,3 +909,4 @@ class CTRexStatelessClient(object): if __name__ == "__main__": pass + diff --git a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py index 58491aba..077c82ad 100755 --- a/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/client_utils/jsonrpc_client.py @@ -110,7 +110,7 @@ class JsonRpcClient(object): return id, msg - def invoke_rpc_method (self, method_name, params = {}, block = False): + def invoke_rpc_method (self, method_name, params = {}, block = True): if not self.connected: return False, "Not connected to server" @@ -120,7 +120,7 @@ class JsonRpcClient(object): # low level send of string message - def send_raw_msg (self, msg, block = False): + def send_raw_msg (self, msg, block = True): self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n") if block: @@ -248,272 +248,3 @@ class JsonRpcClient(object): if hasattr(self, "context"): self.context.destroy(linger=0) -# MOVE THIS TO DAN'S FILE -class TrexStatelessClient(JsonRpcClient): - - def __init__ (self, server, port, user): - - super(TrexStatelessClient, self).__init__(server, port) - - self.user = user - self.port_handlers = {} - - self.supported_cmds = [] - self.system_info = None - self.server_version = None - - - def whoami (self): - return self.user - - def ping_rpc_server(self): - - return self.invoke_rpc_method("ping", block = False) - - def get_rpc_server_version (self): - return self.server_version - - def get_system_info (self): - if not self.system_info: - return {} - - return self.system_info - - def get_supported_cmds(self): - if not self.supported_cmds: - return {} - - return self.supported_cmds - - def get_port_count (self): - if not self.system_info: - return 0 - - return self.system_info["port_count"] - - # sync the client with all the server required data - def sync (self): - - # get server version - rc, msg = self.invoke_rpc_method("get_version") - if not rc: - self.disconnect() - return rc, msg - - self.server_version = msg - - # get supported commands - rc, msg = self.invoke_rpc_method("get_supported_cmds") - if not rc: - self.disconnect() - return rc, msg - - self.supported_cmds = [str(x) for x in msg if x] - - # get system info - rc, msg = self.invoke_rpc_method("get_system_info") - if not rc: - self.disconnect() - return rc, msg - - self.system_info = msg - - return True, "" - - def connect (self): - rc, err = super(TrexStatelessClient, self).connect() - if not rc: - return rc, err - - return self.sync() - - - # take ownership over ports - def take_ownership (self, port_id_array, force = False): - if not self.connected: - return False, "Not connected to server" - - batch = self.create_batch() - - for port_id in port_id_array: - batch.add("acquire", params = {"port_id":port_id, "user":self.user, "force":force}) - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - for i, rc in enumerate(resp_list): - if rc[0]: - self.port_handlers[port_id_array[i]] = rc[1] - - return True, resp_list - - - def release_ports (self, port_id_array): - batch = self.create_batch() - - for port_id in port_id_array: - - # let the server handle un-acquired errors - if self.port_handlers.get(port_id): - handler = self.port_handlers[port_id] - else: - handler = "" - - batch.add("release", params = {"port_id":port_id, "handler":handler}) - - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - for i, rc in enumerate(resp_list): - if rc[0]: - self.port_handlers.pop(port_id_array[i]) - - return True, resp_list - - def get_owned_ports (self): - return self.port_handlers.keys() - - # fetch port stats - def get_port_stats (self, port_id_array): - if not self.connected: - return False, "Not connected to server" - - batch = self.create_batch() - - # empty list means all - if port_id_array == []: - port_id_array = list([x for x in xrange(0, self.system_info["port_count"])]) - - for port_id in port_id_array: - - # let the server handle un-acquired errors - if self.port_handlers.get(port_id): - handler = self.port_handlers[port_id] - else: - handler = "" - - batch.add("get_port_stats", params = {"port_id":port_id, "handler":handler}) - - - rc, resp_list = batch.invoke() - - return rc, resp_list - - # snapshot will take a snapshot of all your owned ports for streams and etc. - def snapshot(self): - - - if len(self.get_owned_ports()) == 0: - return {} - - snap = {} - - batch = self.create_batch() - - for port_id in self.get_owned_ports(): - - batch.add("get_port_stats", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) - batch.add("get_stream_list", params = {"port_id": port_id, "handler": self.port_handlers[port_id]}) - - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - # split the list to 2s - index = 0 - for port_id in self.get_owned_ports(): - if not resp_list[index] or not resp_list[index + 1]: - snap[port_id] = None - continue - - # fetch the first two - stats = resp_list[index][1] - stream_list = resp_list[index + 1][1] - - port = {} - port['status'] = stats['status'] - port['stream_list'] = [] - - # get all the streams - if len(stream_list) > 0: - batch = self.create_batch() - for stream_id in stream_list: - batch.add("get_stream", params = {"port_id": port_id, "stream_id": stream_id, "handler": self.port_handlers[port_id]}) - - rc, stream_resp_list = batch.invoke() - if not rc: - port = {} - - port['streams'] = {} - for i, resp in enumerate(stream_resp_list): - if resp[0]: - port['streams'][stream_list[i]] = resp[1] - - snap[port_id] = port - - # move to next one - index += 2 - - - return snap - - # add stream - # def add_stream (self, port_id, stream_id, isg, next_stream_id, packet, vm=[]): - # if not port_id in self.get_owned_ports(): - # return False, "Port {0} is not owned... please take ownership before adding streams".format(port_id) - # - # handler = self.port_handlers[port_id] - # - # stream = {} - # stream['enabled'] = True - # stream['self_start'] = True - # stream['isg'] = isg - # stream['next_stream_id'] = next_stream_id - # stream['packet'] = {} - # stream['packet']['binary'] = packet - # stream['packet']['meta'] = "" - # stream['vm'] = vm - # stream['rx_stats'] = {} - # stream['rx_stats']['enabled'] = False - # - # stream['mode'] = {} - # stream['mode']['type'] = 'continuous' - # stream['mode']['pps'] = 10.0 - # - # params = {} - # params['handler'] = handler - # params['stream'] = stream - # params['port_id'] = port_id - # params['stream_id'] = stream_id - # - # print params - # return self.invoke_rpc_method('add_stream', params = params) - - def add_stream(self, port_id_array, stream_pack_list): - batch = self.create_batch() - - for port_id in port_id_array: - for stream_pack in stream_pack_list: - params = {"port_id": port_id, - "handler": self.port_handlers[port_id], - "stream_id": stream_pack.stream_id, - "stream": stream_pack.stream} - batch.add("add_stream", params=params) - rc, resp_list = batch.invoke() - if not rc: - return rc, resp_list - - for i, rc in enumerate(resp_list): - if rc[0]: - print "Stream {0} - {1}".format(i, rc[1]) - # self.port_handlers[port_id_array[i]] = rc[1] - - return True, resp_list - - # return self.invoke_rpc_method('add_stream', params = params) - -if __name__ == "__main__": - pass
\ No newline at end of file diff --git a/scripts/automation/trex_control_plane/console/parsing_opts.py b/scripts/automation/trex_control_plane/console/parsing_opts.py index f983d837..c154ce24 100755 --- a/scripts/automation/trex_control_plane/console/parsing_opts.py +++ b/scripts/automation/trex_control_plane/console/parsing_opts.py @@ -135,12 +135,13 @@ class CCmdArgParser(argparse.ArgumentParser): if opts is None: return None - if opts.all_ports: + if getattr(opts, "all_ports", None): opts.ports = self.stateless_client.get_port_ids() - for port in opts.ports: - if not self.stateless_client._is_ports_valid(port): - self.error("port id {0} is not a valid\n".format(port)) + if getattr(opts, "ports", None): + for port in opts.ports: + if not self.stateless_client.validate_port_list([port]): + self.error("port id '{0}' is not a valid port id\n".format(port)) return opts diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index 5470e694..88e8dede 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -30,72 +30,12 @@ import tty, termios import trex_root_path from common.trex_streams import * from client.trex_stateless_client import CTRexStatelessClient -from client.trex_stateless_client import RpcResponseStatus from common.text_opts import * from client_utils.general_utils import user_input, get_current_user -import parsing_opts import trex_status -from collections import namedtuple -__version__ = "1.0" - -LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) - -class CStreamsDB(object): - - def __init__(self): - self.stream_packs = {} - - def load_yaml_file (self, filename): - - stream_pack_name = filename - if stream_pack_name in self.get_loaded_streams_names(): - self.remove_stream_packs(stream_pack_name) - - stream_list = CStreamList() - loaded_obj = stream_list.load_yaml(filename) - try: - compiled_streams = stream_list.compile_streams() - rc = self.load_streams(stream_pack_name, - LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump()) - for k, v in compiled_streams.items()])) - - except Exception as e: - return None - - return self.get_stream_pack(stream_pack_name) - - def load_streams(self, name, LoadedStreamList_obj): - if name in self.stream_packs: - return False - else: - self.stream_packs[name] = LoadedStreamList_obj - return True - - def remove_stream_packs(self, *names): - removed_streams = [] - for name in names: - removed = self.stream_packs.pop(name) - if removed: - removed_streams.append(name) - return removed_streams - - def clear(self): - self.stream_packs.clear() - - def get_loaded_streams_names(self): - return self.stream_packs.keys() - - def stream_pack_exists (self, name): - return name in self.get_loaded_streams_names() - - def get_stream_pack(self, name): - if not self.stream_pack_exists(name): - return None - else: - return self.stream_packs.get(name) +__version__ = "1.0" # @@ -111,15 +51,11 @@ class TRexConsole(cmd.Cmd): self.verbose = verbose self.acquire_all_ports = acquire_all_ports - self.do_connect("") - self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) self.intro += "\nType 'help' or '?' for supported actions\n" self.postcmd(False, "") - self.streams_db = CStreamsDB() - ################### internal section ######################## @@ -155,51 +91,23 @@ class TRexConsole(cmd.Cmd): path = dir else: path = "." - start_string = os.path.basename(text) - return [x - for x in os.listdir(path) - if x.startswith(start_string)] - - ####################### shell commands ####################### - # set verbose on / off - def do_verbose(self, line): - '''Shows or set verbose mode\n''' - if line == "": - print "\nverbose is " + ("on\n" if self.verbose else "off\n") - - elif line == "on": - self.verbose = True - self.stateless_client.set_verbose(True) - print green("\nverbose set to on\n") - - elif line == "off": - self.verbose = False - self.stateless_client.set_verbose(False) - print green("\nverbose set to off\n") - else: - print magenta("\nplease specify 'on' or 'off'\n") - - ############### connect - def do_connect (self, line): - '''Connects to the server\n''' - - res_ok, msg = self.stateless_client.connect() - if res_ok: - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print "\n*** " + msg + "\n" - print format_text("[FAILED]\n", 'red', 'bold') - return + start_string = os.path.basename(text) + + targets = [] - self.supported_rpc = self.stateless_client.get_supported_cmds().data + for x in os.listdir(path): + if x.startswith(start_string): + y = os.path.join(path, x) + if os.path.isfile(y): + targets.append(x + ' ') + elif os.path.isdir(y): + targets.append(x + '/') - if self.acquire_all_ports: - res_ok, log = self.stateless_client.acquire(self.stateless_client.get_port_ids()) - if not res_ok: - print "\n*** Failed to acquire all ports... exiting...""" + return targets + # annotation method @staticmethod def annotate (desc, rc = None, err_log = None, ext_err_msg = None): print format_text('\n{:<40}'.format(desc), 'bold'), @@ -230,6 +138,54 @@ class TRexConsole(cmd.Cmd): return True + ####################### shell commands ####################### + def do_ping (self, line): + '''Ping the server\n''' + + rc = self.stateless_client.cmd_ping() + if rc.bad(): + return + + def do_test (self, line): + print self.stateless_client.get_acquired_ports() + + # set verbose on / off + def do_verbose(self, line): + '''Shows or set verbose mode\n''' + if line == "": + print "\nverbose is " + ("on\n" if self.verbose else "off\n") + + elif line == "on": + self.verbose = True + self.stateless_client.set_verbose(True) + print format_text("\nverbose set to on\n", 'green', 'bold') + + elif line == "off": + self.verbose = False + self.stateless_client.set_verbose(False) + print format_text("\nverbose set to off\n", 'green', 'bold') + + else: + print format_text("\nplease specify 'on' or 'off'\n", 'bold') + + + ############### connect + def do_connect (self, line): + '''Connects to the server\n''' + + rc = self.stateless_client.cmd_connect() + if rc.bad(): + return + + + def do_disconnect (self, line): + '''Disconnect from the server\n''' + + rc = self.stateless_client.cmd_disconnect() + if rc.bad(): + return + + ############### start def complete_start(self, text, line, begidx, endidx): @@ -247,37 +203,7 @@ class TRexConsole(cmd.Cmd): def do_start(self, line): '''Start selected traffic in specified ports on TRex\n''' - # make sure that the user wants to acquire all - parser = parsing_opts.gen_parser(self.stateless_client, - "start", - self.do_start.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.FORCE, - parsing_opts.STREAM_FROM_PATH_OR_FILE, - parsing_opts.DURATION, - parsing_opts.MULTIPLIER) - - opts = parser.parse_args(line.split()) - - if opts is None: - return - - if opts.db: - stream_list = self.stream_db.get_stream_pack(opts.db) - self.annotate("Load stream pack (from DB):", (stream_list != None)) - if stream_list == None: - return - - else: - # load streams from file - stream_list = self.streams_db.load_yaml_file(opts.file[0]) - self.annotate("Load stream pack (from file):", (stream_list != None)) - if stream_list == None: - return - - - self.stateless_client.cmd_start(opts.ports, stream_list, opts.mult, opts.force, self.annotate) - return + self.stateless_client.cmd_start_line(line) def help_start(self): @@ -285,18 +211,9 @@ class TRexConsole(cmd.Cmd): ############# stop def do_stop(self, line): - '''Stop active traffic in specified ports on TRex\n''' - parser = parsing_opts.gen_parser(self.stateless_client, - "stop", - self.do_stop.__doc__, - parsing_opts.PORT_LIST_WITH_ALL) - - opts = parser.parse_args(line.split()) - if opts is None: - return + self.stateless_client.cmd_stop_line(line) - self.stateless_client.cmd_stop(opts.ports, self.annotate) - return + def help_stop(self): self.do_stop("-h") @@ -304,7 +221,7 @@ class TRexConsole(cmd.Cmd): ########## reset def do_reset (self, line): '''force stop all ports\n''' - self.stateless_client.cmd_reset(self.annotate) + self.stateless_client.cmd_reset() # tui @@ -312,7 +229,7 @@ class TRexConsole(cmd.Cmd): '''Shows a graphical console\n''' if not self.stateless_client.is_connected(): - print "Not connected to server\n" + print format_text("\nNot connected to server\n", 'bold') return self.do_verbose('off') @@ -364,6 +281,14 @@ class TRexConsole(cmd.Cmd): do_exit = do_EOF = do_q = do_quit +# +def is_valid_file(filename): + if not os.path.isfile(filename): + raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename) + + return filename + + def setParserOptions(): parser = argparse.ArgumentParser(prog="trex_console.py") @@ -393,6 +318,12 @@ def setParserOptions(): action="store_false", help="Acquire all ports on connect. Default is: ON.", default = True) + parser.add_argument("--batch", dest="batch", + nargs = 1, + type = is_valid_file, + help = "Run the console in a batch mode with file", + default = None) + return parser @@ -402,7 +333,15 @@ def main(): # Stateless client connection stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub) + rc = stateless_client.cmd_connect() + if rc.bad(): + return + if options.batch: + cont = stateless_client.run_script_file(options.batch[0]) + if not cont: + return + # console try: console = TRexConsole(stateless_client, options.acquire, options.verbose) diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 907b9cf4..7f2382d3 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -186,22 +186,22 @@ TrexStatelessPort::get_state_as_string() const { switch (get_state()) { case PORT_STATE_DOWN: - return "down"; + return "DOWN"; case PORT_STATE_IDLE: - return "no streams"; + return "IDLE"; case PORT_STATE_STREAMS: - return "with streams, idle"; + return "STREAMS"; case PORT_STATE_TX: - return "transmitting"; + return "TX"; case PORT_STATE_PAUSE: - return "paused"; + return "PAUSE"; } - return "unknown"; + return "UNKNOWN"; } void |