diff options
Diffstat (limited to 'scripts/automation/trex_control_plane/stl/console')
-rw-r--r-- | scripts/automation/trex_control_plane/stl/console/trex_capture.py | 593 | ||||
-rwxr-xr-x | scripts/automation/trex_control_plane/stl/console/trex_console.py | 60 |
2 files changed, 635 insertions, 18 deletions
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py new file mode 100644 index 00000000..2132458e --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -0,0 +1,593 @@ +from trex_stl_lib.api import * +from trex_stl_lib.utils import parsing_opts, text_tables +import threading +import tempfile +import select + +# defines a generic monitor writer +class CaptureMonitorWriter(object): + + def deinit(self): + # by default - nothing to deinit + pass + + def handle_pkts (self, pkts): + raise NotImplementedError + + def periodic_check (self): + # by default - nothing to check + pass + + +# a stdout monitor +class CaptureMonitorWriterStdout(CaptureMonitorWriter): + def __init__ (self, logger, is_brief, start_ts): + self.logger = logger + self.is_brief = is_brief + self.start_ts = start_ts + + # unicode arrows + self.RX_ARROW = u'\u25c0\u2500\u2500' + self.TX_ARROW = u'\u25b6\u2500\u2500' + + # decode issues with Python 2 + if sys.version_info < (3,0): + self.RX_ARROW = self.RX_ARROW.encode('utf-8') + self.TX_ARROW = self.TX_ARROW.encode('utf-8') + + + self.logger.pre_cmd("Starting stdout capture monitor - verbose: '{0}'".format('low' if self.is_brief else 'high')) + self.logger.post_cmd(RC_OK) + + self.logger.log(format_text("\n*** use 'capture monitor stop' to abort capturing... ***\n", 'bold')) + + + + def get_scapy_name (self, pkt_scapy): + layer = pkt_scapy + while layer.payload and layer.payload.name not in('Padding', 'Raw'): + layer = layer.payload + + return layer.name + + + def format_origin (self, origin): + if origin == 'RX': + return '{0} {1}'.format(self.RX_ARROW, 'RX') + elif origin == 'TX': + return '{0} {1}'.format(self.TX_ARROW, 'TX') + else: + return '{0}'.format(origin) + + + def __handle_pkt (self, pkt): + pkt_bin = base64.b64decode(pkt['binary']) + + pkt_scapy = Ether(pkt_bin) + self.logger.log(format_text('\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) + self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold')) + + if self.is_brief: + self.logger.log(' {0}'.format(pkt_scapy.command())) + else: + pkt_scapy.show(label_lvl = ' ') + self.logger.log('') + + return len(pkt_bin) + + + def handle_pkts (self, pkts): + try: + byte_count = 0 + for pkt in pkts: + byte_count += self.__handle_pkt(pkt) + + return byte_count + + finally: + # make sure to restore the logger + self.logger.prompt_redraw() + + +# a pipe based monitor +class CaptureMonitorWriterPipe(CaptureMonitorWriter): + def __init__ (self, logger, start_ts): + + self.logger = logger + self.fifo = None + self.start_ts = start_ts + + # generate a temp fifo pipe + self.fifo_name = tempfile.mktemp() + + try: + self.logger.pre_cmd('Starting pipe capture monitor') + os.mkfifo(self.fifo_name) + self.logger.post_cmd(RC_OK) + + self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold')) + + self.logger.pre_cmd("Waiting for Wireshark pipe connection") + + # blocks until pipe is connected + self.fifo = os.open(self.fifo_name, os.O_WRONLY) + self.logger.post_cmd(RC_OK()) + + self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold')) + + # open for write using a PCAP writer + self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True) + self.writer._write_header(None) + + # register a poller + self.poll = select.poll() + self.poll.register(self.fifo, select.EPOLLERR) + + self.is_init = True + + except KeyboardInterrupt as e: + self.deinit() + self.logger.post_cmd(RC_ERR("")) + raise STLError("*** pipe monitor aborted...cleaning up") + + except OSError as e: + self.deinit() + self.logger.post_cmd(RC_ERR("")) + raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e))) + + + + def deinit (self): + try: + if self.fifo: + os.close(self.fifo) + self.fifo = None + + if self.fifo_name: + os.unlink(self.fifo_name) + self.fifo_name = None + + except OSError: + pass + + + + def periodic_check (self): + self.check_pipe() + + + def check_pipe (self): + if self.poll.poll(0): + raise STLError('pipe has been disconnected') + + + def handle_pkts (self, pkts): + # first check the pipe is alive + self.check_pipe() + + return self.handle_pkts_internal(pkts) + + + def handle_pkts_internal (self, pkts): + + byte_count = 0 + + for pkt in pkts: + pkt_bin = base64.b64decode(pkt['binary']) + ts_sec, ts_usec = sec_split_usec(pkt['ts'] - self.start_ts) + + try: + self.writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec) + except Exception as e: + raise STLError('fail to write packets to pipe: {}'.format(str(e))) + + byte_count += len(pkt_bin) + + return byte_count + + +# capture monitor - a live capture +class CaptureMonitor(object): + def __init__ (self, client, cmd_lock, tx_port_list, rx_port_list, rate_pps, mon_type): + self.client = client + self.logger = client.logger + self.cmd_lock = cmd_lock + + self.t = None + self.writer = None + self.capture_id = None + + self.tx_port_list = tx_port_list + self.rx_port_list = rx_port_list + self.rate_pps = rate_pps + self.mon_type = mon_type + + # try to launch + try: + self.__start() + except Exception as e: + self.__stop() + raise + + + def __start (self): + + # create a capture on the server + with self.logger.supress(): + data = self.client.start_capture(self.tx_port_list, self.rx_port_list, limit = self.rate_pps, mode = 'cyclic') + + self.capture_id = data['id'] + self.start_ts = data['ts'] + + + # create a writer + if self.mon_type == 'compact': + self.writer = CaptureMonitorWriterStdout(self.logger, True, self.start_ts) + elif self.mon_type == 'verbose': + self.writer = CaptureMonitorWriterStdout(self.logger, False, self.start_ts) + elif self.mon_type == 'pipe': + self.writer = CaptureMonitorWriterPipe(self.logger, self.start_ts) + else: + raise STLError('Internal error: unknown writer type') + + # start the fetching thread + self.t = threading.Thread(target = self.__thread_cb) + self.t.setDaemon(True) + self.active = True + self.t.start() + + + # internal stop + def __stop (self): + + # stop the thread + if self.t and self.t.is_alive(): + self.active = False + self.t.join() + self.t = None + + # deinit the writer + if self.writer: + self.writer.deinit() + self.writer = None + + # take the capture ID + capture_id = self.capture_id + self.capture_id = None + + # if we are disconnected - we cannot cleanup the capture + if not self.client.is_connected(): + return + + # make sure the capture is active on the server + captures = [x['id'] for x in self.client.get_capture_status()] + if capture_id not in captures: + return + + # remove the capture + with self.logger.supress(): + self.client.stop_capture(capture_id) + + + # user call for stop (adds log) + def stop (self): + self.logger.pre_cmd("Stopping capture monitor") + + try: + self.__stop() + except Exception as e: + self.logger.post_cmd(RC_ERR("")) + raise + + self.logger.post_cmd(RC_OK()) + + + def get_mon_row (self): + + return [self.capture_id, + format_text('ACTIVE' if self.t.is_alive() else 'DEAD', 'bold'), + self.pkt_count, + format_num(self.byte_count, suffix = 'B'), + ', '.join([str(x) for x in self.tx_port_list] if self.tx_port_list else '-'), + ', '.join([str(x) for x in self.rx_port_list] if self.rx_port_list else '-') + ] + + + def is_active (self): + return self.active + + + def get_capture_id (self): + return self.capture_id + + + # sleeps with high freq checks for active + def __sleep (self): + for _ in range(5): + if not self.active: + return False + + time.sleep(0.1) + + return True + + def __lock (self): + while True: + rc = self.cmd_lock.acquire(False) + if rc: + return True + + if not self.active: + return False + time.sleep(0.1) + + def __unlock (self): + self.cmd_lock.release() + + + def __thread_cb (self): + try: + self.__thread_main_loop() + + # common errors + except STLError as e: + self.logger.log(format_text("\n\nMonitor has encountered the following error: '{}'\n".format(e.brief()), 'bold')) + self.logger.log(format_text("\n*** monitor is inactive - please restart the monitor ***\n", 'bold')) + self.logger.prompt_redraw() + + # unexpected errors + except Exception as e: + self.logger.log("\n\n*** A fatal internal error has occurred: '{}'\n".format(str(e))) + self.logger.log(format_text("\n*** monitor is inactive - please restart the monitor ***\n", 'bold')) + self.logger.prompt_redraw() + + + def __thread_main_loop (self): + self.pkt_count = 0 + self.byte_count = 0 + + while self.active: + + # sleep - if interrupt by graceful shutdown - go out + if not self.__sleep(): + return + + # check that the writer is ok + self.writer.periodic_check() + + # try to lock - if interrupt by graceful shutdown - go out + if not self.__lock(): + return + + try: + if not self.client.is_connected(): + raise STLError('client has been disconnected') + + rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10}) + if not rc: + raise STLError(rc) + + finally: + self.__unlock() + + + # no packets - skip + pkts = rc.data()['pkts'] + if not pkts: + continue + + byte_count = self.writer.handle_pkts(pkts) + + self.pkt_count += len(pkts) + self.byte_count += byte_count + + + + +# main class +class CaptureManager(object): + def __init__ (self, client, cmd_lock): + self.c = client + self.cmd_lock = cmd_lock + self.logger = client.logger + self.monitor = None + + # install parsers + + self.parser = parsing_opts.gen_parser(self, "capture", self.parse_line_internal.__doc__) + self.subparsers = self.parser.add_subparsers(title = "commands", dest="commands") + + self.install_record_parser() + self.install_monitor_parser() + + # show + self.show_parser = self.subparsers.add_parser('show', help = "show all active captures") + + # reset + self.clear_parser = self.subparsers.add_parser('clear', help = "remove all active captures") + + # register handlers + self.cmds = {'record': self.parse_record, 'monitor' : self.parse_monitor, 'clear': self.parse_clear, 'show' : self.parse_show} + + + def install_record_parser (self): + # recording + self.record_parser = self.subparsers.add_parser('record', help = "PCAP recording") + record_sub = self.record_parser.add_subparsers(title = 'commands', dest = 'record_cmd') + self.record_start_parser = record_sub.add_parser('start', help = "starts a new buffered capture") + self.record_stop_parser = record_sub.add_parser('stop', help = "stops an active buffered capture") + + # start + self.record_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.LIMIT) + + # stop + self.record_stop_parser.add_arg_list(parsing_opts.CAPTURE_ID, + parsing_opts.OUTPUT_FILENAME) + + + + def install_monitor_parser (self): + # monitor + self.monitor_parser = self.subparsers.add_parser('monitor', help = 'live monitoring') + monitor_sub = self.monitor_parser.add_subparsers(title = 'commands', dest = 'mon_cmd') + self.monitor_start_parser = monitor_sub.add_parser('start', help = 'starts a monitor') + self.monitor_stop_parser = monitor_sub.add_parser('stop', help = 'stops an active monitor') + + self.monitor_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.MONITOR_TYPE) + + + + def stop (self): + if self.monitor: + self.monitor.stop() + self.monitor = None + + + # main entry point for parsing commands from console + def parse_line (self, line): + try: + self.parse_line_internal(line) + except STLError as e: + self.logger.log("\nAction has failed with the following error:\n\n" + format_text(e.brief() + "\n", 'bold')) + return RC_ERR(e.brief()) + + + def parse_line_internal (self, line): + '''Manage PCAP recorders''' + + # default + if not line: + line = "show" + + opts = self.parser.parse_args(line.split()) + if not opts: + return opts + + # call the handler + self.cmds[opts.commands](opts) + + + # record methods + def parse_record (self, opts): + if opts.record_cmd == 'start': + self.parse_record_start(opts) + elif opts.record_cmd == 'stop': + self.parse_record_stop(opts) + else: + self.record_parser.formatted_error("too few arguments") + + + def parse_record_start (self, opts): + if not opts.tx_port_list and not opts.rx_port_list: + self.record_start_parser.formatted_error('please provide either --tx or --rx') + return + + rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit, mode = 'fixed') + + self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold')) + self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o <out.pcap>' when done ***\n".format(rc['id']), 'bold')) + + + def parse_record_stop (self, opts): + captures = self.c.get_capture_status() + ids = [c['id'] for c in captures] + + if self.monitor and (opts.capture_id == self.monitor.get_capture_id()): + self.record_stop_parser.formatted_error("'{0}' is a monitor, please use 'capture monitor stop'".format(opts.capture_id)) + return + + if opts.capture_id not in ids: + self.record_stop_parser.formatted_error("'{0}' is not an active capture ID".format(opts.capture_id)) + return + + self.c.stop_capture(opts.capture_id, opts.output_filename) + + + # monitor methods + def parse_monitor (self, opts): + if opts.mon_cmd == 'start': + self.parse_monitor_start(opts) + elif opts.mon_cmd == 'stop': + self.parse_monitor_stop(opts) + else: + self.monitor_parser.formatted_error("too few arguments") + + + def parse_monitor_start (self, opts): + mon_type = 'compact' + + if opts.verbose: + mon_type = 'verbose' + elif opts.pipe: + mon_type = 'pipe' + + if not opts.tx_port_list and not opts.rx_port_list: + self.monitor_start_parser.formatted_error('please provide either --tx or --rx') + return + + if self.monitor: + self.monitor.stop() + self.monitor = None + + self.monitor = CaptureMonitor(self.c, self.cmd_lock, opts.tx_port_list, opts.rx_port_list, 100, mon_type) + + + def parse_monitor_stop (self, opts): + if self.monitor: + self.monitor.stop() + self.monitor = None + + + def parse_clear (self, opts): + if self.monitor: + self.monitor.stop() + self.monitor = None + + self.c.remove_all_captures() + + + + def parse_show (self, opts): + data = self.c.get_capture_status() + + # captures + cap_table = text_tables.TRexTextTable() + cap_table.set_cols_align(["c"] * 6) + cap_table.set_cols_width([15] * 6) + + # monitor + mon_table = text_tables.TRexTextTable() + mon_table.set_cols_align(["c"] * 6) + mon_table.set_cols_width([15] * 6) + + for elem in data: + id = elem['id'] + + if self.monitor and (self.monitor.get_capture_id() == id): + row = self.monitor.get_mon_row() + mon_table.add_rows([row], header=False) + + else: + row = [id, + format_text(elem['state'], 'bold'), + '[{0}/{1}]'.format(elem['count'], elem['limit']), + format_num(elem['bytes'], suffix = 'B'), + bitfield_to_str(elem['filter']['tx']), + bitfield_to_str(elem['filter']['rx'])] + + cap_table.add_rows([row], header=False) + + cap_table.header(['ID', 'Status', 'Packets', 'Bytes', 'TX Ports', 'RX Ports']) + mon_table.header(['ID', 'Status', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports']) + + if cap_table._rows: + text_tables.print_table_with_header(cap_table, '\nActive Recorders') + + if mon_table._rows: + text_tables.print_table_with_header(mon_table, '\nActive Monitor') + + diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index c9956472..d36ce7b0 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -29,6 +29,8 @@ import string import os import sys import tty, termios +from threading import Lock +import threading try: import stl_path @@ -39,6 +41,7 @@ from trex_stl_lib.api import * from trex_stl_lib.utils.text_opts import * from trex_stl_lib.utils.common import user_input, get_current_user from trex_stl_lib.utils import parsing_opts +from .trex_capture import CaptureManager try: import trex_tui @@ -172,6 +175,10 @@ class TRexConsole(TRexGeneralCmd): def __init__(self, stateless_client, verbose = False): + # cmd lock is used to make sure background job + # of the console is not done while the user excutes commands + self.cmd_lock = Lock() + self.stateless_client = stateless_client TRexGeneralCmd.__init__(self) @@ -184,8 +191,11 @@ class TRexConsole(TRexGeneralCmd): self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) self.intro += "\nType 'help' or '?' for supported actions\n" + self.cap_mngr = CaptureManager(stateless_client, self.cmd_lock) + self.postcmd(False, "") + ################### internal section ######################## @@ -231,6 +241,7 @@ class TRexConsole(TRexGeneralCmd): lines = line.split(';') try: + self.cmd_lock.acquire() for line in lines: stop = self.onecmd(line) stop = self.postcmd(stop, line) @@ -238,10 +249,15 @@ class TRexConsole(TRexGeneralCmd): return "quit" return "" + except STLError as e: print(e) return '' + finally: + self.cmd_lock.release() + + def postcmd(self, stop, line): self.prompt = self.stateless_client.generate_prompt(prefix = 'trex') @@ -347,12 +363,12 @@ class TRexConsole(TRexGeneralCmd): @verify_connected - def do_set_rx_sniffer (self, line): - '''Sets a port sniffer on RX channel as PCAP recorder''' - self.stateless_client.set_rx_sniffer_line(line) + def do_capture (self, line): + '''Manage PCAP captures''' + self.cap_mngr.parse_line(line) - def help_sniffer (self): - self.do_set_rx_sniffer("-h") + def help_capture (self): + self.do_capture("-h") @verify_connected def do_resolve (self, line): @@ -443,7 +459,9 @@ class TRexConsole(TRexGeneralCmd): def do_disconnect (self, line): '''Disconnect from the server\n''' - + + # stop any monitors before disconnecting + self.cap_mngr.stop() self.stateless_client.disconnect_line(line) @@ -688,19 +706,25 @@ class TRexConsole(TRexGeneralCmd): l=help.splitlines() print("{:<30} {:<30}".format(cmd + " - ",l[0] )) + # a custorm cmdloop wrapper def start(self): - while True: - try: - self.cmdloop() - break - except KeyboardInterrupt as e: - if not readline.get_line_buffer(): - raise KeyboardInterrupt - else: - print("") - self.intro = None - continue + try: + while True: + try: + self.cmdloop() + break + except KeyboardInterrupt as e: + if not readline.get_line_buffer(): + raise KeyboardInterrupt + else: + print("") + self.intro = None + continue + + finally: + # capture manager is not presistent - kill it before going out + self.cap_mngr.stop() if self.terminal: self.terminal.kill() @@ -933,7 +957,7 @@ def main(): with stateless_client.logger.supress(): stateless_client.disconnect(stop_traffic = False) + if __name__ == '__main__': - main() |