From f5f92b068561dcdf8414494e5daf6d285ea24135 Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 22 Jan 2017 15:36:20 +0200 Subject: few tweaks Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_capture.py | 90 +++++++++++++++------- .../trex_control_plane/stl/console/trex_console.py | 29 +++---- .../stl/trex_stl_lib/trex_stl_client.py | 14 +++- .../stl/trex_stl_lib/trex_stl_jsonrpc_client.py | 4 +- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 5 +- src/stateless/rx/trex_stateless_capture.cpp | 19 +++-- src/stateless/rx/trex_stateless_capture.h | 24 ++++-- 7 files changed, 124 insertions(+), 61 deletions(-) diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py index 67b6c08c..dfd7f0a4 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_capture.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -22,8 +22,6 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): def __init__ (self, logger, is_brief): self.logger = logger self.is_brief = is_brief - self.pkt_count = 0 - self.byte_count = 0 self.RX_ARROW = u'\u25c0\u2500\u2500' self.TX_ARROW = u'\u25b6\u2500\u2500' @@ -45,11 +43,13 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): return RC_OK() def handle_pkts (self, pkts): + byte_count = 0 + for pkt in pkts: - self.__handle_pkt(pkt) + byte_count += self.__handle_pkt(pkt) self.logger.prompt_redraw() - return True + return RC_OK(byte_count) def get_scapy_name (self, pkt_scapy): @@ -72,11 +72,8 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): def __handle_pkt (self, pkt): pkt_bin = base64.b64decode(pkt['binary']) - self.pkt_count += 1 - self.byte_count += len(pkt_bin) - pkt_scapy = Ether(pkt_bin) - self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(self.pkt_count, pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) + self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold')) @@ -86,6 +83,7 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): pkt_scapy.show(label_lvl = ' ') self.logger.log('') + return len(pkt_bin) # class CaptureMonitorWriterPipe(CaptureMonitorWriter): @@ -148,18 +146,22 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): if not rc: return rc + byte_count = 0 + for pkt in pkts: pkt_bin = base64.b64decode(pkt['binary']) - ts = pkt['ts'] - self.start_ts - sec = int(ts) - usec = int( (ts - sec) * 1e6 ) + ts = pkt['ts'] + sec = int(ts) + usec = int( (ts - sec) * 1e6 ) try: self.writer._write_packet(pkt_bin, sec = sec, usec = usec) except IOError: return RC_ERR("*** failed to write packet to pipe ***") - - return RC_OK() + + byte_count += len(pkt_bin) + + return RC_OK(byte_count) class CaptureMonitor(object): @@ -170,7 +172,7 @@ class CaptureMonitor(object): self.capture_id = None self.logger = client.logger self.writer = None - + def is_active (self): return self.active @@ -179,7 +181,7 @@ class CaptureMonitor(object): return self.capture_id - def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): + def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): try: self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type) except Exception as e: @@ -221,12 +223,21 @@ class CaptureMonitor(object): self.t.start() except Exception as e: self.active = False - self.client.stop_capture(self.capture_id) + self.stop() raise e - + # entry point stop def stop (self): + + if self.active: + self.stop_logged() + else: + self.__stop() + + # wraps stop with a logging + def stop_logged (self): self.logger.pre_cmd("Stopping capture monitor") + try: self.__stop() except Exception as e: @@ -235,6 +246,7 @@ class CaptureMonitor(object): self.logger.post_cmd(RC_OK()) + # internal stop def __stop (self): # shutdown thread @@ -247,15 +259,28 @@ class CaptureMonitor(object): self.writer.deinit() self.writer = None - # cleanup capture ID - if self.capture_id is not None: - try: - with self.logger.supress(): - self.client.stop_capture(self.capture_id) - self.capture_id = None - except STLError as e: - self.logger.post_cmd(RC_ERR("")) - raise e + # cleanup capture ID if possible + if self.capture_id is None: + return + + capture_id = self.capture_id + self.capture_id = None + + # if we are disconnected - we cannot cleanup the capture + if not self.client.is_connected(): + return + + try: + captures = [x['id'] for x in self.client.get_capture_status()] + if capture_id not in captures: + return + + with self.logger.supress(): + self.client.stop_capture(capture_id) + + except STLError as e: + self.logger.post_cmd(RC_ERR("")) + raise e def get_mon_row (self): @@ -311,6 +336,7 @@ class CaptureMonitor(object): self.byte_count = 0 while self.active: + # sleep if not self.__sleep(): break @@ -325,6 +351,8 @@ class CaptureMonitor(object): break try: + if not self.client.is_connected(): + return RC_ERR('*** client has been disconnected, aborting monitoring ***') rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10}) if not rc: return rc @@ -340,6 +368,9 @@ class CaptureMonitor(object): rc = self.writer.handle_pkts(pkts) if not rc: return rc + + self.pkt_count += len(pkts) + self.byte_count += rc.data() # graceful shutdown return RC_OK() @@ -446,8 +477,11 @@ class CaptureManager(object): self.record_start_parser.formatted_error('please provide either --tx or --rx') return - self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) - + rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + + self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold')) + self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o ' when done ***\n".format(rc['id']), 'bold')) + def parse_record_stop (self, opts): captures = self.c.get_capture_status() diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index bf543045..270ef31c 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -707,20 +707,21 @@ class TRexConsole(TRexGeneralCmd): # a custorm cmdloop wrapper def start(self): - while True: - try: - self.cmdloop() - break - except KeyboardInterrupt as e: - if not readline.get_line_buffer(): - raise KeyboardInterrupt - else: - print("") - self.intro = None - continue - - finally: - self.cap_mngr.stop() + try: + while True: + try: + self.cmdloop() + break + except KeyboardInterrupt as e: + if not readline.get_line_buffer(): + raise KeyboardInterrupt + else: + print("") + self.intro = None + continue + + finally: + self.cap_mngr.stop() if self.terminal: self.terminal.kill() diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 5435619a..c82d77fb 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -3058,13 +3058,19 @@ class STLClient(object): self.logger.post_cmd(rc) raise STLError(rc) - pkts = rc.data()['pkts'] + pkts = rc.data()['pkts'] + pending = rc.data()['pending'] + start_ts = rc.data()['start_ts'] + for pkt in pkts: - ts = pkt['ts'] + ts = pkt['ts'] - start_ts + ts_sec = int(ts) + ts_usec = int( (ts - ts_sec) * 1e6 ) + pkt_bin = base64.b64decode(pkt['binary']) - writer._write_packet(pkt_bin, sec = 0, usec = 0) + writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec) - pending = rc.data()['pending'] + self.logger.post_cmd(rc) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py index 72c9317a..ff07b59a 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py @@ -184,7 +184,7 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 5: + if tries > 0: self.disconnect() return RC_ERR("*** [RPC] - Failed to send message to server") @@ -200,7 +200,7 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 5: + if tries > 0: self.disconnect() return RC_ERR("*** [RPC] - Failed to get server response from {0}".format(self.transport)) diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index be261fbb..55249fc8 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -994,8 +994,9 @@ TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &resul const TrexPktBuffer *pkt_buffer = rc.get_pkt_buffer(); - result["result"]["pending"] = rc.get_pending(); - result["result"]["pkts"] = pkt_buffer->to_json(); + result["result"]["pending"] = rc.get_pending(); + result["result"]["start_ts"] = rc.get_start_ts(); + result["result"]["pkts"] = pkt_buffer->to_json(); /* delete the buffer */ delete pkt_buffer; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 5d43cede..f0d4e806 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -26,6 +26,8 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); m_filter = filter; m_state = STATE_ACTIVE; + m_start_ts = now_sec(); + m_pkt_index = 0; } TrexStatelessCapture::~TrexStatelessCapture() { @@ -35,7 +37,7 @@ TrexStatelessCapture::~TrexStatelessCapture() { } void -TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { +TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) { if (m_state != STATE_ACTIVE) { delete pkt; @@ -48,6 +50,12 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { return; } + if (pkt->get_ts() < m_start_ts) { + delete pkt; + return; + } + + pkt->set_index(++m_pkt_index); m_pkt_buffer->push(pkt); } @@ -62,7 +70,7 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { return; } - m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX); + m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX, ++m_pkt_index); } @@ -110,7 +118,8 @@ TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { partial->push(pkt); } - pending = m_pkt_buffer->get_element_count(); + pending = m_pkt_buffer->get_element_count(); + return partial; } @@ -181,7 +190,7 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre uint32_t pending = 0; TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); - rc.set_pkt_buffer(pkt_buffer, pending); + rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts()); } void @@ -223,7 +232,7 @@ TrexStatelessCaptureMngr::reset() { } void -TrexStatelessCaptureMngr::handle_pkt_tx(const TrexPkt *pkt) { +TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) { for (TrexStatelessCapture *capture : m_captures) { capture->handle_pkt_tx(pkt); } diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 4a9efea7..bc1b88c5 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -121,10 +121,11 @@ public: m_pending = 0; } - void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) { - m_pkt_buffer = pkt_buffer; - m_pending = pending; - m_rc = RC_OK; + void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_start_ts = start_ts; + m_rc = RC_OK; } const TrexPktBuffer *get_pkt_buffer() const { @@ -135,9 +136,14 @@ public: return m_pending; } + dsec_t get_start_ts() const { + return m_start_ts; + } + private: const TrexPktBuffer *m_pkt_buffer; uint32_t m_pending; + dsec_t m_start_ts; }; class TrexCaptureRCRemove : public TrexCaptureRC { @@ -245,7 +251,7 @@ public: TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); - void handle_pkt_tx(const TrexPkt *pkt); + void handle_pkt_tx(TrexPkt *pkt); void handle_pkt_rx(const rte_mbuf_t *m, int port); ~TrexStatelessCapture(); @@ -274,11 +280,17 @@ public: return m_pkt_buffer->get_element_count(); } + dsec_t get_start_ts() const { + return m_start_ts; + } + private: state_e m_state; TrexPktBuffer *m_pkt_buffer; + dsec_t m_start_ts; CaptureFilter m_filter; uint64_t m_id; + uint64_t m_pkt_index; }; class TrexStatelessCaptureMngr { @@ -341,7 +353,7 @@ public: /** * handle packet from TX */ - void handle_pkt_tx(const TrexPkt *pkt); + void handle_pkt_tx(TrexPkt *pkt); /** * handle packet from RX -- cgit 1.2.3-korg