diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/console/trex_console.py')
-rwxr-xr-x | scripts/automation/trex_control_plane/console/trex_console.py | 917 |
1 files changed, 240 insertions, 677 deletions
diff --git a/scripts/automation/trex_control_plane/console/trex_console.py b/scripts/automation/trex_control_plane/console/trex_console.py index a2c738ab..ea2f5f12 100755 --- a/scripts/automation/trex_control_plane/console/trex_console.py +++ b/scripts/automation/trex_control_plane/console/trex_console.py @@ -23,6 +23,7 @@ import json import ast import argparse import random +import readline import string import os import sys @@ -32,771 +33,312 @@ from common.trex_streams import * from client.trex_stateless_client import CTRexStatelessClient from common.text_opts import * from client_utils.general_utils import user_input, get_current_user -import parsing_opts import trex_status -from collections import namedtuple - -__version__ = "1.0" - -LoadedStreamList = namedtuple('LoadedStreamList', ['loaded', 'compiled']) - +import parsing_opts -def readch(choices=[]): - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setraw(sys.stdin.fileno()) - while True: - ch = sys.stdin.read(1) - if (ord(ch) == 3) or (ord(ch) == 4): - return None - if ch in choices: - return ch - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - - return None - -class ConfirmMenu(object): - def __init__ (self, caption): - self.caption = "{cap} [confirm] : ".format(cap=caption) - - def show(self): - sys.stdout.write(self.caption) - input = user_input() - if input: - return False - else: - # user hit Enter - return True - +__version__ = "1.1" -class CStreamsDB(object): +class TRexGeneralCmd(cmd.Cmd): def __init__(self): - self.stream_packs = {} - - def load_streams(self, name, LoadedStreamList_obj): - if name in self.stream_packs: - return False - else: - self.stream_packs[name] = LoadedStreamList_obj - return True - - def remove_stream_packs(self, *names): - removed_streams = [] - for name in names: - removed = self.stream_packs.pop(name) - if removed: - removed_streams.append(name) - return removed_streams - - def clear(self): - self.stream_packs.clear() - - def get_loaded_streams_names(self): - return self.stream_packs.keys() - - def get_stream_pack(self, name): - return self.stream_packs.get(name) - - -# multi level cmd menu -class CmdMenu(object): - def __init__ (self): - self.menus = [] - - - def add_menu (self, caption, options): - menu = {} - menu['caption'] = caption - menu['options'] = options - self.menus.append(menu) + cmd.Cmd.__init__(self) + # configure history behaviour + self._history_file_dir = "/tmp/trex/console/" + self._history_file = self.get_history_file_full_path() + readline.set_history_length(100) + # load history, if any + self.load_console_history() - def show (self): - cur_level = 0 - print "\n" - selected_path = [] - for menu in self.menus: - # show all the options - print "{0}\n".format(menu['caption']) - for i, option in enumerate(menu['options']): - print "{0}. {1}".format(i + 1, option) + def get_console_identifier(self): + return self.__class__.__name__ - #print "\nPlease select an option: " + def get_history_file_full_path(self): + return "{dir}{filename}.hist".format(dir=self._history_file_dir, + filename=self.get_console_identifier()) - choices = range(0, len(menu['options'])) - choices = [ chr(x + 48) for x in choices] + def load_console_history(self): + if os.path.exists(self._history_file): + readline.read_history_file(self._history_file) + return - print "" - ch = readch(choices) - print "" + def save_console_history(self): + if not os.path.exists(self._history_file_dir): + os.makedirs(self._history_file_dir) + # os.mknod(self._history_file) + readline.write_history_file(self._history_file) + return - if ch == None: - return None + def emptyline(self): + """Called when an empty line is entered in response to the prompt. - selected_path.append(int(ch) - 1) + This overriding is such that when empty line is passed, **nothing happens**. + """ + return - return selected_path + def completenames(self, text, *ignored): + """ + This overriding is such that a space is added to name completion. + """ + dotext = 'do_'+text + return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)] + def precmd(self, line): + # before doing anything, save history snapshot of the console + # this is done before executing the command in case of ungraceful application exit + self.save_console_history() + return line -class AddStreamMenu(CmdMenu): - def __init__ (self): - super(AddStreamMenu, self).__init__() - self.add_menu('Please select type of stream', ['a', 'b', 'c']) - self.add_menu('Please select ISG', ['d', 'e', 'f']) +# # main console object -class TRexConsole(cmd.Cmd): +class TRexConsole(TRexGeneralCmd): """Trex Console""" - def __init__(self, stateless_client, verbose): - cmd.Cmd.__init__(self) - + def __init__(self, stateless_client, acquire_all_ports=True, verbose=False): self.stateless_client = stateless_client + TRexGeneralCmd.__init__(self) + - self.do_connect("") + self.verbose = verbose + self.acquire_all_ports = acquire_all_ports self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) self.intro += "\nType 'help' or '?' for supported actions\n" - self.verbose = False - self.postcmd(False, "") - self.user_streams = {} - self.streams_db = CStreamsDB() + ################### internal section ######################## - # a cool hack - i stole this function and added space - def completenames(self, text, *ignored): - dotext = 'do_'+text - return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)] - + def get_console_identifier(self): + return "{context}_{server}".format(context=self.__class__.__name__, + server=self.stateless_client.get_system_info()['hostname']) + + def register_main_console_methods(self): + main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__))) + for name in main_names: + for prefix in 'do_', 'help_', 'complete_': + if name.startswith(prefix): + self.__dict__[name] = getattr(self.trex_console, name) - # set verbose on / off - def do_verbose (self, line): - '''Shows or set verbose mode\n''' - if line == "": - print "\nverbose is " + ("on\n" if self.verbose else "off\n") + def postcmd(self, stop, line): + if self.stateless_client.is_connected(): + self.prompt = "TRex > " + else: + self.supported_rpc = None + self.prompt = "TRex (offline) > " - elif line == "on": - self.verbose = True - self.stateless_client.set_verbose(True) - print green("\nverbose set to on\n") + return stop - elif line == "off": - self.verbose = False - self.stateless_client.set_verbose(False) - print green("\nverbose set to off\n") + def default(self, line): + print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line) + @staticmethod + def tree_autocomplete(text): + dir = os.path.dirname(text) + if dir: + path = dir else: - print magenta("\nplease specify 'on' or 'off'\n") + path = "." - # query the server for registered commands - def do_query_server(self, line): - '''query the RPC server for supported remote commands\n''' - res_ok, msg = self.stateless_client.get_supported_cmds() - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print "\nRPC server supports the following commands:\n" - for func in msg: - if func: - print func - print '' - print format_text("[SUCCESS]\n", 'green', 'bold') - return + start_string = os.path.basename(text) + + targets = [] - def do_ping (self, line): - '''Pings the RPC server\n''' + for x in os.listdir(path): + if x.startswith(start_string): + y = os.path.join(path, x) + if os.path.isfile(y): + targets.append(x + ' ') + elif os.path.isdir(y): + targets.append(x + '/') - print "\n-> Pinging RPC server" + return targets - res_ok, msg = self.stateless_client.ping() - if res_ok: - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print "\n*** " + msg + "\n" + # annotation method + @staticmethod + def annotate (desc, rc = None, err_log = None, ext_err_msg = None): + print format_text('\n{:<40}'.format(desc), 'bold'), + if rc == None: + print "\n" return - def do_force_acquire (self, line): - '''Acquires ports by force\n''' - - self.do_acquire(line, True) - - def complete_force_acquire(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx, acquired=False) - - def extract_port_ids_from_line(self, line): - return {int(x) for x in line.split()} - - def extract_port_ids_from_list(self, port_list): - return {int(x) for x in port_list} + if rc == False: + # do we have a complex log object ? + if isinstance(err_log, list): + print "" + for func in err_log: + if func: + print func + print "" - def parse_ports_from_line (self, line): - port_list = set() - if line: - for port_id in line.split(' '): - if (not port_id.isdigit()) or (int(port_id) < 0) or (int(port_id) >= self.stateless_client.get_port_count()): - print "Please provide a list of ports separated by spaces between 0 and {0}".format(self.stateless_client.get_port_count() - 1) - return None + elif isinstance(err_log, str): + print "\n" + err_log + "\n" - port_list.add(int(port_id)) + print format_text("[FAILED]\n", 'red', 'bold') + if ext_err_msg: + print format_text(ext_err_msg + "\n", 'blue', 'bold') - port_list = list(port_list) + return False else: - port_list = [i for i in xrange(0, self.stateless_client.get_port_count())] - - return port_list + print format_text("[SUCCESS]\n", 'green', 'bold') + return True - def do_acquire(self, line, force=False): - '''Acquire ports\n''' + ####################### shell commands ####################### + def do_ping (self, line): + '''Ping the server\n''' - # make sure that the user wants to acquire all - args = line.split() - if len(args) < 1: - print magenta("Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports") + rc = self.stateless_client.cmd_ping() + if rc.bad(): return - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to acquire all ports ? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_port_ids() - else: - port_list = self.extract_port_ids_from_line(line) - - # rc, resp_list = self.stateless_client.take_ownership(port_list, force) - try: - res_ok, log = self.stateless_client.acquire(port_list, force) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') + def do_test (self, line): + print self.stateless_client.get_acquired_ports() + # set verbose on / off + def do_verbose(self, line): + '''Shows or set verbose mode\n''' + if line == "": + print "\nverbose is " + ("on\n" if self.verbose else "off\n") + + elif line == "on": + self.verbose = True + self.stateless_client.set_verbose(True) + print format_text("\nverbose set to on\n", 'green', 'bold') + + elif line == "off": + self.verbose = False + self.stateless_client.set_verbose(False) + print format_text("\nverbose set to off\n", 'green', 'bold') - def port_auto_complete(self, text, line, begidx, endidx, acquired=True, active=False): - if acquired: - if not active: - ret_list = [x - for x in map(str, self.stateless_client.get_acquired_ports()) - if x.startswith(text)] - else: - ret_list = [x - for x in map(str, self.stateless_client.get_active_ports()) - if x.startswith(text)] - else: - ret_list = [x - for x in map(str, self.stateless_client.get_port_ids()) - if x.startswith(text)] - ret_list.append("all") - return ret_list - - - def complete_acquire(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx, acquired=False) - - def do_release (self, line): - '''Release ports\n''' - - # if line: - # port_list = self.parse_ports_from_line(line) - # else: - # port_list = self.stateless_client.get_owned_ports() - args = line.split() - if len(args) < 1: - print "Please provide a list of ports separated by spaces, or specify 'all' to acquire all available ports" - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to release all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_acquired_ports() else: - port_list = self.extract_port_ids_from_line(line) - - try: - res_ok, log = self.stateless_client.release(port_list) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - return + print format_text("\nplease specify 'on' or 'off'\n", 'bold') - def complete_release(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx) + ############### connect def do_connect (self, line): '''Connects to the server\n''' - if line == "": - res_ok, msg = self.stateless_client.connect() - else: - sp = line.split() - if (len(sp) != 2): - print "\n[usage] connect [server] [port] or without parameters\n" - return + rc = self.stateless_client.cmd_connect() + if rc.bad(): + return - res_ok, msg = self.stateless_client.connect(sp[0], sp[1]) - if res_ok: - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print "\n*** " + msg + "\n" - print format_text("[FAILED]\n", 'red', 'bold') + def do_disconnect (self, line): + '''Disconnect from the server\n''' + + rc = self.stateless_client.cmd_disconnect() + if rc.bad(): return - self.supported_rpc = self.stateless_client.get_supported_cmds().data + + ############### start - def do_rpc (self, line): - '''Launches a RPC on the server\n''' + def complete_start(self, text, line, begidx, endidx): + s = line.split() + l = len(s) - if line == "": - print "\nUsage: [method name] [param dict as string]\n" - print "Example: rpc test_add {'x': 12, 'y': 17}\n" - return + file_flags = parsing_opts.get_flags(parsing_opts.FILE_PATH) - sp = line.split(' ', 1) - method = sp[0] + if (l > 1) and (s[l - 1] in file_flags): + return TRexConsole.tree_autocomplete("") - params = None - bad_parse = False - if len(sp) > 1: + if (l > 2) and (s[l - 2] in file_flags): + return TRexConsole.tree_autocomplete(s[l - 1]) - try: - params = ast.literal_eval(sp[1]) - if not isinstance(params, dict): - bad_parse = True + def do_start(self, line): + '''Start selected traffic in specified ports on TRex\n''' - except ValueError as e1: - bad_parse = True - except SyntaxError as e2: - bad_parse = True + self.stateless_client.cmd_start_line(line) - if bad_parse: - print "\nValue should be a valid dict: '{0}'".format(sp[1]) - print "\nUsage: [method name] [param dict as string]\n" - print "Example: rpc test_add {'x': 12, 'y': 17}\n" - return - res_ok, msg = self.stateless_client.transmit(method, params) - if res_ok: - print "\nServer Response:\n\n" + pretty_json(json.dumps(msg)) + "\n" - else: - print "\n*** " + msg + "\n" - #print "Please try 'reconnect' to reconnect to server" + def help_start(self): + self.do_start("-h") + ############# stop + def do_stop(self, line): + self.stateless_client.cmd_stop_line(line) - def complete_rpc (self, text, line, begidx, endidx): - return [x - for x in self.supported_rpc - if x.startswith(text)] + - def do_status (self, line): + def help_stop(self): + self.do_stop("-h") + + ########## reset + def do_reset (self, line): + '''force stop all ports\n''' + self.stateless_client.cmd_reset() + + + # tui + def do_tui (self, line): '''Shows a graphical console\n''' if not self.stateless_client.is_connected(): - print "Not connected to server\n" + print format_text("\nNot connected to server\n", 'bold') return self.do_verbose('off') trex_status.show_trex_status(self.stateless_client) + # quit function def do_quit(self, line): '''Exit the client\n''' return True - def do_disconnect (self, line): - '''Disconnect from the server\n''' - if not self.stateless_client.is_connected(): - print "Not connected to server\n" - return - - res_ok, msg = self.stateless_client.disconnect() - if res_ok: - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print msg + "\n" - - def do_whoami (self, line): - '''Prints console user name\n''' - print "\n" + self.stateless_client.user + "\n" - - def postcmd(self, stop, line): - if self.stateless_client.is_connected(): - self.prompt = "TRex > " - else: - self.supported_rpc = None - self.prompt = "TRex (offline) > " - - return stop - - def default(self, line): - print "'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line) - - # def do_help (self, line): - # '''Shows This Help Screen\n''' - # if line: - # try: - # func = getattr(self, 'help_' + line) - # except AttributeError: - # try: - # doc = getattr(self, 'do_' + line).__doc__ - # if doc: - # self.stdout.write("%s\n"%str(doc)) - # return - # except AttributeError: - # pass - # self.stdout.write("%s\n"%str(self.nohelp % (line,))) - # return - # func() - # return - # - # print "\nSupported Console Commands:" - # print "----------------------------\n" - # - # cmds = [x[3:] for x in self.get_names() if x.startswith("do_")] - # for cmd in cmds: - # if cmd == "EOF": - # continue - # - # try: - # doc = getattr(self, 'do_' + cmd).__doc__ - # if doc: - # help = str(doc) - # else: - # help = "*** Undocumented Function ***\n" - # except AttributeError: - # help = "*** Undocumented Function ***\n" - # - # print "{:<30} {:<30}".format(cmd + " - ", help) - - def do_stream_db_add(self, line): - '''Loads a YAML stream list serialization into user console \n''' - args = line.split() - if len(args) >= 2: - name = args[0] - yaml_path = args[1] - try: - multiplier = args[2] - except IndexError: - multiplier = 1 - stream_list = CStreamList() - loaded_obj = stream_list.load_yaml(yaml_path, multiplier) - # print self.stateless_client.pretty_json(json.dumps(loaded_obj)) - try: - compiled_streams = stream_list.compile_streams() - res_ok = self.streams_db.load_streams(name, LoadedStreamList(loaded_obj, - [StreamPack(v.stream_id, v.stream.dump()) - for k, v in compiled_streams.items()])) - if res_ok: - print green("Stream pack '{0}' loaded and added successfully\n".format(name)) - else: - print magenta("Picked name already exist. Please pick another name.\n") - except Exception as e: - print "adding new stream failed due to the following error:\n", str(e) - print format_text("[FAILED]\n", 'red', 'bold') - - return - else: - print magenta("please provide load name and YAML path, separated by space.\n" - "Optionally, you may provide a third argument to specify multiplier.\n") + + def do_help (self, line): + '''Shows This Help Screen\n''' + if line: + try: + func = getattr(self, 'help_' + line) + except AttributeError: + try: + doc = getattr(self, 'do_' + line).__doc__ + if doc: + self.stdout.write("%s\n"%str(doc)) + return + except AttributeError: + pass + self.stdout.write("%s\n"%str(self.nohelp % (line,))) + return + func() + return + + print "\nSupported Console Commands:" + print "----------------------------\n" + + cmds = [x[3:] for x in self.get_names() if x.startswith("do_")] + for cmd in cmds: + if ( (cmd == "EOF") or (cmd == "q") or (cmd == "exit")): + continue + + try: + doc = getattr(self, 'do_' + cmd).__doc__ + if doc: + help = str(doc) + else: + help = "*** Undocumented Function ***\n" + except AttributeError: + help = "*** Undocumented Function ***\n" + + print "{:<30} {:<30}".format(cmd + " - ", help) - @staticmethod - def tree_autocomplete(text): - dir = os.path.dirname(text) - if dir: - path = dir - else: - path = "." - start_string = os.path.basename(text) - return [x - for x in os.listdir(path) - if x.startswith(start_string)] - - - def complete_stream_db_add(self, text, line, begidx, endidx): - arg_num = len(line.split()) - 1 - if arg_num == 2: - return TRexConsole.tree_autocomplete(line.split()[-1]) - else: - return [text] - - def do_stream_db_show(self, line): - '''Shows the loaded stream list named [name] \n''' - args = line.split() - if args: - list_name = args[0] - try: - stream = self.streams_db.get_stream_pack(list_name)#user_streams[list_name] - if len(args) >= 2 and args[1] == "full": - print pretty_json(json.dumps(stream.compiled)) - else: - print pretty_json(json.dumps(stream.loaded)) - except KeyError as e: - print "Unknown stream list name provided" - else: - print "Available stream packs:\n{0}".format(', '.join(sorted(self.streams_db.get_loaded_streams_names()))) - - def complete_stream_db_show(self, text, line, begidx, endidx): - return [x - for x in self.streams_db.get_loaded_streams_names() - if x.startswith(text)] - - def do_stream_db_remove(self, line): - '''Removes a single loaded stream packs from loaded stream pack repository\n''' - args = line.split() - if args: - removed_streams = self.streams_db.remove_stream_packs(*args) - if removed_streams: - print green("The following stream packs were removed:") - print bold(", ".join(sorted(removed_streams))) - print format_text("[SUCCESS]\n", 'green', 'bold') - else: - print red("No streams were removed. Make sure to provide valid stream pack names.") - else: - print magenta("Please provide stream pack name(s), separated with spaces.") - - def do_stream_db_clear(self, line): - '''Clears all loaded stream packs from loaded stream pack repository\n''' - self.streams_db.clear() - print format_text("[SUCCESS]\n", 'green', 'bold') - - - def complete_stream_db_remove(self, text, line, begidx, endidx): - return [x - for x in self.streams_db.get_loaded_streams_names() - if x.startswith(text)] - - - def do_attach(self, line): - '''Assign loaded stream pack into specified ports on TRex\n''' - args = line.split() - if len(args) >= 2: - stream_pack_name = args[0] - stream_list = self.streams_db.get_stream_pack(stream_pack_name) #user_streams[args[0]] - if not stream_list: - print "Provided stream list name '{0}' doesn't exists.".format(stream_pack_name) - print format_text("[FAILED]\n", 'red', 'bold') - return - if args[1] == "all": - ask = ConfirmMenu('Are you sure you want to release all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_acquired_ports() - else: - port_list = self.extract_port_ids_from_line(' '.join(args[1:])) - owned = set(self.stateless_client.get_acquired_ports()) - try: - if set(port_list).issubset(owned): - res_ok, log = self.stateless_client.add_stream_pack(port_list, *stream_list.compiled) - # res_ok, msg = self.stateless_client.add_stream(port_list, stream_list.compiled) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - return - else: - print "Not all desired ports are acquired.\n" \ - "Acquired ports are: {acq}\n" \ - "Requested ports: {req}\n" \ - "Missing ports: {miss}".format(acq=list(owned), - req=port_list, - miss=list(set(port_list).difference(owned))) - print format_text("[FAILED]\n", 'red', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - else: - print magenta("Please provide list name and ports to attach to, " - "or specify 'all' to attach all owned ports.\n") - - def complete_attach(self, text, line, begidx, endidx): - arg_num = len(line.split()) - 1 - if arg_num == 1: - # return optional streams packs - if line.endswith(" "): - return self.port_auto_complete(text, line, begidx, endidx) - return [x - for x in self.streams_db.get_loaded_streams_names() - if x.startswith(text)] - elif arg_num >= 2: - # return optional ports to attach to - return self.port_auto_complete(text, line, begidx, endidx) - else: - return [text] - - def prompt_response(self, response_obj): - resp_list = response_obj if isinstance(response_obj, list) else [response_obj] - def format_return_status(return_status): - if return_status: - return green("OK") - else: - return red("FAIL") - - for response in resp_list: - response_str = "{id:^3} - {msg} ({stat})".format(id=response.id, - msg=response.msg, - stat=format_return_status(response.success)) - print response_str - return - - def do_remove_all_streams(self, line): - '''Acquire ports\n''' - - # make sure that the user wants to acquire all - args = line.split() - if len(args) < 1: - print magenta("Please provide a list of ports separated by spaces, " - "or specify 'all' to remove from all acquired ports") - return - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to remove all stream packs from all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_acquired_ports() - else: - port_list = self.extract_port_ids_from_line(line) - - # rc, resp_list = self.stateless_client.take_ownership(port_list, force) - try: - res_ok, log = self.stateless_client.remove_all_streams(port_list) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - - def complete_remove_all_streams(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx) - - def do_start_traffic(self, line): - '''Start pre-submitted traffic in specified ports on TRex\n''' - # make sure that the user wants to acquire all - # parser = parsing_opts.gen_parser("start_traffic", self.do_start_traffic.__doc__, - # parsing_opts.PORT_LIST_WITH_ALL, parsing_opts.MULTIPLIER) - # opts = parser.parse_args(line.split()) - # - # print opts - # return - # # return - # # if not opts.port_list: - # # print magenta("Please provide a list of ports separated by spaces, " - # # "or specify 'all' to start traffic on all acquired ports") - # # return - # - # if "all" in opts.port_list: - # ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ') - # rc = ask.show() - # if rc == False: - # print yellow("[ABORTED]\n") - # return - # else: - # port_list = self.stateless_client.get_acquired_ports() - # else: - # try: - # port_list = self.extract_port_ids_from_list(opts.port_list) - # except ValueError as e: - # print magenta(e) - # return - - args = line.split() - if len(args) < 1: - print magenta("Please provide a list of ports separated by spaces, " - "or specify 'all' to start traffic on all acquired ports") - return - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_acquired_ports() - else: - port_list = self.extract_port_ids_from_line(line) - - try: - res_ok, log = self.stateless_client.start_traffic(1.0, port_id=port_list) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') - - def complete_start_traffic(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx) + do_exit = do_EOF = do_q = do_quit - def help_start_traffic(self): - self.do_start_traffic("-h") - def do_stop_traffic(self, line): - '''Stop active traffic in specified ports on TRex\n''' - # make sure that the user wants to acquire all - args = line.split() - if len(args) < 1: - print magenta("Please provide a list of ports separated by spaces, " - "or specify 'all' to stop traffic on all acquired ports") - return - if args[0] == "all": - ask = ConfirmMenu('Are you sure you want to start traffic at all acquired ports? ') - rc = ask.show() - if rc == False: - print yellow("[ABORTED]\n") - return - else: - port_list = self.stateless_client.get_active_ports() - else: - port_list = self.extract_port_ids_from_line(line) - - try: - res_ok, log = self.stateless_client.stop_traffic(port_list) - self.prompt_response(log) - if not res_ok: - print format_text("[FAILED]\n", 'red', 'bold') - return - print format_text("[SUCCESS]\n", 'green', 'bold') - except ValueError as e: - print magenta(str(e)) - print format_text("[FAILED]\n", 'red', 'bold') +# +def is_valid_file(filename): + if not os.path.isfile(filename): + raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename) - def complete_stop_traffic(self, text, line, begidx, endidx): - return self.port_auto_complete(text, line, begidx, endidx, active=True) + return filename - # aliasing - do_exit = do_EOF = do_q = do_quit def setParserOptions(): parser = argparse.ArgumentParser(prog="trex_console.py") @@ -805,12 +347,13 @@ def setParserOptions(): default = "localhost", type = str) - parser.add_argument("-p", "--port", help = "TRex Server Port [default is 5050]\n", - default = 5050, + parser.add_argument("-p", "--port", help = "TRex Server Port [default is 4501]\n", + default = 4501, type = int) - parser.add_argument("-z", "--pub", help = "TRex Async Publisher Port [default is 4500]\n", + parser.add_argument("--async_port", help = "TRex ASync Publisher Port [default is 4500]\n", default = 4500, + dest='pub', type = int) parser.add_argument("-u", "--user", help = "User Name [default is currently logged in user]\n", @@ -821,18 +364,38 @@ def setParserOptions(): action="store_true", help="Switch ON verbose option. Default is: OFF.", default = False) + + parser.add_argument("--no_acquire", dest="acquire", + action="store_false", help="Acquire all ports on connect. Default is: ON.", + default = True) + + parser.add_argument("--batch", dest="batch", + nargs = 1, + type = is_valid_file, + help = "Run the console in a batch mode with file", + default = None) + return parser + def main(): parser = setParserOptions() options = parser.parse_args() # Stateless client connection stateless_client = CTRexStatelessClient(options.user, options.server, options.port, options.pub) + rc = stateless_client.cmd_connect() + if rc.bad(): + return + if options.batch: + cont = stateless_client.run_script_file(options.batch[0]) + if not cont: + return + # console try: - console = TRexConsole(stateless_client, options.verbose) + console = TRexConsole(stateless_client, options.acquire, options.verbose) console.cmdloop() except KeyboardInterrupt as e: print "\n\n*** Caught Ctrl + C... Exiting...\n\n" |