From 641fed03d8e407b6dca94f5280b9a1b4c768f601 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Jan 2017 13:30:48 +0200 Subject: fine tune Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_capture.py | 174 ++++++++++++++++----- .../stl/trex_stl_lib/trex_stl_client.py | 7 +- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 1 + 3 files changed, 143 insertions(+), 39 deletions(-) diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py index e5708e9b..67b6c08c 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_capture.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -2,9 +2,10 @@ from trex_stl_lib.api import * from trex_stl_lib.utils import parsing_opts, text_tables import threading import tempfile +import select class CaptureMonitorWriter(object): - def init (self): + def init (self, start_ts): raise NotImplementedError def deinit(self): @@ -13,6 +14,9 @@ class CaptureMonitorWriter(object): def handle_pkts (self, pkts): raise NotImplementedError + def periodic_check (self): + raise NotImplementedError + class CaptureMonitorWriterStdout(CaptureMonitorWriter): def __init__ (self, logger, is_brief): @@ -24,21 +28,29 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): self.RX_ARROW = u'\u25c0\u2500\u2500' self.TX_ARROW = u'\u25b6\u2500\u2500' - def init (self): - self.logger.log(format_text("\nStarting capture monitor on selected ports", 'bold')) - self.logger.log(format_text("*** any captured packet will be displayed on screen ***\n")) - self.logger.log(format_text("('capture monitor stop' to abort capturing...)\n", 'bold')) + def init (self, start_ts): + self.start_ts = start_ts + + self.logger.pre_cmd("Starting stdout capture monitor - verbose: '{0}'".format('low' if self.is_brief else 'high')) + self.logger.post_cmd(RC_OK) + + self.logger.log(format_text("\n*** use 'capture monitor stop' to abort capturing... ***\n", 'bold')) def deinit (self): pass + + def periodic_check (self): + return RC_OK() def handle_pkts (self, pkts): for pkt in pkts: self.__handle_pkt(pkt) self.logger.prompt_redraw() + return True + def get_scapy_name (self, pkt_scapy): layer = pkt_scapy @@ -46,6 +58,7 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): layer = layer.payload return layer.name + def format_origin (self, origin): if origin == 'RX': @@ -63,11 +76,10 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): self.byte_count += len(pkt_bin) pkt_scapy = Ether(pkt_bin) - self.logger.log(format_text(u'\n\nPort: {0} {1}\n'.format(pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) - self.logger.log(format_text(' Type: {:}, Size: {:} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts']), 'bold')) + self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(self.pkt_count, pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) + self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold')) - if self.is_brief: self.logger.log(' {0}'.format(pkt_scapy.command())) else: @@ -80,43 +92,74 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): def __init__ (self, logger): self.logger = logger - def init (self): + def init (self, start_ts): + self.start_ts = start_ts self.fifo_name = tempfile.mktemp() try: + self.logger.pre_cmd('Starting pipe capture monitor') os.mkfifo(self.fifo_name) + self.logger.post_cmd(RC_OK) - self.logger.log(format_text("\nPlease run 'wireshark -k -i {0}'".format(self.fifo_name), 'bold')) - self.logger.log('\nWaiting for Wireshark connection...') + self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold')) + self.logger.pre_cmd("Waiting for Wireshark pipe connection") self.fifo = os.open(self.fifo_name, os.O_WRONLY) - self.logger.log('Successfuly connected to Wireshark...') + self.logger.post_cmd(RC_OK()) + self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold')) self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True) self.writer._write_header(None) - + + # register a poller + self.poll = select.poll() + self.poll.register(self.fifo, select.EPOLLERR) + except KeyboardInterrupt as e: - os.unlink(self.fifo_name) + self.logger.post_cmd(RC_ERR("")) raise STLError("*** pipe monitor aborted...cleaning up") except OSError as e: - os.unlink(self.fifo_name) + self.logger.post_cmd(RC_ERR("")) raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e))) def deinit (self): - os.unlink(self.fifo_name) + try: + os.unlink(self.fifo_name) + except OSError: + pass + + def periodic_check (self): + return self.check_pipe() + + + def check_pipe (self): + if self.poll.poll(0): + return RC_ERR('*** pipe has been disconnected - aborting monitoring ***') + + return RC_OK() + def handle_pkts (self, pkts): + rc = self.check_pipe() + if not rc: + return rc + for pkt in pkts: pkt_bin = base64.b64decode(pkt['binary']) + ts = pkt['ts'] - self.start_ts + sec = int(ts) + usec = int( (ts - sec) * 1e6 ) + try: - self.writer._write_packet(pkt_bin, sec = 0, usec = 0) + self.writer._write_packet(pkt_bin, sec = sec, usec = usec) except IOError: - klgjdf - + return RC_ERR("*** failed to write packet to pipe ***") + + return RC_OK() class CaptureMonitor(object): @@ -124,9 +167,9 @@ class CaptureMonitor(object): self.client = client self.cmd_lock = cmd_lock self.active = False - self.capture_id = -1 + self.capture_id = None self.logger = client.logger - + self.writer = None def is_active (self): return self.active @@ -137,10 +180,20 @@ class CaptureMonitor(object): def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): + try: + self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type) + except Exception as e: + self.__stop() + raise e + + def start_internal (self, tx_port_list, rx_port_list, rate_pps, mon_type): # stop any previous monitors if self.active: self.stop() + self.tx_port_list = tx_port_list + self.rx_port_list = rx_port_list + if mon_type == 'compact': self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True) elif mon_type == 'verbose': @@ -152,13 +205,14 @@ class CaptureMonitor(object): with self.logger.supress(): - self.capture_id = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps) - - self.tx_port_list = tx_port_list - self.rx_port_list = rx_port_list + data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps) - self.writer.init() + self.capture_id = data['id'] + self.start_ts = data['ts'] + self.writer.init(self.start_ts) + + self.t = threading.Thread(target = self.__thread_cb) self.t.setDaemon(True) @@ -167,19 +221,43 @@ class CaptureMonitor(object): self.t.start() except Exception as e: self.active = False + self.client.stop_capture(self.capture_id) raise e - + def stop (self): + self.logger.pre_cmd("Stopping capture monitor") + try: + self.__stop() + except Exception as e: + self.logger.post_cmd(RC_ERR("")) + raise e + + self.logger.post_cmd(RC_OK()) + + def __stop (self): + + # shutdown thread if self.active: self.active = False self.t.join() - self.client.stop_capture(self.capture_id, None) - self.capture_id = -1 + # deinit the writer + if self.writer is not None: self.writer.deinit() - + self.writer = None + + # cleanup capture ID + if self.capture_id is not None: + try: + with self.logger.supress(): + self.client.stop_capture(self.capture_id) + self.capture_id = None + except STLError as e: + self.logger.post_cmd(RC_ERR("")) + raise e + def get_mon_row (self): if not self.is_active(): return None @@ -215,8 +293,20 @@ class CaptureMonitor(object): def __unlock (self): self.cmd_lock.release() - + def __thread_cb (self): + try: + rc = self.__thread_main_loop() + finally: + pass + + if not rc: + self.logger.log(str(rc)) + self.logger.log(format_text('\n*** monitor is inactive - please restart the monitor ***\n', 'bold')) + self.logger.prompt_redraw() + + + def __thread_main_loop (self): self.pkt_count = 0 self.byte_count = 0 @@ -225,6 +315,11 @@ class CaptureMonitor(object): if not self.__sleep(): break + # check that the writer is ok + rc = self.writer.periodic_check() + if not rc: + return rc + # try to lock if not self.__lock(): break @@ -232,7 +327,8 @@ class CaptureMonitor(object): try: rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10}) if not rc: - raise STLError(rc) + return rc + finally: self.__unlock() @@ -241,9 +337,13 @@ class CaptureMonitor(object): if not pkts: continue - self.writer.handle_pkts(pkts) + rc = self.writer.handle_pkts(pkts) + if not rc: + return rc - + # graceful shutdown + return RC_OK() + # main class @@ -338,7 +438,8 @@ class CaptureManager(object): elif opts.record_cmd == 'stop': self.parse_record_stop(opts) else: - assert(0) + self.record_parser.formatted_error("too few arguments") + def parse_record_start (self, opts): if not opts.tx_port_list and not opts.rx_port_list: @@ -370,7 +471,8 @@ class CaptureManager(object): elif opts.mon_cmd == 'stop': self.parse_monitor_stop(opts) else: - assert(0) + self.monitor_parser.formatted_error("too few arguments") + def parse_monitor_start (self, opts): mon_type = 'compact' diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index c632ad7c..5435619a 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -3003,7 +3003,8 @@ class STLClient(object): limit - limit how many packets will be written :returns: - the new capture_id + returns a dictionary containing + {'id: , 'ts': } :raises: + :exe:'STLError' @@ -3036,7 +3037,7 @@ class STLClient(object): if not rc: raise STLError(rc) - return rc.data()['capture_id'] + return {'id': rc.data()['capture_id'], 'ts': rc.data()['ts']} @@ -3070,7 +3071,7 @@ class STLClient(object): @__api_check(True) - def stop_capture (self, capture_id, output_filename): + def stop_capture (self, capture_id, output_filename = None): """ Stops an active capture diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 8f7431e4..be261fbb 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -920,6 +920,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul } result["result"]["capture_id"] = rc.get_new_id(); + result["result"]["ts"] = now_sec(); } /** -- cgit 1.2.3-korg