diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/console')
5 files changed, 2161 insertions, 0 deletions
diff --git a/scripts/automation/trex_control_plane/stl/console/__init__.py b/scripts/automation/trex_control_plane/stl/console/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/console/__init__.py diff --git a/scripts/automation/trex_control_plane/stl/console/stl_path.py b/scripts/automation/trex_control_plane/stl/console/stl_path.py new file mode 100644 index 00000000..f15c666e --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/console/stl_path.py @@ -0,0 +1,7 @@ +import sys, os + +# FIXME to the write path for trex_stl_lib +sys.path.insert(0, "../") + +STL_PROFILES_PATH = os.path.join(os.pardir, 'profiles') + diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py new file mode 100755 index 00000000..b23b5f1f --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -0,0 +1,889 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +Dan Klein, Itay Marom +Cisco Systems, Inc. + +Copyright (c) 2015-2015 Cisco Systems, Inc. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +from __future__ import print_function + +import subprocess +import cmd +import json +import ast +import argparse +import random +import readline +import string +import os +import sys +import tty, termios + +try: + import stl_path +except: + from . import stl_path +from trex_stl_lib.api import * + +from trex_stl_lib.utils.text_opts import * +from trex_stl_lib.utils.common import user_input, get_current_user +from trex_stl_lib.utils import parsing_opts + +try: + import trex_tui +except: + from . import trex_tui + +from functools import wraps + +__version__ = "2.0" + +# console custom logger +class ConsoleLogger(LoggerApi): + def __init__ (self): + self.prompt_redraw = None + + def write (self, msg, newline = True): + if newline: + print(msg) + else: + print(msg, end=' ') + + def flush (self): + sys.stdout.flush() + + # override this for the prompt fix + def async_log (self, msg, level = LoggerApi.VERBOSE_REGULAR, newline = True): + self.log(msg, level, newline) + if ( (self.level >= LoggerApi.VERBOSE_REGULAR) and self.prompt_redraw ): + self.prompt_redraw() + self.flush() + + +def set_window_always_on_top (title): + # we need the GDK module, if not available - ignroe this command + try: + if sys.version_info < (3,0): + from gtk import gdk + else: + #from gi.repository import Gdk as gdk + return + + except ImportError: + return + + # search the window and set it as above + root = gdk.get_default_root_window() + + for id in root.property_get('_NET_CLIENT_LIST')[2]: + w = gdk.window_foreign_new(id) + if w: + name = w.property_get('WM_NAME')[2] + if name == title: + w.set_keep_above(True) + gdk.window_process_all_updates() + break + + +class TRexGeneralCmd(cmd.Cmd): + def __init__(self): + cmd.Cmd.__init__(self) + # configure history behaviour + self._history_file_dir = "/tmp/trex/console/" + self._history_file = self.get_history_file_full_path() + readline.set_history_length(100) + # load history, if any + self.load_console_history() + + + def get_console_identifier(self): + return self.__class__.__name__ + + def get_history_file_full_path(self): + return "{dir}{filename}.hist".format(dir=self._history_file_dir, + filename=self.get_console_identifier()) + + def load_console_history(self): + if os.path.exists(self._history_file): + readline.read_history_file(self._history_file) + return + + def save_console_history(self): + if not os.path.exists(self._history_file_dir): + # make the directory available for every user + try: + original_umask = os.umask(0) + os.makedirs(self._history_file_dir, mode = 0o777) + finally: + os.umask(original_umask) + + + # os.mknod(self._history_file) + readline.write_history_file(self._history_file) + return + + def print_history (self): + + length = readline.get_current_history_length() + + for i in range(1, length + 1): + cmd = readline.get_history_item(i) + print("{:<5} {:}".format(i, cmd)) + + def get_history_item (self, index): + length = readline.get_current_history_length() + if index > length: + print(format_text("please select an index between {0} and {1}".format(0, length))) + return None + + return readline.get_history_item(index) + + + def emptyline(self): + """Called when an empty line is entered in response to the prompt. + + This overriding is such that when empty line is passed, **nothing happens**. + """ + return + + def completenames(self, text, *ignored): + """ + This overriding is such that a space is added to name completion. + """ + dotext = 'do_'+text + return [a[3:]+' ' for a in self.get_names() if a.startswith(dotext)] + + +# +# main console object +class TRexConsole(TRexGeneralCmd): + """Trex Console""" + + def __init__(self, stateless_client, verbose = False): + + self.stateless_client = stateless_client + + TRexGeneralCmd.__init__(self) + + self.tui = trex_tui.TrexTUI(stateless_client) + self.terminal = None + + self.verbose = verbose + + self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) + self.intro += "\nType 'help' or '?' for supported actions\n" + + self.postcmd(False, "") + + + ################### internal section ######################## + + def prompt_redraw (self): + self.postcmd(False, "") + sys.stdout.write("\n" + self.prompt + readline.get_line_buffer()) + sys.stdout.flush() + + + def verify_connected(f): + @wraps(f) + def wrap(*args): + inst = args[0] + func_name = f.__name__ + if func_name.startswith("do_"): + func_name = func_name[3:] + + if not inst.stateless_client.is_connected(): + print(format_text("\n'{0}' cannot be executed on offline mode\n".format(func_name), 'bold')) + return + + ret = f(*args) + return ret + + return wrap + + + def get_console_identifier(self): + return "{context}_{server}".format(context=get_current_user(), + server=self.stateless_client.get_connection_info()['server']) + + def register_main_console_methods(self): + main_names = set(self.trex_console.get_names()).difference(set(dir(self.__class__))) + for name in main_names: + for prefix in 'do_', 'help_', 'complete_': + if name.startswith(prefix): + self.__dict__[name] = getattr(self.trex_console, name) + + def precmd(self, line): + # before doing anything, save history snapshot of the console + # this is done before executing the command in case of ungraceful application exit + self.save_console_history() + + lines = line.split(';') + try: + for line in lines: + stop = self.onecmd(line) + stop = self.postcmd(stop, line) + if stop: + return "quit" + + return "" + except STLError as e: + print(e) + return '' + + + def postcmd(self, stop, line): + self.prompt = self.stateless_client.generate_prompt(prefix = 'trex') + return stop + + + def default(self, line): + print("'{0}' is an unrecognized command. type 'help' or '?' for a list\n".format(line)) + + @staticmethod + def tree_autocomplete(text): + dir = os.path.dirname(text) + if dir: + path = dir + else: + path = "." + + + start_string = os.path.basename(text) + + targets = [] + + for x in os.listdir(path): + if x.startswith(start_string): + y = os.path.join(path, x) + if os.path.isfile(y): + targets.append(x + ' ') + elif os.path.isdir(y): + targets.append(x + '/') + + return targets + + + ####################### shell commands ####################### + @verify_connected + def do_ping (self, line): + '''Ping the server\n''' + self.stateless_client.ping_line(line) + + + @verify_connected + def do_shutdown (self, line): + '''Sends the server a shutdown request\n''' + self.stateless_client.shutdown_line(line) + + # set verbose on / off + def do_verbose(self, line): + '''Shows or set verbose mode\n''' + if line == "": + print("\nverbose is " + ("on\n" if self.verbose else "off\n")) + + elif line == "on": + self.verbose = True + self.stateless_client.set_verbose("high") + print(format_text("\nverbose set to on\n", 'green', 'bold')) + + elif line == "off": + self.verbose = False + self.stateless_client.set_verbose("normal") + print(format_text("\nverbose set to off\n", 'green', 'bold')) + + else: + print(format_text("\nplease specify 'on' or 'off'\n", 'bold')) + + # show history + def help_history (self): + self.do_history("-h") + + def do_shell (self, line): + self.do_history(line) + + def do_push (self, line): + '''Push a local PCAP file\n''' + self.stateless_client.push_line(line) + + def help_push (self): + self.do_push("-h") + + def do_portattr (self, line): + '''Change/show port(s) attributes\n''' + self.stateless_client.set_port_attr_line(line) + + def help_portattr (self): + self.do_portattr("-h") + + @verify_connected + def do_map (self, line): + '''Maps ports topology\n''' + ports = self.stateless_client.get_acquired_ports() + if not ports: + print("No ports acquired\n") + return + + with self.stateless_client.logger.supress(): + table = stl_map_ports(self.stateless_client, ports = ports) + + + print(format_text('\nAcquired ports topology:\n', 'bold', 'underline')) + + # bi-dir ports + print(format_text('Bi-directional ports:\n','underline')) + for port_a, port_b in table['bi']: + print("port {0} <--> port {1}".format(port_a, port_b)) + + print("") + + # unknown ports + print(format_text('Mapping unknown:\n','underline')) + for port in table['unknown']: + print("port {0}".format(port)) + print("") + + + + + def do_history (self, line): + '''Manage the command history\n''' + + item = parsing_opts.ArgumentPack(['item'], + {"nargs": '?', + 'metavar': 'item', + 'type': parsing_opts.check_negative, + 'help': "an history item index", + 'default': 0}) + + parser = parsing_opts.gen_parser(self.stateless_client, + "history", + self.do_history.__doc__, + item) + + opts = parser.parse_args(line.split()) + if opts is None: + return + + if opts.item == 0: + self.print_history() + else: + cmd = self.get_history_item(opts.item) + if cmd == None: + return + + print("Executing '{0}'".format(cmd)) + + return self.onecmd(cmd) + + + + ############### connect + def do_connect (self, line): + '''Connects to the server and acquire ports\n''' + + self.stateless_client.connect_line(line) + + def help_connect (self): + self.do_connect("-h") + + def do_disconnect (self, line): + '''Disconnect from the server\n''' + + self.stateless_client.disconnect_line(line) + + + @verify_connected + def do_acquire (self, line): + '''Acquire ports\n''' + + self.stateless_client.acquire_line(line) + + + @verify_connected + def do_release (self, line): + '''Release ports\n''' + self.stateless_client.release_line(line) + + def do_reacquire (self, line): + '''reacquire all the ports under your logged user name''' + self.stateless_client.reacquire_line(line) + + def help_acquire (self): + self.do_acquire("-h") + + def help_release (self): + self.do_release("-h") + + def help_reacquire (self): + self.do_reacquire("-h") + + ############### start + + def complete_start(self, text, line, begidx, endidx): + s = line.split() + l = len(s) + + file_flags = parsing_opts.get_flags(parsing_opts.FILE_PATH) + + if (l > 1) and (s[l - 1] in file_flags): + return TRexConsole.tree_autocomplete("") + + if (l > 2) and (s[l - 2] in file_flags): + return TRexConsole.tree_autocomplete(s[l - 1]) + + complete_push = complete_start + + @verify_connected + def do_start(self, line): + '''Start selected traffic in specified port(s) on TRex\n''' + + self.stateless_client.start_line(line) + + + + def help_start(self): + self.do_start("-h") + + ############# stop + @verify_connected + def do_stop(self, line): + '''stops port(s) transmitting traffic\n''' + + self.stateless_client.stop_line(line) + + def help_stop(self): + self.do_stop("-h") + + ############# update + @verify_connected + def do_update(self, line): + '''update speed of port(s)currently transmitting traffic\n''' + + self.stateless_client.update_line(line) + + def help_update (self): + self.do_update("-h") + + ############# pause + @verify_connected + def do_pause(self, line): + '''pause port(s) transmitting traffic\n''' + + self.stateless_client.pause_line(line) + + ############# resume + @verify_connected + def do_resume(self, line): + '''resume port(s) transmitting traffic\n''' + + self.stateless_client.resume_line(line) + + + + ########## reset + @verify_connected + def do_reset (self, line): + '''force stop all ports\n''' + self.stateless_client.reset_line(line) + + + ######### validate + @verify_connected + def do_validate (self, line): + '''validates port(s) stream configuration\n''' + + self.stateless_client.validate_line(line) + + + @verify_connected + def do_stats(self, line): + '''Fetch statistics from TRex server by port\n''' + self.stateless_client.show_stats_line(line) + + + def help_stats(self): + self.do_stats("-h") + + @verify_connected + def do_streams(self, line): + '''Fetch statistics from TRex server by port\n''' + self.stateless_client.show_streams_line(line) + + + def help_streams(self): + self.do_streams("-h") + + @verify_connected + def do_clear(self, line): + '''Clear cached local statistics\n''' + self.stateless_client.clear_stats_line(line) + + + def help_clear(self): + self.do_clear("-h") + + + def help_events (self): + self.do_events("-h") + + def do_events (self, line): + '''shows events recieved from server\n''' + self.stateless_client.get_events_line(line) + + + def complete_profile(self, text, line, begidx, endidx): + return self.complete_start(text,line, begidx, endidx) + + def do_profile (self, line): + '''shows information about a profile''' + self.stateless_client.show_profile_line(line) + + # tui + @verify_connected + def do_tui (self, line): + '''Shows a graphical console\n''' + parser = parsing_opts.gen_parser(self.stateless_client, + "tui", + self.do_tui.__doc__, + parsing_opts.XTERM, + parsing_opts.LOCKED) + + opts = parser.parse_args(line.split()) + + if not opts: + return opts + if opts.xterm: + if not os.path.exists('/usr/bin/xterm'): + print(format_text("XTERM does not exists on this machine", 'bold')) + return + + info = self.stateless_client.get_connection_info() + + exe = './trex-console --top -t -q -s {0} -p {1} --async_port {2}'.format(info['server'], info['sync_port'], info['async_port']) + cmd = ['/usr/bin/xterm', '-geometry', '{0}x{1}'.format(self.tui.MIN_COLS, self.tui.MIN_ROWS), '-sl', '0', '-title', 'trex_tui', '-e', exe] + + # detach child + self.terminal = subprocess.Popen(cmd, preexec_fn = os.setpgrp) + + return + + + try: + with self.stateless_client.logger.supress(): + self.tui.show(self.stateless_client, self.save_console_history, locked = opts.locked) + + except self.tui.ScreenSizeException as e: + print(format_text(str(e) + "\n", 'bold')) + + + def help_tui (self): + do_tui("-h") + + + # quit function + def do_quit(self, line): + '''Exit the client\n''' + return True + + + def do_help (self, line): + '''Shows This Help Screen\n''' + if line: + try: + func = getattr(self, 'help_' + line) + except AttributeError: + try: + doc = getattr(self, 'do_' + line).__doc__ + if doc: + self.stdout.write("%s\n"%str(doc)) + return + except AttributeError: + pass + self.stdout.write("%s\n"%str(self.nohelp % (line,))) + return + func() + return + + print("\nSupported Console Commands:") + print("----------------------------\n") + + cmds = [x[3:] for x in self.get_names() if x.startswith("do_")] + hidden = ['EOF', 'q', 'exit', 'h', 'shell'] + for cmd in cmds: + if cmd in hidden: + continue + + try: + doc = getattr(self, 'do_' + cmd).__doc__ + if doc: + help = str(doc) + else: + help = "*** Undocumented Function ***\n" + except AttributeError: + help = "*** Undocumented Function ***\n" + + l=help.splitlines() + print("{:<30} {:<30}".format(cmd + " - ",l[0] )) + + # a custorm cmdloop wrapper + def start(self): + while True: + try: + self.cmdloop() + break + except KeyboardInterrupt as e: + if not readline.get_line_buffer(): + raise KeyboardInterrupt + else: + print("") + self.intro = None + continue + + if self.terminal: + self.terminal.kill() + + # aliases + do_exit = do_EOF = do_q = do_quit + do_h = do_history + + +# run a script of commands +def run_script_file (self, filename, stateless_client): + + self.logger.log(format_text("\nRunning script file '{0}'...".format(filename), 'bold')) + + with open(filename) as f: + script_lines = f.readlines() + + cmd_table = {} + + # register all the commands + cmd_table['start'] = stateless_client.start_line + cmd_table['stop'] = stateless_client.stop_line + cmd_table['reset'] = stateless_client.reset_line + + for index, line in enumerate(script_lines, start = 1): + line = line.strip() + if line == "": + continue + if line.startswith("#"): + continue + + sp = line.split(' ', 1) + cmd = sp[0] + if len(sp) == 2: + args = sp[1] + else: + args = "" + + stateless_client.logger.log(format_text("Executing line {0} : '{1}'\n".format(index, line))) + + if not cmd in cmd_table: + print("\n*** Error at line {0} : '{1}'\n".format(index, line)) + stateless_client.logger.log(format_text("unknown command '{0}'\n".format(cmd), 'bold')) + return False + + cmd_table[cmd](args) + + stateless_client.logger.log(format_text("\n[Done]", 'bold')) + + return True + + +# +def is_valid_file(filename): + if not os.path.isfile(filename): + raise argparse.ArgumentTypeError("The file '%s' does not exist" % filename) + + return filename + + + +def setParserOptions(): + parser = argparse.ArgumentParser(prog="trex_console.py") + + parser.add_argument("-s", "--server", help = "TRex Server [default is localhost]", + default = "localhost", + type = str) + + parser.add_argument("-p", "--port", help = "TRex Server Port [default is 4501]\n", + default = 4501, + type = int) + + parser.add_argument("--async_port", help = "TRex ASync Publisher Port [default is 4500]\n", + default = 4500, + dest='pub', + type = int) + + parser.add_argument("-u", "--user", help = "User Name [default is currently logged in user]\n", + default = get_current_user(), + type = str) + + parser.add_argument("-v", "--verbose", dest="verbose", + action="store_true", help="Switch ON verbose option. Default is: OFF.", + default = False) + + + group = parser.add_mutually_exclusive_group() + + group.add_argument("-a", "--acquire", dest="acquire", + nargs = '+', + type = int, + help="Acquire ports on connect. default is all available ports", + default = None) + + group.add_argument("-r", "--readonly", dest="readonly", + action="store_true", + help="Starts console in a read only mode", + default = False) + + + parser.add_argument("-f", "--force", dest="force", + action="store_true", + help="Force acquire the requested ports", + default = False) + + parser.add_argument("--batch", dest="batch", + nargs = 1, + type = is_valid_file, + help = "Run the console in a batch mode with file", + default = None) + + parser.add_argument("-t", "--tui", dest="tui", + action="store_true", help="Starts with TUI mode", + default = False) + + parser.add_argument("-x", "--xtui", dest="xtui", + action="store_true", help="Starts with XTERM TUI mode", + default = False) + + parser.add_argument("--top", dest="top", + action="store_true", help="Set the window as always on top", + default = False) + + parser.add_argument("-q", "--quiet", dest="quiet", + action="store_true", help="Starts with all outputs suppressed", + default = False) + + return parser + +# a simple info printed on log on +def show_intro (logger, c): + x = c.get_server_system_info() + ver = c.get_server_version().get('version', 'N/A') + + # find out which NICs the server has + port_types = {} + for port in x['ports']: + if 'supp_speeds' in port: + speed = max(port['supp_speeds']) // 1000 + else: + speed = port['speed'] + key = (speed, port.get('description', port['driver'])) + if key not in port_types: + port_types[key] = 0 + port_types[key] += 1 + + port_line = '' + for k, v in port_types.items(): + port_line += "{0} x {1}Gbps @ {2}\t".format(v, k[0], k[1]) + + logger.log(format_text("\nServer Info:\n", 'underline')) + logger.log("Server version: {:>}".format(format_text(ver, 'bold'))) + logger.log("Server CPU: {:>}".format(format_text("{:>} x {:>}".format(x.get('dp_core_count'), x.get('core_type')), 'bold'))) + logger.log("Ports count: {:>}".format(format_text(port_line, 'bold'))) + + +def main(): + parser = setParserOptions() + options = parser.parse_args() + + if options.xtui: + options.tui = True + + # always on top + if options.top: + set_window_always_on_top('trex_tui') + + + # Stateless client connection + if options.quiet: + verbose_level = LoggerApi.VERBOSE_QUIET + elif options.verbose: + verbose_level = LoggerApi.VERBOSE_HIGH + else: + verbose_level = LoggerApi.VERBOSE_REGULAR + + # Stateless client connection + logger = ConsoleLogger() + stateless_client = STLClient(username = options.user, + server = options.server, + sync_port = options.port, + async_port = options.pub, + verbose_level = verbose_level, + logger = logger) + + # TUI or no acquire will give us READ ONLY mode + try: + stateless_client.connect() + except STLError as e: + logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) + return + + if not options.tui and not options.readonly: + try: + # acquire all ports + stateless_client.acquire(options.acquire, force = options.force) + except STLError as e: + logger.log("Log:\n" + format_text(e.brief() + "\n", 'bold')) + + logger.log("\n*** Failed to acquire all required ports ***\n") + return + + if options.readonly: + logger.log(format_text("\nRead only mode - only few commands will be available", 'bold')) + + show_intro(logger, stateless_client) + + + # a script mode + if options.batch: + cont = run_script_file(options.batch[0], stateless_client) + if not cont: + return + + # console + try: + console = TRexConsole(stateless_client, options.verbose) + logger.prompt_redraw = console.prompt_redraw + + # TUI + if options.tui: + console.do_tui("-x" if options.xtui else "-l") + + else: + console.start() + + except KeyboardInterrupt as e: + print("\n\n*** Caught Ctrl + C... Exiting...\n\n") + + finally: + with stateless_client.logger.supress(): + stateless_client.disconnect(stop_traffic = False) + +if __name__ == '__main__': + + main() + diff --git a/scripts/automation/trex_control_plane/stl/console/trex_root_path.py b/scripts/automation/trex_control_plane/stl/console/trex_root_path.py new file mode 100755 index 00000000..de4ec03b --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/console/trex_root_path.py @@ -0,0 +1,15 @@ +#!/router/bin/python + +import os +import sys + +def add_root_to_path (): + """adds trex_control_plane root dir to script path, up to `depth` parent dirs""" + root_dirname = 'trex_control_plane' + file_path = os.path.dirname(os.path.realpath(__file__)) + + components = file_path.split(os.sep) + sys.path.append( str.join(os.sep, components[:components.index(root_dirname)+1]) ) + return + +add_root_to_path() diff --git a/scripts/automation/trex_control_plane/stl/console/trex_tui.py b/scripts/automation/trex_control_plane/stl/console/trex_tui.py new file mode 100644 index 00000000..d7db6d30 --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/console/trex_tui.py @@ -0,0 +1,1250 @@ +from __future__ import print_function + +import termios +import sys +import os +import time +import threading + +from collections import OrderedDict, deque +from texttable import ansi_len + + +import datetime +import readline + + +if sys.version_info > (3,0): + from io import StringIO +else: + from cStringIO import StringIO + +from trex_stl_lib.utils.text_opts import * +from trex_stl_lib.utils import text_tables +from trex_stl_lib import trex_stl_stats +from trex_stl_lib.utils.filters import ToggleFilter + +class TUIQuit(Exception): + pass + + +# for STL exceptions +from trex_stl_lib.api import * + +def ascii_split (s): + output = [] + + lines = s.split('\n') + for elem in lines: + if ansi_len(elem) > 0: + output.append(elem) + + return output + +class SimpleBar(object): + def __init__ (self, desc, pattern): + self.desc = desc + self.pattern = pattern + self.pattern_len = len(pattern) + self.index = 0 + + def show (self, buffer): + if self.desc: + print(format_text("{0} {1}".format(self.desc, self.pattern[self.index]), 'bold'), file = buffer) + else: + print(format_text("{0}".format(self.pattern[self.index]), 'bold'), file = buffer) + + self.index = (self.index + 1) % self.pattern_len + + +# base type of a panel +class TrexTUIPanel(object): + def __init__ (self, mng, name): + + self.mng = mng + self.name = name + self.stateless_client = mng.stateless_client + self.is_graph = False + + def show (self, buffer): + raise NotImplementedError("must implement this") + + def get_key_actions (self): + raise NotImplementedError("must implement this") + + + def get_name (self): + return self.name + + +# dashboard panel +class TrexTUIDashBoard(TrexTUIPanel): + + FILTER_ACQUIRED = 1 + FILTER_ALL = 2 + + def __init__ (self, mng): + super(TrexTUIDashBoard, self).__init__(mng, "dashboard") + + self.ports = self.stateless_client.get_all_ports() + + self.key_actions = OrderedDict() + + self.key_actions['c'] = {'action': self.action_clear, 'legend': 'clear', 'show': True} + self.key_actions['p'] = {'action': self.action_pause, 'legend': 'pause', 'show': True, 'color': 'red'} + self.key_actions['r'] = {'action': self.action_resume, 'legend': 'resume', 'show': True, 'color': 'blue'} + + self.key_actions['o'] = {'action': self.action_show_owned, 'legend': 'owned ports', 'show': True} + self.key_actions['n'] = {'action': self.action_reset_view, 'legend': 'reset view', 'show': True} + self.key_actions['a'] = {'action': self.action_show_all, 'legend': 'all ports', 'show': True} + + # register all the ports to the toggle action + for port_id in self.ports: + self.key_actions[str(port_id)] = {'action': self.action_toggle_port(port_id), 'legend': 'port {0}'.format(port_id), 'show': False} + + + self.toggle_filter = ToggleFilter(self.ports) + + if self.stateless_client.get_acquired_ports(): + self.action_show_owned() + else: + self.action_show_all() + + + def get_showed_ports (self): + return self.toggle_filter.filter_items() + + + def show (self, buffer): + stats = self.stateless_client._get_formatted_stats(self.get_showed_ports()) + # print stats to screen + for stat_type, stat_data in stats.items(): + text_tables.print_table_with_header(stat_data.text_table, stat_type, buffer = buffer) + + + def get_key_actions (self): + allowed = OrderedDict() + + + allowed['n'] = self.key_actions['n'] + allowed['o'] = self.key_actions['o'] + allowed['a'] = self.key_actions['a'] + for i in self.ports: + allowed[str(i)] = self.key_actions[str(i)] + + + if self.get_showed_ports(): + allowed['c'] = self.key_actions['c'] + + # if not all ports are acquired - no operations + if not (set(self.get_showed_ports()) <= set(self.stateless_client.get_acquired_ports())): + return allowed + + # if any/some ports can be resumed + if set(self.get_showed_ports()) & set(self.stateless_client.get_paused_ports()): + allowed['r'] = self.key_actions['r'] + + # if any/some ports are transmitting - support those actions + if set(self.get_showed_ports()) & set(self.stateless_client.get_transmitting_ports()): + allowed['p'] = self.key_actions['p'] + + + return allowed + + + ######### actions + def action_pause (self): + try: + rc = self.stateless_client.pause(ports = self.get_showed_ports()) + except STLError: + pass + + return "" + + + + def action_resume (self): + try: + self.stateless_client.resume(ports = self.get_showed_ports()) + except STLError: + pass + + return "" + + + def action_reset_view (self): + self.toggle_filter.reset() + return "" + + def action_show_owned (self): + self.toggle_filter.reset() + self.toggle_filter.toggle_items(*self.stateless_client.get_acquired_ports()) + return "" + + def action_show_all (self): + self.toggle_filter.reset() + self.toggle_filter.toggle_items(*self.stateless_client.get_all_ports()) + return "" + + def action_clear (self): + self.stateless_client.clear_stats(self.toggle_filter.filter_items()) + return "cleared all stats" + + + def action_toggle_port(self, port_id): + def action_toggle_port_x(): + self.toggle_filter.toggle_item(port_id) + return "" + + return action_toggle_port_x + + + +# streams stats +class TrexTUIStreamsStats(TrexTUIPanel): + def __init__ (self, mng): + super(TrexTUIStreamsStats, self).__init__(mng, "sstats") + + self.key_actions = OrderedDict() + + self.key_actions['c'] = {'action': self.action_clear, 'legend': 'clear', 'show': True} + + + def show (self, buffer): + stats = self.stateless_client._get_formatted_stats(port_id_list = None, stats_mask = trex_stl_stats.SS_COMPAT) + # print stats to screen + for stat_type, stat_data in stats.items(): + text_tables.print_table_with_header(stat_data.text_table, stat_type, buffer = buffer) + pass + + + def get_key_actions (self): + return self.key_actions + + def action_clear (self): + self.stateless_client.flow_stats.clear_stats() + + return "" + + +# latency stats +class TrexTUILatencyStats(TrexTUIPanel): + def __init__ (self, mng): + super(TrexTUILatencyStats, self).__init__(mng, "lstats") + self.key_actions = OrderedDict() + self.key_actions['c'] = {'action': self.action_clear, 'legend': 'clear', 'show': True} + self.key_actions['h'] = {'action': self.action_toggle_histogram, 'legend': 'histogram toggle', 'show': True} + self.is_histogram = False + + + def show (self, buffer): + if self.is_histogram: + stats = self.stateless_client._get_formatted_stats(port_id_list = None, stats_mask = trex_stl_stats.LH_COMPAT) + else: + stats = self.stateless_client._get_formatted_stats(port_id_list = None, stats_mask = trex_stl_stats.LS_COMPAT) + # print stats to screen + for stat_type, stat_data in stats.items(): + if stat_type == 'latency_statistics': + untouched_header = ' (usec)' + else: + untouched_header = '' + text_tables.print_table_with_header(stat_data.text_table, stat_type, untouched_header = untouched_header, buffer = buffer) + + def get_key_actions (self): + return self.key_actions + + def action_toggle_histogram (self): + self.is_histogram = not self.is_histogram + return "" + + def action_clear (self): + self.stateless_client.latency_stats.clear_stats() + return "" + + +# utilization stats +class TrexTUIUtilizationStats(TrexTUIPanel): + def __init__ (self, mng): + super(TrexTUIUtilizationStats, self).__init__(mng, "ustats") + self.key_actions = {} + + def show (self, buffer): + stats = self.stateless_client._get_formatted_stats(port_id_list = None, stats_mask = trex_stl_stats.UT_COMPAT) + # print stats to screen + for stat_type, stat_data in stats.items(): + text_tables.print_table_with_header(stat_data.text_table, stat_type, buffer = buffer) + + def get_key_actions (self): + return self.key_actions + + +# log +class TrexTUILog(): + def __init__ (self): + self.log = [] + + def add_event (self, msg): + self.log.append("[{0}] {1}".format(str(datetime.datetime.now().time()), msg)) + + def show (self, buffer, max_lines = 4): + + cut = len(self.log) - max_lines + if cut < 0: + cut = 0 + + print(format_text("\nLog:", 'bold', 'underline'), file = buffer) + + for msg in self.log[cut:]: + print(msg, file = buffer) + + +# a predicate to wrap function as a bool +class Predicate(object): + def __init__ (self, func): + self.func = func + + def __nonzero__ (self): + return True if self.func() else False + def __bool__ (self): + return True if self.func() else False + + +# Panels manager (contains server panels) +class TrexTUIPanelManager(): + def __init__ (self, tui): + self.tui = tui + self.stateless_client = tui.stateless_client + self.ports = self.stateless_client.get_all_ports() + self.locked = False + + self.panels = {} + self.panels['dashboard'] = TrexTUIDashBoard(self) + self.panels['sstats'] = TrexTUIStreamsStats(self) + self.panels['lstats'] = TrexTUILatencyStats(self) + self.panels['ustats'] = TrexTUIUtilizationStats(self) + + self.key_actions = OrderedDict() + + # we allow console only when ports are acquired + self.key_actions['ESC'] = {'action': self.action_none, 'legend': 'console', 'show': Predicate(lambda : not self.locked)} + + self.key_actions['q'] = {'action': self.action_none, 'legend': 'quit', 'show': True} + self.key_actions['d'] = {'action': self.action_show_dash, 'legend': 'dashboard', 'show': True} + self.key_actions['s'] = {'action': self.action_show_sstats, 'legend': 'streams', 'show': True} + self.key_actions['l'] = {'action': self.action_show_lstats, 'legend': 'latency', 'show': True} + self.key_actions['u'] = {'action': self.action_show_ustats, 'legend': 'util', 'show': True} + + + # start with dashboard + self.main_panel = self.panels['dashboard'] + + # log object + self.log = TrexTUILog() + + self.generate_legend() + + self.conn_bar = SimpleBar('status: ', ['|','/','-','\\']) + self.dis_bar = SimpleBar('status: ', ['X', ' ']) + self.show_log = False + + + def generate_legend (self): + + self.legend = "\n{:<12}".format("browse:") + + for k, v in self.key_actions.items(): + if v['show']: + x = "'{0}' - {1}, ".format(k, v['legend']) + if v.get('color'): + self.legend += "{:}".format(format_text(x, v.get('color'))) + else: + self.legend += "{:}".format(x) + + + self.legend += "\n{:<12}".format(self.main_panel.get_name() + ":") + + for k, v in self.main_panel.get_key_actions().items(): + if v['show']: + x = "'{0}' - {1}, ".format(k, v['legend']) + + if v.get('color'): + self.legend += "{:}".format(format_text(x, v.get('color'))) + else: + self.legend += "{:}".format(x) + + + def print_connection_status (self, buffer): + if self.tui.get_state() == self.tui.STATE_ACTIVE: + self.conn_bar.show(buffer = buffer) + else: + self.dis_bar.show(buffer = buffer) + + def print_legend (self, buffer): + print(format_text(self.legend, 'bold'), file = buffer) + + + # on window switch or turn on / off of the TUI we call this + def init (self, show_log = False, locked = False): + self.show_log = show_log + self.locked = locked + self.generate_legend() + + def show (self, show_legend, buffer): + self.main_panel.show(buffer) + self.print_connection_status(buffer) + + if show_legend: + self.generate_legend() + self.print_legend(buffer) + + if self.show_log: + self.log.show(buffer) + + + def handle_key (self, ch): + # check for the manager registered actions + if ch in self.key_actions: + msg = self.key_actions[ch]['action']() + + # check for main panel actions + elif ch in self.main_panel.get_key_actions(): + msg = self.main_panel.get_key_actions()[ch]['action']() + + else: + return False + + self.generate_legend() + return True + + #if msg == None: + # return False + #else: + # if msg: + # self.log.add_event(msg) + # return True + + + # actions + + def action_none (self): + return None + + def action_show_dash (self): + self.main_panel = self.panels['dashboard'] + self.init(self.show_log) + return "" + + def action_show_port (self, port_id): + def action_show_port_x (): + self.main_panel = self.panels['port {0}'.format(port_id)] + self.init() + return "" + + return action_show_port_x + + + def action_show_sstats (self): + self.main_panel = self.panels['sstats'] + self.init(self.show_log) + return "" + + + def action_show_lstats (self): + self.main_panel = self.panels['lstats'] + self.init(self.show_log) + return "" + + def action_show_ustats(self): + self.main_panel = self.panels['ustats'] + self.init(self.show_log) + return "" + + + +# ScreenBuffer is a class designed to +# avoid inline delays when reprinting the screen +class ScreenBuffer(): + def __init__ (self, redraw_cb): + self.snapshot = '' + self.lock = threading.Lock() + + self.redraw_cb = redraw_cb + self.update_flag = False + + + def start (self): + self.active = True + self.t = threading.Thread(target = self.__handler) + self.t.setDaemon(True) + self.t.start() + + def stop (self): + self.active = False + self.t.join() + + + # request an update + def update (self): + self.update_flag = True + + # fetch the screen, return None if no new screen exists yet + def get (self): + + if not self.snapshot: + return None + + # we have a snapshot - fetch it + with self.lock: + x = self.snapshot + self.snapshot = None + return x + + + def __handler (self): + + while self.active: + if self.update_flag: + self.__redraw() + + time.sleep(0.01) + + # redraw the next screen + def __redraw (self): + buffer = StringIO() + + self.redraw_cb(buffer) + + with self.lock: + self.snapshot = buffer + self.update_flag = False + +# a policer class to make sure no too-fast redraws +# occurs - it filters fast bursts of redraws +class RedrawPolicer(): + def __init__ (self, rate): + self.ts = 0 + self.marked = False + self.rate = rate + self.force = False + + def mark_for_redraw (self, force = False): + self.marked = True + if force: + self.force = True + + def should_redraw (self): + dt = time.time() - self.ts + return self.force or (self.marked and (dt > self.rate)) + + def reset (self, restart = False): + self.ts = time.time() + self.marked = restart + self.force = False + + +# shows a textual top style window +class TrexTUI(): + + STATE_ACTIVE = 0 + STATE_LOST_CONT = 1 + STATE_RECONNECT = 2 + is_graph = False + + MIN_ROWS = 50 + MIN_COLS = 111 + + + class ScreenSizeException(Exception): + def __init__ (self, cols, rows): + msg = "TUI requires console screen size of at least {0}x{1}, current is {2}x{3}".format(TrexTUI.MIN_COLS, + TrexTUI.MIN_ROWS, + cols, + rows) + super(TrexTUI.ScreenSizeException, self).__init__(msg) + + + def __init__ (self, stateless_client): + self.stateless_client = stateless_client + + self.tui_global_lock = threading.Lock() + self.pm = TrexTUIPanelManager(self) + self.sb = ScreenBuffer(self.redraw_handler) + + def redraw_handler (self, buffer): + # this is executed by the screen buffer - should be protected against TUI commands + with self.tui_global_lock: + self.pm.show(show_legend = self.async_keys.is_legend_mode(), buffer = buffer) + + def clear_screen (self, lines = 50): + # reposition the cursor + sys.stdout.write("\x1b[0;0H") + + # clear all lines + for i in range(lines): + sys.stdout.write("\x1b[0K") + if i < (lines - 1): + sys.stdout.write("\n") + + # reposition the cursor + sys.stdout.write("\x1b[0;0H") + + + + def show (self, client, save_console_history, show_log = False, locked = False): + + rows, cols = os.popen('stty size', 'r').read().split() + if (int(rows) < TrexTUI.MIN_ROWS) or (int(cols) < TrexTUI.MIN_COLS): + raise self.ScreenSizeException(rows = rows, cols = cols) + + with AsyncKeys(client, save_console_history, self.tui_global_lock, locked) as async_keys: + sys.stdout.write("\x1bc") + self.async_keys = async_keys + self.show_internal(show_log, locked) + + + + def show_internal (self, show_log, locked): + + self.pm.init(show_log, locked) + + self.state = self.STATE_ACTIVE + + # create print policers + self.full_redraw = RedrawPolicer(0.5) + self.keys_redraw = RedrawPolicer(0.05) + self.full_redraw.mark_for_redraw() + + + try: + self.sb.start() + + while True: + # draw and handle user input + status = self.async_keys.tick(self.pm) + + # prepare the next frame + self.prepare(status) + time.sleep(0.01) + self.draw_screen() + + with self.tui_global_lock: + self.handle_state_machine() + + except TUIQuit: + print("\nExiting TUI...") + + finally: + self.sb.stop() + + print("") + + + + # handle state machine + def handle_state_machine (self): + # regular state + if self.state == self.STATE_ACTIVE: + # if no connectivity - move to lost connecitivty + if not self.stateless_client.async_client.is_alive(): + self.stateless_client._invalidate_stats(self.pm.ports) + self.state = self.STATE_LOST_CONT + + + # lost connectivity + elif self.state == self.STATE_LOST_CONT: + # got it back + if self.stateless_client.async_client.is_alive(): + # move to state reconnect + self.state = self.STATE_RECONNECT + + + # restored connectivity - try to reconnect + elif self.state == self.STATE_RECONNECT: + + try: + self.stateless_client.connect() + self.stateless_client.acquire() + self.state = self.STATE_ACTIVE + except STLError: + self.state = self.STATE_LOST_CONT + + + # logic before printing + def prepare (self, status): + if status == AsyncKeys.STATUS_REDRAW_ALL: + self.full_redraw.mark_for_redraw(force = True) + + elif status == AsyncKeys.STATUS_REDRAW_KEYS: + self.keys_redraw.mark_for_redraw() + + if self.full_redraw.should_redraw(): + self.sb.update() + self.full_redraw.reset(restart = True) + + return + + + # draw once + def draw_screen (self): + + # check for screen buffer's new screen + x = self.sb.get() + + # we have a new screen to draw + if x: + self.clear_screen() + + self.async_keys.draw(x) + sys.stdout.write(x.getvalue()) + sys.stdout.flush() + + # maybe we need to redraw the keys + elif self.keys_redraw.should_redraw(): + sys.stdout.write("\x1b[4A") + self.async_keys.draw(sys.stdout) + sys.stdout.flush() + + # reset the policer for next time + self.keys_redraw.reset() + + + + + def get_state (self): + return self.state + + +class TokenParser(object): + def __init__ (self, seq): + self.buffer = list(seq) + + def pop (self): + return self.buffer.pop(0) + + + def peek (self): + if not self.buffer: + return None + return self.buffer[0] + + def next_token (self): + if not self.peek(): + return None + + token = self.pop() + + # special chars + if token == '\x1b' and self.peek() == '[': + token += self.pop() + if self.peek(): + token += self.pop() + + return token + + def parse (self): + tokens = [] + + while True: + token = self.next_token() + if token == None: + break + tokens.append(token) + + return tokens + + +# handles async IO +class AsyncKeys: + + MODE_LEGEND = 1 + MODE_CONSOLE = 2 + + STATUS_NONE = 0 + STATUS_REDRAW_KEYS = 1 + STATUS_REDRAW_ALL = 2 + + def __init__ (self, client, save_console_history, tui_global_lock, locked = False): + self.tui_global_lock = tui_global_lock + + self.engine_console = AsyncKeysEngineConsole(self, client, save_console_history) + self.engine_legend = AsyncKeysEngineLegend(self) + self.locked = locked + + if locked: + self.engine = self.engine_legend + self.locked = True + else: + self.engine = self.engine_console + self.locked = False + + def __enter__ (self): + # init termios + self.old_settings = termios.tcgetattr(sys.stdin) + new_settings = termios.tcgetattr(sys.stdin) + new_settings[3] = new_settings[3] & ~(termios.ECHO | termios.ICANON) # lflags + new_settings[6][termios.VMIN] = 0 # cc + new_settings[6][termios.VTIME] = 0 # cc + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings) + + # huge buffer - no print without flush + sys.stdout = open('/dev/stdout', 'w', TrexTUI.MIN_COLS * TrexTUI.MIN_COLS * 2) + return self + + def __exit__ (self, type, value, traceback): + termios.tcsetattr(sys.stdin, termios.TCSADRAIN, self.old_settings) + + # restore sys.stdout + sys.stdout.close() + sys.stdout = sys.__stdout__ + + + def is_legend_mode (self): + return self.engine.get_type() == AsyncKeys.MODE_LEGEND + + def is_console_mode (self): + return self.engine.get_type == AsyncKeys.MODE_CONSOLE + + def switch (self): + if self.is_legend_mode(): + self.engine = self.engine_console + else: + self.engine = self.engine_legend + + + # parse the buffer to manageble tokens + def parse_tokens (self, seq): + + tokens = [] + chars = list(seq) + + while chars: + token = chars.pop(0) + + # special chars + if token == '\x1b' and chars[0] == '[': + token += chars.pop(0) + token += chars.pop(0) + + tokens.append(token) + + return tokens + + def handle_token (self, token, pm): + # ESC for switch + if token == '\x1b': + if not self.locked: + self.switch() + return self.STATUS_REDRAW_ALL + + # EOF (ctrl + D) + if token == '\x04': + raise TUIQuit() + + # pass tick to engine + return self.engine.tick(token, pm) + + + def tick (self, pm): + rc = self.STATUS_NONE + + # fetch the stdin buffer + seq = os.read(sys.stdin.fileno(), 1024).decode() + if not seq: + return self.STATUS_NONE + + # parse all the tokens from the buffer + tokens = TokenParser(seq).parse() + + # process them + for token in tokens: + token_rc = self.handle_token(token, pm) + rc = max(rc, token_rc) + + + return rc + + + def draw (self, buffer): + self.engine.draw(buffer) + + + +# Legend engine +class AsyncKeysEngineLegend: + def __init__ (self, async): + self.async = async + + def get_type (self): + return self.async.MODE_LEGEND + + def tick (self, seq, pm): + + if seq == 'q': + raise TUIQuit() + + # ignore escapes + if len(seq) > 1: + return AsyncKeys.STATUS_NONE + + rc = pm.handle_key(seq) + return AsyncKeys.STATUS_REDRAW_ALL if rc else AsyncKeys.STATUS_NONE + + def draw (self, buffer): + pass + + +# console engine +class AsyncKeysEngineConsole: + def __init__ (self, async, client, save_console_history): + self.async = async + self.lines = deque(maxlen = 100) + + self.generate_prompt = client.generate_prompt + self.save_console_history = save_console_history + + self.ac = {'start' : client.start_line, + 'stop' : client.stop_line, + 'pause' : client.pause_line, + 'clear' : client.clear_stats_line, + 'push' : client.push_line, + 'resume' : client.resume_line, + 'reset' : client.reset_line, + 'update' : client.update_line, + 'connect' : client.connect_line, + 'disconnect' : client.disconnect_line, + 'acquire' : client.acquire_line, + 'release' : client.release_line, + 'quit' : self.action_quit, + 'q' : self.action_quit, + 'exit' : self.action_quit, + 'help' : self.action_help, + '?' : self.action_help} + + # fetch readline history and add relevants + for i in range(0, readline.get_current_history_length()): + cmd = readline.get_history_item(i) + if cmd and cmd.split()[0] in self.ac: + self.lines.appendleft(CmdLine(cmd)) + + # new line + self.lines.appendleft(CmdLine('')) + self.line_index = 0 + self.last_status = '' + + def action_quit (self, _): + raise TUIQuit() + + def action_help (self, _): + return ' '.join([format_text(cmd, 'bold') for cmd in self.ac.keys()]) + + def get_type (self): + return self.async.MODE_CONSOLE + + + def handle_escape_char (self, seq): + # up + if seq == '\x1b[A': + self.line_index = min(self.line_index + 1, len(self.lines) - 1) + + # down + elif seq == '\x1b[B': + self.line_index = max(self.line_index - 1, 0) + + # left + elif seq == '\x1b[D': + self.lines[self.line_index].go_left() + + # right + elif seq == '\x1b[C': + self.lines[self.line_index].go_right() + + # del + elif seq == '\x1b[3~': + self.lines[self.line_index].del_key() + + # home + elif seq == '\x1b[H': + self.lines[self.line_index].home_key() + + # end + elif seq == '\x1b[F': + self.lines[self.line_index].end_key() + return True + + # unknown key + else: + return AsyncKeys.STATUS_NONE + + return AsyncKeys.STATUS_REDRAW_KEYS + + + def tick (self, seq, _): + + # handle escape chars + if len(seq) > 1: + return self.handle_escape_char(seq) + + # handle each char + for ch in seq: + return self.handle_single_key(ch) + + + + def handle_single_key (self, ch): + # newline + if ch == '\n': + self.handle_cmd() + + # backspace + elif ch == '\x7f': + self.lines[self.line_index].backspace() + + # TAB + elif ch == '\t': + tokens = self.lines[self.line_index].get().split() + if not tokens: + return + + if len(tokens) == 1: + self.handle_tab_names(tokens[0]) + else: + self.handle_tab_files(tokens) + + + # simple char + else: + self.lines[self.line_index] += ch + + return AsyncKeys.STATUS_REDRAW_KEYS + + + # handle TAB key for completing function names + def handle_tab_names (self, cur): + matching_cmds = [x for x in self.ac if x.startswith(cur)] + + common = os.path.commonprefix([x for x in self.ac if x.startswith(cur)]) + if common: + if len(matching_cmds) == 1: + self.lines[self.line_index].set(common + ' ') + self.last_status = '' + else: + self.lines[self.line_index].set(common) + self.last_status = 'ambigious: '+ ' '.join([format_text(cmd, 'bold') for cmd in matching_cmds]) + + + # handle TAB for completing filenames + def handle_tab_files (self, tokens): + + # only commands with files + if tokens[0] not in {'start', 'push'}: + return + + # '-f' with no paramters - no partial and use current dir + if tokens[-1] == '-f': + partial = '' + d = '.' + + # got a partial path + elif tokens[-2] == '-f': + partial = tokens.pop() + + # check for dirs + dirname, basename = os.path.dirname(partial), os.path.basename(partial) + if os.path.isdir(dirname): + d = dirname + partial = basename + else: + d = '.' + else: + return + + # fetch all dirs and files matching wildcard + files = [] + for x in os.listdir(d): + if os.path.isdir(os.path.join(d, x)): + files.append(x + '/') + elif x.endswith( ('.py', 'yaml', 'pcap', 'cap', 'erf') ): + files.append(x) + + # dir might not have the files + if not files: + self.last_status = format_text('no loadble files under path', 'bold') + return + + + # find all the matching files + matching_files = [x for x in files if x.startswith(partial)] if partial else files + + # do we have a longer common than partial ? + common = os.path.commonprefix([x for x in files if x.startswith(partial)]) + if not common: + common = partial + + tokens.append(os.path.join(d, common) if d is not '.' else common) + + # reforge the line + newline = ' '.join(tokens) + + if len(matching_files) == 1: + if os.path.isfile(tokens[-1]): + newline += ' ' + + self.lines[self.line_index].set(newline) + self.last_status = '' + else: + self.lines[self.line_index].set(newline) + self.last_status = ' '.join([format_text(f, 'bold') for f in matching_files[:5]]) + if len(matching_files) > 5: + self.last_status += ' ... [{0} more matches]'.format(len(matching_files) - 5) + + + + def split_cmd (self, cmd): + s = cmd.split(' ', 1) + op = s[0] + param = s[1] if len(s) == 2 else '' + return op, param + + + def handle_cmd (self): + + cmd = self.lines[self.line_index].get().strip() + if not cmd: + return + + op, param = self.split_cmd(cmd) + + func = self.ac.get(op) + if func: + with self.async.tui_global_lock: + func_rc = func(param) + + # take out the empty line + empty_line = self.lines.popleft() + assert(empty_line.ro_line == '') + + if not self.lines or self.lines[0].ro_line != cmd: + self.lines.appendleft(CmdLine(cmd)) + + # back in + self.lines.appendleft(empty_line) + self.line_index = 0 + readline.add_history(cmd) + self.save_console_history() + + # back to readonly + for line in self.lines: + line.invalidate() + + assert(self.lines[0].modified == False) + color = None + if not func: + self.last_status = "unknown command: '{0}'".format(format_text(cmd.split()[0], 'bold')) + else: + # internal commands + if isinstance(func_rc, str): + self.last_status = func_rc + + # RC response + else: + # success + if func_rc: + self.last_status = format_text("[OK]", 'green') + # errors + else: + err_msgs = ascii_split(str(func_rc)) + self.last_status = format_text(err_msgs[0], 'red') + if len(err_msgs) > 1: + self.last_status += " [{0} more errors messages]".format(len(err_msgs) - 1) + color = 'red' + + + + # trim too long lines + if ansi_len(self.last_status) > TrexTUI.MIN_COLS: + self.last_status = format_text(self.last_status[:TrexTUI.MIN_COLS] + "...", color, 'bold') + + + def draw (self, buffer): + buffer.write("\nPress 'ESC' for navigation panel...\n") + buffer.write("status: \x1b[0K{0}\n".format(self.last_status)) + buffer.write("\n{0}\x1b[0K".format(self.generate_prompt(prefix = 'tui'))) + self.lines[self.line_index].draw(buffer) + + +# a readline alike command line - can be modified during edit +class CmdLine(object): + def __init__ (self, line): + self.ro_line = line + self.w_line = None + self.modified = False + self.cursor_index = len(line) + + def get (self): + if self.modified: + return self.w_line + else: + return self.ro_line + + def set (self, line, cursor_pos = None): + self.w_line = line + self.modified = True + + if cursor_pos is None: + self.cursor_index = len(self.w_line) + else: + self.cursor_index = cursor_pos + + + def __add__ (self, other): + assert(0) + + + def __str__ (self): + return self.get() + + + def __iadd__ (self, other): + + self.set(self.get()[:self.cursor_index] + other + self.get()[self.cursor_index:], + cursor_pos = self.cursor_index + len(other)) + + return self + + + def backspace (self): + if self.cursor_index == 0: + return + + self.set(self.get()[:self.cursor_index - 1] + self.get()[self.cursor_index:], + self.cursor_index - 1) + + + def del_key (self): + if self.cursor_index == len(self.get()): + return + + self.set(self.get()[:self.cursor_index] + self.get()[self.cursor_index + 1:], + self.cursor_index) + + def home_key (self): + self.cursor_index = 0 + + def end_key (self): + self.cursor_index = len(self.get()) + + def invalidate (self): + self.modified = False + self.w_line = None + self.cursor_index = len(self.ro_line) + + def go_left (self): + self.cursor_index = max(0, self.cursor_index - 1) + + def go_right (self): + self.cursor_index = min(len(self.get()), self.cursor_index + 1) + + def draw (self, buffer): + buffer.write(self.get()) + buffer.write('\b' * (len(self.get()) - self.cursor_index)) + |