From 5257dbb8253fe5b70b75f9c064c4593ca7aee99f Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 4 Jan 2017 18:46:45 +0200 Subject: draft - unreviewed Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_console.py | 10 +-- .../stl/trex_stl_lib/trex_stl_client.py | 79 +++++++++------------- .../stl/trex_stl_lib/trex_stl_port.py | 70 +++++++------------ .../stl/trex_stl_lib/trex_stl_stats.py | 1 - .../stl/trex_stl_lib/utils/parsing_opts.py | 21 ++++++ 5 files changed, 82 insertions(+), 99 deletions(-) (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index 7d47128b..38a1fca4 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -347,12 +347,12 @@ class TRexConsole(TRexGeneralCmd): @verify_connected - def do_set_rx_sniffer (self, line): - '''Sets a port sniffer on RX channel as PCAP recorder''' - self.stateless_client.set_rx_sniffer_line(line) + def do_capture (self, line): + '''Start PCAP capturing on port''' + self.stateless_client.start_capture_line(line) - def help_sniffer (self): - self.do_set_rx_sniffer("-h") + def help_capture (self): + self.do_capture("-h") @verify_connected def do_resolve (self, line): diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index e163d516..1b57218f 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -601,11 +601,9 @@ class STLClient(object): self.xstats, self.async_client.monitor) - - - ############# private functions - used by the class itself ########### + # some preprocessing for port argument def __ports (self, port_id_list): @@ -832,27 +830,6 @@ class STLClient(object): return rc - def __set_rx_sniffer (self, port_id_list, base_filename, limit): - port_id_list = self.__ports(port_id_list) - rc = RC() - - for port_id in port_id_list: - head, tail = os.path.splitext(base_filename) - filename = "{0}-{1}{2}".format(head, port_id, tail) - rc.add(self.ports[port_id].set_rx_sniffer(filename, limit)) - - return rc - - - def __remove_rx_sniffer (self, port_id_list): - port_id_list = self.__ports(port_id_list) - rc = RC() - - for port_id in port_id_list: - rc.add(self.ports[port_id].remove_rx_sniffer()) - - return rc - def __set_rx_queue (self, port_id_list, size): port_id_list = self.__ports(port_id_list) rc = RC() @@ -1071,7 +1048,7 @@ class STLClient(object): ############ functions used by other classes but not users ############## - def _validate_port_list (self, port_id_list): + def _validate_port_list (self, port_id_list, allow_empty = False): # listfiy single int if isinstance(port_id_list, int): port_id_list = [port_id_list] @@ -1080,7 +1057,7 @@ class STLClient(object): if not isinstance(port_id_list, list): raise STLTypeError('port_id_list', type(port_id_list), list) - if not port_id_list: + if not port_id_list and not allow_empty: raise STLError('No ports provided') valid_ports = self.get_all_ports() @@ -2084,9 +2061,9 @@ class STLClient(object): self.set_port_attr(ports, promiscuous = False, link_up = True if restart else None) - self.set_service_mode(ports, False) - self.remove_rx_sniffer(ports) self.remove_rx_queue(ports) + self.set_service_mode(ports, False) + except STLError as e: self.logger.post_cmd(False) @@ -3013,29 +2990,39 @@ class STLClient(object): @__api_check(True) - def set_rx_sniffer (self, ports = None, base_filename = 'rx.pcap', limit = 1000): + def start_capture (self, tx_ports, rx_ports, limit = 1000): """ - Sets a RX sniffer for port(s) written to a PCAP file + Starts a capture to PCAP on port(s) :parameters: - ports - for which ports to apply a unique sniffer (each port gets a unique file) - base_filename - filename will be appended with '-', e.g. rx.pcap --> rx-0.pcap, rx-1.pcap etc. + tx_ports - on which ports to capture TX + rx_ports - on which ports to capture RX limit - limit how many packets will be written :raises: + :exe:'STLError' """ - ports = ports if ports is not None else self.get_acquired_ports() - ports = self._validate_port_list(ports) - + + tx_ports = self._validate_port_list(tx_ports, allow_empty = True) + rx_ports = self._validate_port_list(rx_ports, allow_empty = True) + merge_ports = set(tx_ports + rx_ports) + + if not merge_ports: + raise STLError("start_capture - must get at least one port to capture") + # check arguments - validate_type('base_filename', base_filename, basestring) validate_type('limit', limit, (int)) if limit <= 0: raise STLError("'limit' must be a positive value") - self.logger.pre_cmd("Setting RX sniffers on port(s) {0}:".format(ports)) - rc = self.__set_rx_sniffer(ports, base_filename, limit) + non_service_ports = list_difference(set(tx_ports + rx_ports), self.get_service_enabled_ports()) + if non_service_ports: + raise STLError("Port(s) {0} are not under service mode. PCAP capturing requires all ports to be in service mode") + + + self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit)) + + rc = self._transmit("start_capture", params = {'limit': limit, 'tx': tx_ports, 'rx': rx_ports}) self.logger.post_cmd(rc) @@ -3045,7 +3032,7 @@ class STLClient(object): @__api_check(True) - def remove_rx_sniffer (self, ports = None): + def stop_capture (self, ports = None): """ Removes RX sniffer from port(s) @@ -3779,21 +3766,21 @@ class STLClient(object): @__console - def set_rx_sniffer_line (self, line): - '''Sets a port sniffer on RX channel in form of a PCAP file''' + def start_capture_line (self, line): + '''Starts PCAP recorder on port(s)''' parser = parsing_opts.gen_parser(self, - "set_rx_sniffer", - self.set_rx_sniffer_line.__doc__, - parsing_opts.PORT_LIST_WITH_ALL, - parsing_opts.OUTPUT_FILENAME, + "capture", + self.start_capture_line.__doc__, + parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, parsing_opts.LIMIT) opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True) if not opts: return opts - self.set_rx_sniffer(opts.ports, opts.output_filename, opts.limit) + self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) return RC_OK() diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index 07587b9f..654514cb 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -56,7 +56,8 @@ class Port(object): def __init__ (self, port_id, user, comm_link, session_id, info): self.port_id = port_id - self.state = self.STATE_IDLE + self.state = self.STATE_IDLE + self.service_mode = False self.handler = None self.comm_link = comm_link @@ -247,14 +248,16 @@ class Port(object): raise Exception("port {0}: bad state received from server '{1}'".format(self.port_id, port_state)) self.owner = rc.data()['owner'] - + self.next_available_id = int(rc.data()['max_stream_id']) + 1 self.status = rc.data() - + # replace the attributes in a thread safe manner self.set_ts_attr(rc.data()['attr']) - + + self.service_mode = rc.data()['service'] + return self.ok() @@ -490,33 +493,17 @@ class Port(object): @owned - def set_rx_sniffer (self, pcap_filename, limit): + def start_capture (self, pcap_filename, mode, limit): - if not self.is_service_mode_on(): + if mode != 'tx' and not self.is_service_mode_on(): return self.err('port service mode must be enabled for performing RX capturing. Please enable service mode') params = {"handler": self.handler, "port_id": self.port_id, - "type": "capture", - "enabled": True, - "pcap_filename": pcap_filename, + "mode": mode, "limit": limit} - rc = self.transmit("set_rx_feature", params) - if rc.bad(): - return self.err(rc.err()) - - return self.ok() - - - @owned - def remove_rx_sniffer (self): - params = {"handler": self.handler, - "port_id": self.port_id, - "type": "capture", - "enabled": False} - - rc = self.transmit("set_rx_feature", params) + rc = self.transmit("start_capture", params) if rc.bad(): return self.err(rc.err()) @@ -719,23 +706,21 @@ class Port(object): @owned def set_service_mode (self, enabled): - rc = self.set_attr(rx_filter_mode = 'all' if enabled else 'hw') - if not rc: - return rc - - if not enabled: - rc = self.remove_rx_queue() - if not rc: - return rc - - rc = self.remove_rx_sniffer() - if not rc: - return rc - + params = {"handler": self.handler, + "port_id": self.port_id, + "enabled": enabled} + + rc = self.transmit("service", params) + if rc.bad(): + return self.err(rc.err()) + + self.service_mode = enabled return self.ok() + def is_service_mode_on (self): - return self.get_rx_filter_mode() == 'all' + return self.service_mode + @writeable def push_remote (self, pcap_filename, ipg_usec, speedup, count, duration, is_dual, slave_handler, min_ipg_usec): @@ -902,11 +887,6 @@ class Port(object): # RX info rx_info = self.status['rx_info'] - # RX sniffer - sniffer = rx_info['sniffer'] - info['rx_sniffer'] = '{0}\n[{1} / {2}]'.format(sniffer['pcap_filename'], sniffer['count'], sniffer['limit']) if sniffer['is_active'] else 'off' - - # RX queue queue = rx_info['queue'] info['rx_queue'] = '[{0} / {1}]'.format(queue['count'], queue['size']) if queue['is_active'] else 'off' @@ -928,9 +908,6 @@ class Port(object): def get_layer_cfg (self): return self.__attr['layer_cfg'] - def get_rx_filter_mode (self): - return self.__attr['rx_filter_mode'] - def is_l3_mode (self): return self.get_layer_cfg()['ipv4']['state'] != 'none' @@ -1002,7 +979,6 @@ class Port(object): "layer mode": format_text(info['layer_mode'], 'green' if info['layer_mode'] == 'IPv4' else 'magenta'), "RX Filter Mode": info['rx_filter_mode'], "RX Queueing": info['rx_queue'], - "RX sniffer": info['rx_sniffer'], "Grat ARP": info['grat_arp'], } diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py index 38726062..21c9af87 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_stats.py @@ -682,7 +682,6 @@ class CTRexInfoGenerator(object): ("-----", []), ("RX Filter Mode", []), ("RX Queueing", []), - ("RX sniffer", []), ("Grat ARP", []), ] ) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py index f5dab30c..265c43fb 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py @@ -63,6 +63,9 @@ PKT_SIZE SERVICE_OFF +TX_PORT_LIST +RX_PORT_LIST + SRC_IPV4 DST_IPV4 @@ -591,6 +594,24 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'default': True, 'help': 'Deactivates services on port(s)'}), + TX_PORT_LIST: ArgumentPack(['--tx'], + {'nargs': '+', + 'dest':'tx_port_list', + 'metavar': 'TX', + 'action': 'merge', + 'type': int, + 'help': 'A list of ports to capture on the TX side', + 'default': []}), + + RX_PORT_LIST: ArgumentPack(['--rx'], + {'nargs': '+', + 'dest':'rx_port_list', + 'metavar': 'RX', + 'action': 'merge', + 'type': int, + 'help': 'A list of ports to capture on the RX side', + 'default': []}), + # advanced options PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST, ALL_PORTS], -- cgit 1.2.3-korg From ac2e93d4247b2db94cd07301b274336bb08dec46 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 11 Jan 2017 18:19:47 +0200 Subject: capture - draft commit Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_console.py | 4 +- .../stl/trex_stl_lib/trex_stl_client.py | 143 ++++++++++++-- .../stl/trex_stl_lib/utils/common.py | 16 +- .../stl/trex_stl_lib/utils/parsing_opts.py | 52 +++++- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 118 +++++++++++- src/rpc-server/commands/trex_rpc_cmds.h | 8 +- src/rpc-server/trex_rpc_cmds_table.cpp | 2 +- src/stateless/cp/trex_stateless.cpp | 31 +--- src/stateless/cp/trex_stateless.h | 12 +- src/stateless/cp/trex_stateless_port.h | 15 +- .../messaging/trex_stateless_messaging.cpp | 46 ++++- src/stateless/messaging/trex_stateless_messaging.h | 54 ++++-- src/stateless/rx/trex_stateless_capture.cpp | 142 ++++++++++++-- src/stateless/rx/trex_stateless_capture.h | 205 +++++++++++++++++++-- src/stateless/rx/trex_stateless_rx_core.cpp | 14 -- src/stateless/rx/trex_stateless_rx_core.h | 10 +- 16 files changed, 723 insertions(+), 149 deletions(-) (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index 38a1fca4..b0ab70e0 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -348,8 +348,8 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_capture (self, line): - '''Start PCAP capturing on port''' - self.stateless_client.start_capture_line(line) + '''Manage PCAP captures''' + self.stateless_client.capture_line(line) def help_capture (self): self.do_capture("-h") diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 1b57218f..d75c554e 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -26,6 +26,7 @@ import random import json import traceback import os.path +import argparse ############################ logger ############################# ############################ ############################# @@ -2961,7 +2962,7 @@ class STLClient(object): Resolves ports (ARP resolution) :parameters: - ports - for which ports to apply a unique sniffer (each port gets a unique file) + ports - which ports to resolve retires - how many times to retry on each port (intervals of 100 milliseconds) verbose - log for each request the response :raises: @@ -3022,7 +3023,7 @@ class STLClient(object): self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit)) - rc = self._transmit("start_capture", params = {'limit': limit, 'tx': tx_ports, 'rx': rx_ports}) + rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'tx': tx_ports, 'rx': rx_ports}) self.logger.post_cmd(rc) @@ -3032,24 +3033,82 @@ class STLClient(object): @__api_check(True) - def stop_capture (self, ports = None): + def stop_capture (self, capture_id, output_filename): """ - Removes RX sniffer from port(s) + Stops an active capture + + :parameters: + capture_id - an active capture ID to stop + output_filename - output filename to save capture :raises: + :exe:'STLError' """ - ports = ports if ports is not None else self.get_acquired_ports() - ports = self._validate_port_list(ports) - self.logger.pre_cmd("Removing RX sniffers on port(s) {0}:".format(ports)) - rc = self.__remove_rx_sniffer(ports) + + + # stopping a capture requires: + # 1. stopping + # 2. fetching + # 3. saving to file + + # stop + + self.logger.pre_cmd("Stopping PCAP capture {0}".format(capture_id)) + rc = self._transmit("capture", params = {'command': 'stop', 'capture_id': capture_id}) self.logger.post_cmd(rc) + if not rc: + raise STLError(rc) + + # pkt count + pkt_count = rc.data()['pkt_count'] + + if not output_filename or pkt_count == 0: + return + + self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename)) + + # create a PCAP file + writer = RawPcapWriter(output_filename, linktype = 1) + writer._write_header(None) + + # fetch + while True: + rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50}) + if not rc: + self.logger.post_cmd(rc) + raise STLError(rc) + + pkts = rc.data()['pkts'] + for pkt in pkts: + ts = pkt['ts'] + pkt_bin = base64.b64decode(pkt['binary']) + writer._write_packet(pkt_bin, sec = 0, usec = 0) + + if rc.data()['pending'] == 0: + break + + self.logger.post_cmd(rc) + + + # get capture status + @__api_check(True) + def get_capture_status (self): + """ + returns a list of all active captures + each element in the list is an object containing + info about the capture + + """ + + rc = self._transmit("capture", params = {'command': 'status'}) if not rc: raise STLError(rc) + return rc.data() + @__api_check(True) def set_rx_queue (self, ports = None, size = 1000): @@ -3766,23 +3825,71 @@ class STLClient(object): @__console - def start_capture_line (self, line): - '''Starts PCAP recorder on port(s)''' + def capture_line (self, line): + '''Manage PCAP recorders''' - parser = parsing_opts.gen_parser(self, - "capture", - self.start_capture_line.__doc__, - parsing_opts.TX_PORT_LIST, - parsing_opts.RX_PORT_LIST, - parsing_opts.LIMIT) + # default + if not line: + line = "show" + + parser = parsing_opts.gen_parser(self, "capture", self.capture_line.__doc__) + subparsers = parser.add_subparsers(title = "commands", dest="commands") + + # start + start_parser = subparsers.add_parser('start', help = "starts a new capture") + start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.LIMIT) + + # stop + stop_parser = subparsers.add_parser('stop', help = "stops an active capture") + stop_parser.add_arg_list(parsing_opts.CAPTURE_ID, + parsing_opts.OUTPUT_FILENAME) + + # show + show_parser = subparsers.add_parser('show', help = "show all active captures") + + opts = parser.parse_args(line.split()) - opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True) if not opts: return opts - self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + # start + if opts.commands == 'start': + if not opts.tx_port_list and not opts.rx_port_list: + start_parser.formatted_error('please provide either --tx or --rx') + return + + self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + + # stop + elif opts.commands == 'stop': + self.stop_capture(opts.capture_id, opts.output_filename) + + # show + else: + data = self.get_capture_status() + + stats_table = text_tables.TRexTextTable() + stats_table.set_cols_align(["c"] * 6) + stats_table.set_cols_width([15] * 6) + + for elem in data: + row = [elem['id'], + elem['state'], + '[{0}/{1}]'.format(elem['count'], elem['limit']), + format_num(elem['bytes'], suffix = 'B'), + bitfield_to_str(elem['filter']['tx']), + bitfield_to_str(elem['filter']['rx'])] + + stats_table.add_rows([row], header=False) + + stats_table.header(['ID', 'Status', 'Count', 'Bytes', 'TX Ports', 'RX Ports']) + text_tables.print_table_with_header(stats_table, "Captures") + return RC_OK() + @__console diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py index cbbacb27..c386451b 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py @@ -107,4 +107,18 @@ def list_remove_dup (l): return tmp - +def bitfield_to_list (bf): + rc = [] + bitpos = 0 + + while bf > 0: + if bf & 0x1: + rc.append(bitpos) + bitpos += 1 + bf = bf >> 1 + + return rc + +def bitfield_to_str (bf): + lst = bitfield_to_list(bf) + return "-" if not lst else ', '.join([str(x) for x in lst]) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py index 265c43fb..cb594ef4 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py @@ -69,6 +69,8 @@ RX_PORT_LIST SRC_IPV4 DST_IPV4 +CAPTURE_ID + GLOBAL_STATS PORT_STATS PORT_STATUS @@ -81,12 +83,14 @@ EXTENDED_INC_ZERO_STATS STREAMS_MASK CORE_MASK_GROUP +CAPTURE_PORTS_GROUP # ALL_STREAMS # STREAM_LIST_WITH_ALL # list of ArgumentGroup types MUTEX +NON_MUTEX ''' @@ -392,7 +396,6 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'help': 'Output PCAP filename', 'dest': 'output_filename', 'default': None, - 'required': True, 'type': str}), @@ -612,6 +615,12 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'help': 'A list of ports to capture on the RX side', 'default': []}), + CAPTURE_ID: ArgumentPack(['-i', '--id'], + {'help': "capture ID to remove", + 'dest': "capture_id", + 'type': int, + 'required': True}), + # advanced options PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST, ALL_PORTS], @@ -636,6 +645,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], CORE_MASK], {'required': False}), + CAPTURE_PORTS_GROUP: ArgumentGroup(NON_MUTEX, [TX_PORT_LIST, RX_PORT_LIST], {}), } class _MergeAction(argparse._AppendAction): @@ -654,12 +664,30 @@ class _MergeAction(argparse._AppendAction): class CCmdArgParser(argparse.ArgumentParser): - def __init__(self, stateless_client, *args, **kwargs): + def __init__(self, stateless_client = None, x = None, *args, **kwargs): super(CCmdArgParser, self).__init__(*args, **kwargs) self.stateless_client = stateless_client self.cmd_name = kwargs.get('prog') self.register('action', 'merge', _MergeAction) + + def add_arg_list (self, *args): + populate_parser(self, *args) + + def add_subparsers(self, *args, **kwargs): + sub = super(CCmdArgParser, self).add_subparsers(*args, **kwargs) + + add_parser = sub.add_parser + stateless_client = self.stateless_client + + def add_parser_hook (self, *args, **kwargs): + parser = add_parser(self, *args, **kwargs) + parser.stateless_client = stateless_client + return parser + + sub.add_parser = add_parser_hook + return sub + # hook this to the logger def _print_message(self, message, file=None): self.stateless_client.logger.log(message) @@ -730,13 +758,15 @@ class CCmdArgParser(argparse.ArgumentParser): # recover from system exit scenarios, such as "help", or bad arguments. return RC_ERR("'{0}' - {1}".format(self.cmd_name, "no action")) + def formatted_error (self, msg): + self.print_usage() + self.stateless_client.logger.log(msg) + def get_flags (opt): return OPTIONS_DB[opt].name_or_flags -def gen_parser(stateless_client, op_name, description, *args): - parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve', - description=description) +def populate_parser (parser, *args): for param in args: try: @@ -752,6 +782,12 @@ def gen_parser(stateless_client, op_name, description, *args): for sub_argument in argument.args: group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags, **OPTIONS_DB[sub_argument].options) + + elif argument.type == NON_MUTEX: + group = parser.add_argument_group(**argument.options) + for sub_argument in argument.args: + group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags, + **OPTIONS_DB[sub_argument].options) else: # ignore invalid objects continue @@ -764,6 +800,12 @@ def gen_parser(stateless_client, op_name, description, *args): except KeyError as e: cause = e.args[0] raise KeyError("The attribute '{0}' is missing as a field of the {1} option.\n".format(cause, param)) + +def gen_parser(stateless_client, op_name, description, *args): + parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve', + description=description) + + populate_parser(parser, *args) return parser diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index ec5c3158..80f69fa3 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -29,6 +29,7 @@ limitations under the License. #include "trex_stateless_rx_core.h" #include "trex_stateless_capture.h" +#include "trex_stateless_messaging.h" #include #include @@ -844,13 +845,38 @@ TrexRpcCmdSetL3::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } - + + /** - * starts PCAP capturing + * capture command tree * */ trex_rpc_cmd_rc_e -TrexRpcCmdStartCapture::_run(const Json::Value ¶ms, Json::Value &result) { +TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { + const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status"}, result); + + if (cmd == "start") { + parse_cmd_start(params, result); + } else if (cmd == "stop") { + parse_cmd_stop(params, result); + } else if (cmd == "fetch") { + parse_cmd_fetch(params, result); + } else if (cmd == "status") { + parse_cmd_status(params, result); + } else { + /* can't happen */ + assert(0); + } + + return TREX_RPC_CMD_OK; +} + +/** + * starts PCAP capturing + * + */ +void +TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &result) { uint32_t limit = parse_uint32(params, "limit", result); const Json::Value &tx_json = parse_array(params, "tx", result); @@ -881,8 +907,90 @@ TrexRpcCmdStartCapture::_run(const Json::Value ¶ms, Json::Value &result) { } } - get_stateless_obj()->start_capture(filter, limit); + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); + get_stateless_obj()->send_msg_to_rx(start_msg); + + TrexCaptureRCStart rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } result["result"] = Json::objectValue; - return (TREX_RPC_CMD_OK); } + +/** + * stops PCAP capturing + * + */ +void +TrexRpcCmdCapture::parse_cmd_stop(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStop *stop_msg = new TrexStatelessRxCaptureStop(capture_id, reply); + get_stateless_obj()->send_msg_to_rx(stop_msg); + + TrexCaptureRCStop rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"]["pkt_count"] = rc.get_pkt_count(); +} + +/** + * gets the status of all captures in the system + * + */ +void +TrexRpcCmdCapture::parse_cmd_status(const Json::Value ¶ms, Json::Value &result) { + + /* generate a status command */ + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStatus *status_msg = new TrexStatelessRxCaptureStatus(reply); + get_stateless_obj()->send_msg_to_rx(status_msg); + + TrexCaptureRCStatus rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"] = rc.get_status(); +} + +/** + * fetch packets from a capture + * + */ +void +TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + uint32_t pkt_limit = parse_uint32(params, "pkt_limit", result); + + /* generate a fetch command */ + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureFetch *fetch_msg = new TrexStatelessRxCaptureFetch(capture_id, pkt_limit, reply); + get_stateless_obj()->send_msg_to_rx(fetch_msg); + + TrexCaptureRCFetch rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"]["pkts"] = rc.get_pkt_buffer()->to_json(); + result["result"]["pending"] = rc.get_pending(); +} + diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 1ea63cc7..bf78ff80 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -160,7 +160,13 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIC TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdStartCapture, "start_capture", 3, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture, "capture", 1, false, APIClass::API_CLASS_TYPE_CORE, + void parse_cmd_start(const Json::Value &msg, Json::Value &result); + void parse_cmd_stop(const Json::Value &msg, Json::Value &result); + void parse_cmd_status(const Json::Value &msg, Json::Value &result); + void parse_cmd_fetch(const Json::Value &msg, Json::Value &result); +); + #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 3d4d5a23..2af9f4f5 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -79,7 +79,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdSetL2()); register_command(new TrexRpcCmdSetL3()); - register_command(new TrexRpcCmdStartCapture()); + register_command(new TrexRpcCmdCapture()); } diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 32babbf7..6ab9b417 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -19,7 +19,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -//#include #include #include @@ -142,35 +141,11 @@ TrexStateless::get_dp_core_count() { return m_platform_api->get_dp_core_count(); } -capture_id_t -TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) { - static MsgReply reply; - - reply.reset(); - - CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); - TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply); - - ring->Enqueue((CGenNode *)msg); - - capture_id_t new_id = reply.wait_for_reply(); - - return (new_id); -} +void +TrexStateless::send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const { -capture_id_t -TrexStateless::stop_capture(capture_id_t capture_id) { - static MsgReply reply; - - reply.reset(); - CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); - TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply); - ring->Enqueue((CGenNode *)msg); - - capture_id_t rc = reply.wait_for_reply(); - - return (rc); } + diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 33f16ce9..87d227f6 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -102,7 +102,6 @@ public: * defines the TRex stateless operation mode * */ -class CaptureFilter; class TrexStateless { public: @@ -133,16 +132,9 @@ public: /** - * starts a capture on a 'filter' of ports - * with a limit of packets + * send a message to the RX core */ - capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit); - - /** - * stops an active capture - * - */ - capture_id_t stop_capture(capture_id_t capture_id); + void send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const; /** * shutdown the server diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 2cc1b9ca..0ef8ae60 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -140,7 +140,7 @@ public: } if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { - throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists"); + throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists"); } m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW); @@ -439,19 +439,6 @@ public: void get_pci_info(std::string &pci_addr, int &numa_node); - - /** - * starts capturing packets - * - */ - void start_capture(capture_mode_e mode, uint64_t limit); - - /** - * stops capturing packets - * - */ - void stop_capture(); - /** * start RX queueing of packets * diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index f441c692..b9bb1d1c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -262,24 +262,58 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool -TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter); +TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStart start_rc; + + TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc); + + /* mark as done */ + m_reply.set_reply(start_rc); + + return true; +} + +bool +TrexStatelessRxCaptureStop::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStop stop_rc; + + TrexStatelessCaptureMngr::getInstance().stop(m_capture_id, stop_rc); + + /* mark as done */ + m_reply.set_reply(stop_rc); + + return true; +} +bool +TrexStatelessRxCaptureFetch::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCFetch fetch_rc; + + TrexStatelessCaptureMngr::getInstance().fetch(m_capture_id, m_pkt_limit, fetch_rc); + /* mark as done */ - m_reply.set_reply(capture_id); + m_reply.set_reply(fetch_rc); return true; } bool -TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { - capture_id_t rc = rx_core->stop_capture(m_capture_id); +TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStatus status_rc; - m_reply.set_reply(rc); + status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json()); + + /* mark as done */ + m_reply.set_reply(status_rc); return true; } + bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { rx_core->start_queue(m_port_id, m_size); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 5f4978f5..4027d075 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -485,12 +485,16 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { }; +class TrexStatelessRxCapture : public TrexStatelessCpToRxMsgBase { +public: + virtual bool handle (CRxCoreStateless *rx_core) = 0; +}; -class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture { public: - TrexStatelessRxStartCapture(const CaptureFilter& filter, + TrexStatelessRxCaptureStart(const CaptureFilter& filter, uint64_t limit, - MsgReply &reply) : m_reply(reply) { + MsgReply &reply) : m_reply(reply) { m_limit = limit; m_filter = filter; @@ -499,24 +503,52 @@ public: virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - uint64_t m_limit; - CaptureFilter m_filter; - MsgReply &m_reply; + uint8_t m_port_id; + uint64_t m_limit; + CaptureFilter m_filter; + MsgReply &m_reply; +}; + + +class TrexStatelessRxCaptureStop : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStop(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + m_capture_id = capture_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + MsgReply &m_reply; }; -class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureFetch : public TrexStatelessRxCapture { public: - TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + TrexStatelessRxCaptureFetch(capture_id_t capture_id, uint32_t pkt_limit, MsgReply &reply) : m_reply(reply) { m_capture_id = capture_id; + m_pkt_limit = pkt_limit; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + uint32_t m_pkt_limit; + MsgReply &m_reply; +}; + + +class TrexStatelessRxCaptureStatus : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStatus(MsgReply &reply) : m_reply(reply) { } virtual bool handle(CRxCoreStateless *rx_core); private: - capture_id_t m_capture_id; - MsgReply &m_reply; + MsgReply &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 4ed126cc..85be7aef 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -25,6 +25,7 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons m_id = id; m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); m_filter = filter; + m_state = STATE_ACTIVE; } TrexStatelessCapture::~TrexStatelessCapture() { @@ -35,9 +36,15 @@ TrexStatelessCapture::~TrexStatelessCapture() { void TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { + + if (m_state != STATE_ACTIVE) { + delete pkt; + return; + } /* if not in filter - back off */ if (!m_filter.in_filter(pkt)) { + delete pkt; return; } @@ -46,6 +53,11 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { void TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { + + if (m_state != STATE_ACTIVE) { + return; + } + if (!m_filter.in_rx(port)) { return; } @@ -53,6 +65,56 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { m_pkt_buffer->push(m); } + +Json::Value +TrexStatelessCapture::to_json() const { + Json::Value output = Json::objectValue; + + output["id"] = Json::UInt64(m_id); + output["filter"] = m_filter.to_json(); + output["count"] = m_pkt_buffer->get_element_count(); + output["bytes"] = m_pkt_buffer->get_bytes(); + output["limit"] = m_pkt_buffer->get_capacity(); + + switch (m_state) { + case STATE_ACTIVE: + output["state"] = "ACTIVE"; + break; + + case STATE_STOPPED: + output["state"] = "STOPPED"; + break; + + default: + assert(0); + + } + + return output; +} + +TrexPktBuffer * +TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { + + /* if the total sum of packets is within the limit range - take it */ + if (m_pkt_buffer->get_element_count() <= pkt_limit) { + TrexPktBuffer *current = m_pkt_buffer; + m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode()); + pending = 0; + return current; + } + + /* harder part - partial fetch */ + TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit); + for (int i = 0; i < pkt_limit; i++) { + const TrexPkt *pkt = m_pkt_buffer->pop(); + partial->push(pkt); + } + + pending = m_pkt_buffer->get_element_count(); + return partial; +} + void TrexStatelessCaptureMngr::update_global_filter() { CaptureFilter new_filter; @@ -64,11 +126,25 @@ TrexStatelessCaptureMngr::update_global_filter() { m_global_filter = new_filter; } -capture_id_t -TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { +TrexStatelessCapture * +TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return m_captures[i]; + } + } + + /* does not exist */ + return nullptr; +} + +void +TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) { + if (m_captures.size() > MAX_CAPTURE_SIZE) { - return CAPTURE_TOO_MANY_CAPTURES; + rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED); + return; } @@ -79,15 +155,46 @@ TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { /* update global filter */ update_global_filter(); - return new_id; + /* result */ + rc.set_new_id(new_id); +} + +void +TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + capture->stop(); + rc.set_count(capture->get_pkt_count()); } -capture_id_t -TrexStatelessCaptureMngr::remove(capture_id_t id) { +void +TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + if (capture->is_active()) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_FETCH_UNDER_ACTIVE); + return; + } + uint32_t pending = 0; + TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); + + rc.set_pkt_buffer(pkt_buffer, pending); +} + +void +TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) { + int index = -1; for (int i = 0; i < m_captures.size(); i++) { - if (m_captures[i]->get_id() == id) { + if (m_captures[i]->get_id() == capture_id) { index = i; break; } @@ -95,24 +202,26 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) { /* does not exist */ if (index == -1) { - return CAPTURE_ID_NOT_FOUND; + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; } TrexStatelessCapture *capture = m_captures[index]; m_captures.erase(m_captures.begin() + index); + /* free memory */ delete capture; /* update global filter */ update_global_filter(); - - return id; } void TrexStatelessCaptureMngr::reset() { + TrexCaptureRCRemove dummy; + while (m_captures.size() > 0) { - remove(m_captures[0]->get_id()); + remove(m_captures[0]->get_id(), dummy); } } @@ -130,3 +239,14 @@ TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port) } } +Json::Value +TrexStatelessCaptureMngr::to_json() const { + Json::Value lst = Json::arrayValue; + + for (TrexStatelessCapture *capture : m_captures) { + lst.append(capture->to_json()); + } + + return lst; +} + diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 4d0b6a78..6cd25a94 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -22,7 +22,146 @@ limitations under the License. #define __TREX_STATELESS_CAPTURE_H__ #include +#include + #include "trex_stateless_pkt.h" +#include "trex_stateless_capture_msg.h" + +typedef int64_t capture_id_t; + +class TrexCaptureRC { +public: + + TrexCaptureRC() { + m_rc = RC_INVALID; + m_pkt_buffer = NULL; + } + + enum rc_e { + RC_INVALID = 0, + RC_OK = 1, + RC_CAPTURE_NOT_FOUND, + RC_CAPTURE_LIMIT_REACHED, + RC_CAPTURE_FETCH_UNDER_ACTIVE + }; + + bool operator !() const { + return (m_rc != RC_OK); + } + + std::string get_err() const { + assert(m_rc != RC_INVALID); + + switch (m_rc) { + case RC_OK: + return ""; + case RC_CAPTURE_LIMIT_REACHED: + return "capture limit has reached"; + case RC_CAPTURE_NOT_FOUND: + return "capture ID not found"; + case RC_CAPTURE_FETCH_UNDER_ACTIVE: + return "fetch command cannot be executed on an active capture"; + default: + assert(0); + } + } + + void set_err(rc_e rc) { + m_rc = rc; + } + + Json::Value get_json() const { + return m_json_rc; + } + +public: + rc_e m_rc; + capture_id_t m_capture_id; + TrexPktBuffer *m_pkt_buffer; + Json::Value m_json_rc; +}; + +class TrexCaptureRCStart : public TrexCaptureRC { +public: + + void set_new_id(capture_id_t new_id) { + m_capture_id = new_id; + m_rc = RC_OK; + } + + capture_id_t get_new_id() const { + return m_capture_id; + } + +private: + capture_id_t m_capture_id; +}; + + +class TrexCaptureRCStop : public TrexCaptureRC { +public: + void set_count(uint32_t pkt_count) { + m_pkt_count = pkt_count; + m_rc = RC_OK; + } + + uint32_t get_pkt_count() const { + return m_pkt_count; + } + +private: + uint32_t m_pkt_count; +}; + +class TrexCaptureRCFetch : public TrexCaptureRC { +public: + + TrexCaptureRCFetch() { + m_pkt_buffer = nullptr; + m_pending = 0; + } + + void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_rc = RC_OK; + } + + const TrexPktBuffer *get_pkt_buffer() const { + return m_pkt_buffer; + } + + uint32_t get_pending() const { + return m_pending; + } + +private: + const TrexPktBuffer *m_pkt_buffer; + uint32_t m_pending; +}; + +class TrexCaptureRCRemove : public TrexCaptureRC { +public: + void set_ok() { + m_rc = RC_OK; + } +}; + +class TrexCaptureRCStatus : public TrexCaptureRC { +public: + + void set_status(const Json::Value &json) { + m_json = json; + m_rc = RC_OK; + } + + const Json::Value & get_status() const { + return m_json; + } + +private: + Json::Value m_json; +}; /** * capture filter @@ -82,20 +221,27 @@ public: return *this; } + Json::Value to_json() const { + Json::Value output = Json::objectValue; + output["tx"] = Json::UInt64(m_tx_active); + output["rx"] = Json::UInt64(m_rx_active); + + return output; + } + private: uint64_t m_tx_active; uint64_t m_rx_active; }; -typedef int64_t capture_id_t; -enum { - CAPTURE_ID_NOT_FOUND = -1, - CAPTURE_TOO_MANY_CAPTURES = -2, -}; class TrexStatelessCapture { public: + enum state_e { + STATE_ACTIVE, + STATE_STOPPED, + }; TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); @@ -112,7 +258,24 @@ public: return m_filter; } + Json::Value to_json() const; + + void stop() { + m_state = STATE_STOPPED; + } + + TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending); + + bool is_active() const { + return m_state == STATE_ACTIVE; + } + + uint32_t get_pkt_count() const { + return m_pkt_buffer->get_element_count(); + } + private: + state_e m_state; TrexPktBuffer *m_pkt_buffer; CaptureFilter m_filter; uint64_t m_id; @@ -134,18 +297,28 @@ public: } /** - * adds a capture buffer - * returns ID + * starts a new capture */ - capture_id_t add(uint64_t limit, const CaptureFilter &filter); + void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc); - /** - * stops capture mode - * on success, will return the ID of the removed one - * o.w it will be an error + * stops an existing capture + * */ - capture_id_t remove(capture_id_t id); + void stop(capture_id_t capture_id, TrexCaptureRCStop &rc); + + /** + * fetch packets from an existing capture + * + */ + void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc); + + /** + * removes an existing capture + * all packets captured will be detroyed + */ + void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc); + /** * removes all captures @@ -153,6 +326,7 @@ public: */ void reset(); + /** * return true if any filter is active * @@ -182,6 +356,8 @@ public: handle_pkt_rx_slow_path(m, port); } + Json::Value to_json() const; + private: TrexStatelessCaptureMngr() { @@ -189,6 +365,9 @@ private: m_id_counter = 1; } + + TrexStatelessCapture * lookup(capture_id_t capture_id); + void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); void update_global_filter(); diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f1ba303a..00c18082 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -270,10 +270,6 @@ void CRxCoreStateless::start() { m_monitor.disable(); } -void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) { - -} - int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) { int total_pkts = 0; @@ -332,16 +328,6 @@ double CRxCoreStateless::get_cpu_util() { } -capture_id_t -CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) { - return TrexStatelessCaptureMngr::getInstance().add(limit, filter); -} - -capture_id_t -CRxCoreStateless::stop_capture(capture_id_t capture_id) { - return TrexStatelessCaptureMngr::getInstance().remove(capture_id); -} - void CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) { m_rx_port_mngr[port_id].start_queue(size); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 21ed51ba..954a5f04 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -131,14 +131,7 @@ class CRxCoreStateless { const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } - - /** - * start capturing packets - * - */ - capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter); - capture_id_t stop_capture(capture_id_t capture_id); - + /** * start RX queueing of packets * @@ -175,7 +168,6 @@ class CRxCoreStateless { void recalculate_next_state(); bool are_any_features_active(); - void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); void handle_grat_arp(); -- cgit 1.2.3-korg From 9f72a19a6bb0edf7ad54129f7ad06e8e288a61d7 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 16 Jan 2017 18:50:21 +0200 Subject: capture module for console Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_capture.py | 365 +++++++++++++++++++++ 1 file changed, 365 insertions(+) create mode 100644 scripts/automation/trex_control_plane/stl/console/trex_capture.py (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py new file mode 100644 index 00000000..922497db --- /dev/null +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -0,0 +1,365 @@ +from trex_stl_lib.api import * +from trex_stl_lib.utils import parsing_opts, text_tables +import threading + + +class CaptureMonitorWriter(object): + def init (self): + raise NotImplementedError + + def deinit(self): + raise NotImplementedError + + def handle_pkts (self, pkts): + raise NotImplementedError + + +class CaptureMonitorWriterStdout(CaptureMonitorWriter): + def __init__ (self, logger, is_brief): + self.logger = logger + self.is_brief = is_brief + self.pkt_count = 0 + self.byte_count = 0 + + self.RX_ARROW = u'\u25c0\u2500\u2500' + self.TX_ARROW = u'\u25b6\u2500\u2500' + + def init (self): + self.logger.log(format_text("\nStarting capture monitor on selected ports", 'bold')) + self.logger.log(format_text("*** any captured packet will be displayed on screen ***\n")) + self.logger.log(format_text("('capture monitor --stop' to abort capturing...)\n", 'bold')) + + + def deinit (self): + pass + + + def handle_pkts (self, pkts): + for pkt in pkts: + self.__handle_pkt(pkt) + + self.logger.prompt_redraw() + + def get_scapy_name (self, pkt_scapy): + layer = pkt_scapy + while layer.payload and layer.payload.name not in('Padding', 'Raw'): + layer = layer.payload + + return layer.name + + def format_origin (self, origin): + if origin == 'RX': + return u'{0} {1}'.format(self.RX_ARROW, 'RX') + elif origin == 'TX': + return u'{0} {1}'.format(self.TX_ARROW, 'TX') + else: + return '{0}'.format(origin) + + + def __handle_pkt (self, pkt): + pkt_bin = base64.b64decode(pkt['binary']) + + self.pkt_count += 1 + self.byte_count += len(pkt_bin) + + pkt_scapy = Ether(pkt_bin) + self.logger.log(format_text(u'\n\nPort: {0} {1}\n'.format(pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) + self.logger.log(format_text(' Type: {:}, Size: {:} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts']), 'bold')) + + + + if self.is_brief: + self.logger.log(' {0}'.format(pkt_scapy.command())) + else: + pkt_scapy.show(label_lvl = ' ') + self.logger.log('') + + +# +class CaptureMonitorWriterPipe(CaptureMonitorWriter): + def __init__ (self, logger): + self.logger = logger + + def init (self): + self.fifo_name = '/tmp/out.fif' + if os.path.exists(self.fifo_name): + os.unlink(self.fifo_name) + + os.mkfifo(self.fifo_name) + self.fifo = os.open(self.fifo_name, os.O_WRONLY) + + self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True) + self.writer._write_header(None) + + def handle_pkts (self, pkts): + pass + + +class CaptureMonitor(object): + def __init__ (self, client, cmd_lock): + self.client = client + self.cmd_lock = cmd_lock + self.active = False + self.capture_id = -1 + self.logger = client.logger + + + def is_active (self): + return self.active + + + def get_capture_id (self): + return self.capture_id + + + def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): + # stop any previous monitors + if self.active: + self.stop() + + if mon_type == 'compact': + self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True) + elif mon_type == 'verbose': + self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = False) + elif mon_type == 'pipe': + self.writer = CaptureMonitorWriterPipe(self.logger) + else: + raise STLError('unknown writer type') + + + self.writer.init() + + with self.logger.supress(): + self.capture_id = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps) + + self.tx_port_list = tx_port_list + self.rx_port_list = rx_port_list + + self.t = threading.Thread(target = self.__thread_cb) + self.t.setDaemon(True) + + try: + self.active = True + self.t.start() + except Exception as e: + self.active = False + raise e + + + def stop (self): + if self.active: + self.active = False + self.t.join() + + self.client.stop_capture(self.capture_id, None) + self.capture_id = -1 + self.writer.deinit() + + def get_mon_row (self): + if not self.is_active(): + return None + + return [self.capture_id, + self.pkt_count, + format_num(self.byte_count, suffix = 'B'), + ', '.join([str(x) for x in self.tx_port_list] if self.tx_port_list else '-'), + ', '.join([str(x) for x in self.rx_port_list] if self.rx_port_list else '-') + ] + + + # sleeps with high freq checks for active + def __sleep (self): + for _ in range(5): + if not self.active: + return False + + time.sleep(0.1) + + return True + + def __lock (self): + while True: + rc = self.cmd_lock.acquire(False) + if rc: + return True + + if not self.active: + return False + time.sleep(0.1) + + def __unlock (self): + self.cmd_lock.release() + + + def __thread_cb (self): + self.pkt_count = 0 + self.byte_count = 0 + + while self.active: + # sleep + if not self.__sleep(): + break + + # try to lock + if not self.__lock(): + break + + try: + rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10}) + if not rc: + raise STLError(rc) + finally: + self.__unlock() + + + pkts = rc.data()['pkts'] + if not pkts: + continue + + self.writer.handle_pkts(pkts) + + + + +# main class +class CaptureManager(object): + def __init__ (self, client, cmd_lock): + self.c = client + self.cmd_lock = cmd_lock + self.monitor = CaptureMonitor(client, cmd_lock) + self.logger = client.logger + + # install parsers + + self.parser = parsing_opts.gen_parser(self, "capture", self.parse_line_internal.__doc__) + subparsers = self.parser.add_subparsers(title = "commands", dest="commands") + + # start + self.start_parser = subparsers.add_parser('start', help = "starts a new buffered capture") + self.start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.LIMIT) + + # stop + self.stop_parser = subparsers.add_parser('stop', help = "stops an active capture") + self.stop_parser.add_arg_list(parsing_opts.CAPTURE_ID, + parsing_opts.OUTPUT_FILENAME) + + # show + self.show_parser = subparsers.add_parser('show', help = "show all active captures") + + # monitor + self.monitor_parser = subparsers.add_parser('monitor', help = "attach a constant monitor to port(s)") + self.monitor_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.MONITOR_TYPE) + + # reset + self.clear_parser = subparsers.add_parser('clear', help = "remove all active captures") + + # register handlers + self.cmds = {'start': self.parse_start, 'stop' : self.parse_stop, 'clear': self.parse_clear, 'monitor': self.parse_monitor, 'show' : self.parse_show} + + + def stop (self): + self.monitor.stop() + + + # main entry point for parsing commands from console + def parse_line (self, line): + try: + self.parse_line_internal(line) + except STLError as e: + self.logger.log("\nAction has failed with the following error:\n" + format_text(e.brief() + "\n", 'bold')) + return RC_ERR(e.brief()) + + + def parse_line_internal (self, line): + '''Manage PCAP recorders''' + + # default + if not line: + line = "show" + + opts = self.parser.parse_args(line.split()) + if not opts: + return opts + + # call the handler + self.cmds[opts.commands](opts) + + + def parse_start (self, opts): + if not opts.tx_port_list and not opts.rx_port_list: + self.start_parser.formatted_error('please provide either --tx or --rx') + return + + self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + + + def parse_stop (self, opts): + if self.monitor.is_active() and self.monitor.get_capture_id() == opts.capture_id: + self.monitor.stop() + else: + self.c.stop_capture(opts.capture_id, opts.output_filename) + + + def parse_monitor (self, opts): + mon_type = 'compact' + + if opts.verbose: + mon_type = 'verbose' + elif opts.pipe: + mon_type = 'pipe' + + + self.monitor.stop() + self.monitor.start(opts.tx_port_list, opts.rx_port_list, 10, mon_type) + + + def parse_clear (self, opts): + self.monitor.stop() + self.c.remove_all_captures() + + + + def parse_show (self, opts): + data = self.c.get_capture_status() + + # captures + cap_table = text_tables.TRexTextTable() + cap_table.set_cols_align(["c"] * 6) + cap_table.set_cols_width([15] * 6) + + # monitor + mon_table = text_tables.TRexTextTable() + mon_table.set_cols_align(["c"] * 5) + mon_table.set_cols_width([15] * 5) + + for elem in data: + id = elem['id'] + + if self.monitor.get_capture_id() == id: + row = self.monitor.get_mon_row() + mon_table.add_rows([row], header=False) + + else: + row = [id, + format_text(elem['state'], 'bold'), + '[{0}/{1}]'.format(elem['count'], elem['limit']), + format_num(elem['bytes'], suffix = 'B'), + bitfield_to_str(elem['filter']['tx']), + bitfield_to_str(elem['filter']['rx'])] + + cap_table.add_rows([row], header=False) + + cap_table.header(['ID', 'Status', 'Packets', 'Bytes', 'TX Ports', 'RX Ports']) + mon_table.header(['ID', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports']) + + if cap_table._rows: + text_tables.print_table_with_header(cap_table, "Buffers") + + if mon_table._rows: + text_tables.print_table_with_header(mon_table, "Monitors") + + -- cgit 1.2.3-korg From 951b09ef1b892594840f091f861f11ad274541ec Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 18 Jan 2017 13:08:41 +0200 Subject: many capture modes in Python console Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_capture.py | 163 +++++++++++++------ .../trex_control_plane/stl/console/trex_console.py | 26 +++- .../stl/trex_stl_lib/trex_stl_client.py | 172 +++++++++------------ .../stl/trex_stl_lib/utils/parsing_opts.py | 29 +++- .../stl/trex_stl_lib/utils/text_opts.py | 16 +- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 43 +++++- src/rpc-server/commands/trex_rpc_cmds.h | 1 + .../messaging/trex_stateless_messaging.cpp | 13 ++ src/stateless/messaging/trex_stateless_messaging.h | 15 ++ src/stateless/rx/trex_stateless_capture.cpp | 9 +- src/stateless/rx/trex_stateless_capture.h | 2 +- 11 files changed, 318 insertions(+), 171 deletions(-) (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py index 922497db..e5708e9b 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_capture.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -1,7 +1,7 @@ from trex_stl_lib.api import * from trex_stl_lib.utils import parsing_opts, text_tables import threading - +import tempfile class CaptureMonitorWriter(object): def init (self): @@ -27,7 +27,7 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): def init (self): self.logger.log(format_text("\nStarting capture monitor on selected ports", 'bold')) self.logger.log(format_text("*** any captured packet will be displayed on screen ***\n")) - self.logger.log(format_text("('capture monitor --stop' to abort capturing...)\n", 'bold')) + self.logger.log(format_text("('capture monitor stop' to abort capturing...)\n", 'bold')) def deinit (self): @@ -81,18 +81,42 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): self.logger = logger def init (self): - self.fifo_name = '/tmp/out.fif' - if os.path.exists(self.fifo_name): - os.unlink(self.fifo_name) + self.fifo_name = tempfile.mktemp() + + try: + os.mkfifo(self.fifo_name) + + self.logger.log(format_text("\nPlease run 'wireshark -k -i {0}'".format(self.fifo_name), 'bold')) + self.logger.log('\nWaiting for Wireshark connection...') + + self.fifo = os.open(self.fifo_name, os.O_WRONLY) + self.logger.log('Successfuly connected to Wireshark...') + self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold')) - os.mkfifo(self.fifo_name) - self.fifo = os.open(self.fifo_name, os.O_WRONLY) + self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True) + self.writer._write_header(None) + + except KeyboardInterrupt as e: + os.unlink(self.fifo_name) + raise STLError("*** pipe monitor aborted...cleaning up") + + except OSError as e: + os.unlink(self.fifo_name) + raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e))) + + + def deinit (self): + os.unlink(self.fifo_name) + - self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True) - self.writer._write_header(None) - def handle_pkts (self, pkts): - pass + for pkt in pkts: + pkt_bin = base64.b64decode(pkt['binary']) + try: + self.writer._write_packet(pkt_bin, sec = 0, usec = 0) + except IOError: + klgjdf + class CaptureMonitor(object): @@ -127,14 +151,14 @@ class CaptureMonitor(object): raise STLError('unknown writer type') - self.writer.init() - with self.logger.supress(): self.capture_id = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps) self.tx_port_list = tx_port_list self.rx_port_list = rx_port_list + self.writer.init() + self.t = threading.Thread(target = self.__thread_cb) self.t.setDaemon(True) @@ -154,6 +178,7 @@ class CaptureMonitor(object): self.client.stop_capture(self.capture_id, None) self.capture_id = -1 self.writer.deinit() + def get_mon_row (self): if not self.is_active(): @@ -215,7 +240,7 @@ class CaptureMonitor(object): pkts = rc.data()['pkts'] if not pkts: continue - + self.writer.handle_pkts(pkts) @@ -232,33 +257,50 @@ class CaptureManager(object): # install parsers self.parser = parsing_opts.gen_parser(self, "capture", self.parse_line_internal.__doc__) - subparsers = self.parser.add_subparsers(title = "commands", dest="commands") + self.subparsers = self.parser.add_subparsers(title = "commands", dest="commands") + + self.install_record_parser() + self.install_monitor_parser() + + # show + self.show_parser = self.subparsers.add_parser('show', help = "show all active captures") + + # reset + self.clear_parser = self.subparsers.add_parser('clear', help = "remove all active captures") + + # register handlers + self.cmds = {'record': self.parse_record, 'monitor' : self.parse_monitor, 'clear': self.parse_clear, 'show' : self.parse_show} + + + def install_record_parser (self): + # recording + self.record_parser = self.subparsers.add_parser('record', help = "PCAP recording") + record_sub = self.record_parser.add_subparsers(title = 'commands', dest = 'record_cmd') + self.record_start_parser = record_sub.add_parser('start', help = "starts a new buffered capture") + self.record_stop_parser = record_sub.add_parser('stop', help = "stops an active buffered capture") # start - self.start_parser = subparsers.add_parser('start', help = "starts a new buffered capture") - self.start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, - parsing_opts.RX_PORT_LIST, - parsing_opts.LIMIT) + self.record_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.LIMIT) # stop - self.stop_parser = subparsers.add_parser('stop', help = "stops an active capture") - self.stop_parser.add_arg_list(parsing_opts.CAPTURE_ID, - parsing_opts.OUTPUT_FILENAME) - - # show - self.show_parser = subparsers.add_parser('show', help = "show all active captures") + self.record_stop_parser.add_arg_list(parsing_opts.CAPTURE_ID, + parsing_opts.OUTPUT_FILENAME) + + + def install_monitor_parser (self): # monitor - self.monitor_parser = subparsers.add_parser('monitor', help = "attach a constant monitor to port(s)") - self.monitor_parser.add_arg_list(parsing_opts.TX_PORT_LIST, - parsing_opts.RX_PORT_LIST, - parsing_opts.MONITOR_TYPE) + self.monitor_parser = self.subparsers.add_parser('monitor', help = 'live monitoring') + monitor_sub = self.monitor_parser.add_subparsers(title = 'commands', dest = 'mon_cmd') + self.monitor_start_parser = monitor_sub.add_parser('start', help = 'starts a monitor') + self.monitor_stop_parser = monitor_sub.add_parser('stop', help = 'stops an active monitor') + + self.monitor_start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.MONITOR_TYPE) - # reset - self.clear_parser = subparsers.add_parser('clear', help = "remove all active captures") - - # register handlers - self.cmds = {'start': self.parse_start, 'stop' : self.parse_stop, 'clear': self.parse_clear, 'monitor': self.parse_monitor, 'show' : self.parse_show} def stop (self): @@ -289,22 +331,48 @@ class CaptureManager(object): self.cmds[opts.commands](opts) - def parse_start (self, opts): + # record methods + def parse_record (self, opts): + if opts.record_cmd == 'start': + self.parse_record_start(opts) + elif opts.record_cmd == 'stop': + self.parse_record_stop(opts) + else: + assert(0) + + def parse_record_start (self, opts): if not opts.tx_port_list and not opts.rx_port_list: - self.start_parser.formatted_error('please provide either --tx or --rx') + self.record_start_parser.formatted_error('please provide either --tx or --rx') return self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) - def parse_stop (self, opts): - if self.monitor.is_active() and self.monitor.get_capture_id() == opts.capture_id: - self.monitor.stop() - else: - self.c.stop_capture(opts.capture_id, opts.output_filename) + def parse_record_stop (self, opts): + captures = self.c.get_capture_status() + ids = [c['id'] for c in captures] + + if opts.capture_id == self.monitor.get_capture_id(): + self.record_stop_parser.formatted_error("'{0}' is a monitor, please use 'capture monitor stop'".format(opts.capture_id)) + return + + if opts.capture_id not in ids: + self.record_stop_parser.formatted_error("'{0}' is not an active capture ID".format(opts.capture_id)) + return + + self.c.stop_capture(opts.capture_id, opts.output_filename) + # monitor methods def parse_monitor (self, opts): + if opts.mon_cmd == 'start': + self.parse_monitor_start(opts) + elif opts.mon_cmd == 'stop': + self.parse_monitor_stop(opts) + else: + assert(0) + + def parse_monitor_start (self, opts): mon_type = 'compact' if opts.verbose: @@ -312,10 +380,15 @@ class CaptureManager(object): elif opts.pipe: mon_type = 'pipe' - + if not opts.tx_port_list and not opts.rx_port_list: + self.monitor_start_parser.formatted_error('please provide either --tx or --rx') + return + self.monitor.stop() - self.monitor.start(opts.tx_port_list, opts.rx_port_list, 10, mon_type) + self.monitor.start(opts.tx_port_list, opts.rx_port_list, 100, mon_type) + def parse_monitor_stop (self, opts): + self.monitor.stop() def parse_clear (self, opts): self.monitor.stop() @@ -357,9 +430,9 @@ class CaptureManager(object): mon_table.header(['ID', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports']) if cap_table._rows: - text_tables.print_table_with_header(cap_table, "Buffers") + text_tables.print_table_with_header(cap_table, '\nActive Recorders') if mon_table._rows: - text_tables.print_table_with_header(mon_table, "Monitors") + text_tables.print_table_with_header(mon_table, '\nActive Monitor') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index b0ab70e0..bf543045 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -29,6 +29,8 @@ import string import os import sys import tty, termios +from threading import Lock +import threading try: import stl_path @@ -39,6 +41,7 @@ from trex_stl_lib.api import * from trex_stl_lib.utils.text_opts import * from trex_stl_lib.utils.common import user_input, get_current_user from trex_stl_lib.utils import parsing_opts +from .trex_capture import CaptureManager try: import trex_tui @@ -172,6 +175,8 @@ class TRexConsole(TRexGeneralCmd): def __init__(self, stateless_client, verbose = False): + self.cmd_lock = Lock() + self.stateless_client = stateless_client TRexGeneralCmd.__init__(self) @@ -184,8 +189,11 @@ class TRexConsole(TRexGeneralCmd): self.intro = "\n-=TRex Console v{ver}=-\n".format(ver=__version__) self.intro += "\nType 'help' or '?' for supported actions\n" + self.cap_mngr = CaptureManager(stateless_client, self.cmd_lock) + self.postcmd(False, "") + ################### internal section ######################## @@ -231,6 +239,7 @@ class TRexConsole(TRexGeneralCmd): lines = line.split(';') try: + self.cmd_lock.acquire() for line in lines: stop = self.onecmd(line) stop = self.postcmd(stop, line) @@ -238,10 +247,15 @@ class TRexConsole(TRexGeneralCmd): return "quit" return "" + except STLError as e: print(e) return '' + finally: + self.cmd_lock.release() + + def postcmd(self, stop, line): self.prompt = self.stateless_client.generate_prompt(prefix = 'trex') @@ -349,7 +363,7 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_capture (self, line): '''Manage PCAP captures''' - self.stateless_client.capture_line(line) + self.cap_mngr.parse_line(line) def help_capture (self): self.do_capture("-h") @@ -443,7 +457,9 @@ class TRexConsole(TRexGeneralCmd): def do_disconnect (self, line): '''Disconnect from the server\n''' - + + # stop any monitors before disconnecting + self.cap_mngr.stop() self.stateless_client.disconnect_line(line) @@ -688,6 +704,7 @@ class TRexConsole(TRexGeneralCmd): l=help.splitlines() print("{:<30} {:<30}".format(cmd + " - ",l[0] )) + # a custorm cmdloop wrapper def start(self): while True: @@ -702,6 +719,9 @@ class TRexConsole(TRexGeneralCmd): self.intro = None continue + finally: + self.cap_mngr.stop() + if self.terminal: self.terminal.kill() @@ -934,6 +954,8 @@ def main(): with stateless_client.logger.supress(): stateless_client.disconnect(stop_traffic = False) + + if __name__ == '__main__': main() diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index d75c554e..c632ad7c 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -17,10 +17,12 @@ from .utils.text_opts import * from functools import wraps from texttable import ansi_len + from collections import namedtuple from yaml import YAMLError import time import datetime +import threading import re import random import json @@ -601,7 +603,7 @@ class STLClient(object): self.util_stats, self.xstats, self.async_client.monitor) - + ############# private functions - used by the class itself ########### @@ -2981,7 +2983,7 @@ class STLClient(object): self.logger.post_cmd(rc) if verbose: - for x in filter(bool, rc.data()): + for x in filter(bool, listify(rc.data())): self.logger.log(format_text("{0}".format(x), 'bold')) if not rc: @@ -2999,6 +3001,10 @@ class STLClient(object): tx_ports - on which ports to capture TX rx_ports - on which ports to capture RX limit - limit how many packets will be written + + :returns: + the new capture_id + :raises: + :exe:'STLError' @@ -3018,7 +3024,7 @@ class STLClient(object): non_service_ports = list_difference(set(tx_ports + rx_ports), self.get_service_enabled_ports()) if non_service_ports: - raise STLError("Port(s) {0} are not under service mode. PCAP capturing requires all ports to be in service mode") + raise STLError("Port(s) {0} are not under service mode. PCAP capturing requires all ports to be in service mode".format(non_service_ports)) self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit)) @@ -3030,8 +3036,39 @@ class STLClient(object): if not rc: raise STLError(rc) + return rc.data()['capture_id'] + + + def __fetch_capture_packets (self, capture_id, output_filename, pkt_count): + self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename)) + + # create a PCAP file + writer = RawPcapWriter(output_filename, linktype = 1) + writer._write_header(None) + + # fetch + pending = pkt_count + rc = RC_OK() + while pending > 0: + rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50}) + if not rc: + self.logger.post_cmd(rc) + raise STLError(rc) + + pkts = rc.data()['pkts'] + for pkt in pkts: + ts = pkt['ts'] + pkt_bin = base64.b64decode(pkt['binary']) + writer._write_packet(pkt_bin, sec = 0, usec = 0) + + pending = rc.data()['pending'] + + + self.logger.post_cmd(rc) + + @__api_check(True) def stop_capture (self, capture_id, output_filename): """ @@ -3045,8 +3082,6 @@ class STLClient(object): + :exe:'STLError' """ - - # stopping a capture requires: # 1. stopping @@ -3063,36 +3098,19 @@ class STLClient(object): # pkt count pkt_count = rc.data()['pkt_count'] - - if not output_filename or pkt_count == 0: - return - self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename)) - - # create a PCAP file - writer = RawPcapWriter(output_filename, linktype = 1) - writer._write_header(None) - - # fetch - while True: - rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50}) - if not rc: - self.logger.post_cmd(rc) - raise STLError(rc) - - pkts = rc.data()['pkts'] - for pkt in pkts: - ts = pkt['ts'] - pkt_bin = base64.b64decode(pkt['binary']) - writer._write_packet(pkt_bin, sec = 0, usec = 0) - - if rc.data()['pending'] == 0: - break + # fetch packets + if output_filename: + self.__fetch_capture_packets(capture_id, output_filename, pkt_count) + # remove + self.logger.pre_cmd("Removing PCAP capture {0} from server".format(capture_id)) + rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': capture_id}) self.logger.post_cmd(rc) + if not rc: + raise STLError(rc) - # get capture status @__api_check(True) def get_capture_status (self): """ @@ -3109,7 +3127,25 @@ class STLClient(object): return rc.data() - + @__api_check(True) + def remove_all_captures (self): + """ + Removes any existing captures + """ + captures = self.get_capture_status() + + self.logger.pre_cmd("Removing all PCAP captures from server") + + for c in captures: + # remove + rc = self._transmit("capture", params = {'command': 'remove', 'capture_id': c['id']}) + if not rc: + raise STLError(rc) + + self.logger.post_cmd(RC_OK()) + + + @__api_check(True) def set_rx_queue (self, ports = None, size = 1000): """ @@ -3230,6 +3266,7 @@ class STLClient(object): return wrap + @__console def ping_line (self, line): '''pings the server / specific IP''' @@ -3820,77 +3857,9 @@ class STLClient(object): opts.link, opts.led, opts.flow_ctrl) + + - - - - @__console - def capture_line (self, line): - '''Manage PCAP recorders''' - - # default - if not line: - line = "show" - - parser = parsing_opts.gen_parser(self, "capture", self.capture_line.__doc__) - subparsers = parser.add_subparsers(title = "commands", dest="commands") - - # start - start_parser = subparsers.add_parser('start', help = "starts a new capture") - start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, - parsing_opts.RX_PORT_LIST, - parsing_opts.LIMIT) - - # stop - stop_parser = subparsers.add_parser('stop', help = "stops an active capture") - stop_parser.add_arg_list(parsing_opts.CAPTURE_ID, - parsing_opts.OUTPUT_FILENAME) - - # show - show_parser = subparsers.add_parser('show', help = "show all active captures") - - opts = parser.parse_args(line.split()) - - if not opts: - return opts - - # start - if opts.commands == 'start': - if not opts.tx_port_list and not opts.rx_port_list: - start_parser.formatted_error('please provide either --tx or --rx') - return - - self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) - - # stop - elif opts.commands == 'stop': - self.stop_capture(opts.capture_id, opts.output_filename) - - # show - else: - data = self.get_capture_status() - - stats_table = text_tables.TRexTextTable() - stats_table.set_cols_align(["c"] * 6) - stats_table.set_cols_width([15] * 6) - - for elem in data: - row = [elem['id'], - elem['state'], - '[{0}/{1}]'.format(elem['count'], elem['limit']), - format_num(elem['bytes'], suffix = 'B'), - bitfield_to_str(elem['filter']['tx']), - bitfield_to_str(elem['filter']['rx'])] - - stats_table.add_rows([row], header=False) - - stats_table.header(['ID', 'Status', 'Count', 'Bytes', 'TX Ports', 'RX Ports']) - text_tables.print_table_with_header(stats_table, "Captures") - - - return RC_OK() - - @__console def resolve_line (self, line): @@ -4089,3 +4058,4 @@ class STLClient(object): self.set_service_mode(ports = opts.ports, enabled = opts.enabled) + diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py index cb594ef4..8d3aedbe 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py @@ -85,6 +85,10 @@ STREAMS_MASK CORE_MASK_GROUP CAPTURE_PORTS_GROUP +MONITOR_TYPE_VERBOSE +MONITOR_TYPE_PIPE +MONITOR_TYPE + # ALL_STREAMS # STREAM_LIST_WITH_ALL @@ -606,6 +610,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'help': 'A list of ports to capture on the TX side', 'default': []}), + RX_PORT_LIST: ArgumentPack(['--rx'], {'nargs': '+', 'dest':'rx_port_list', @@ -614,7 +619,21 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'type': int, 'help': 'A list of ports to capture on the RX side', 'default': []}), - + + + MONITOR_TYPE_VERBOSE: ArgumentPack(['-v', '--verbose'], + {'action': 'store_true', + 'dest': 'verbose', + 'default': False, + 'help': 'output to screen as verbose'}), + + MONITOR_TYPE_PIPE: ArgumentPack(['-p', '--pipe'], + {'action': 'store_true', + 'dest': 'pipe', + 'default': False, + 'help': 'forward packets to a pipe'}), + + CAPTURE_ID: ArgumentPack(['-i', '--id'], {'help': "capture ID to remove", 'dest': "capture_id", @@ -646,6 +665,12 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'required': False}), CAPTURE_PORTS_GROUP: ArgumentGroup(NON_MUTEX, [TX_PORT_LIST, RX_PORT_LIST], {}), + + + MONITOR_TYPE: ArgumentGroup(MUTEX, [MONITOR_TYPE_VERBOSE, + MONITOR_TYPE_PIPE], + {'required': False}), + } class _MergeAction(argparse._AppendAction): @@ -760,7 +785,7 @@ class CCmdArgParser(argparse.ArgumentParser): def formatted_error (self, msg): self.print_usage() - self.stateless_client.logger.log(msg) + self._print_message(('%s: error: %s\n') % (self.prog, msg)) def get_flags (opt): diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py index 63b05bf4..3ffd07e2 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py @@ -133,12 +133,16 @@ def underline(text): # apply attribute on each non-empty line def text_attribute(text, attribute): - return '\n'.join(['{start}{txt}{end}'.format( - start = TEXT_CODES[attribute]['start'], - txt = line, - end = TEXT_CODES[attribute]['end']) - if line else '' for line in ('%s' % text).split('\n')]) - + if isinstance(text, str): + return "{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'], + txt=text, + stop=TEXT_CODES[attribute]['end']) + elif isinstance(text, unicode): + return u"{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'], + txt=text, + stop=TEXT_CODES[attribute]['end']) + else: + raise Exception("not a string") FUNC_DICT = {'blue': blue, 'bold': bold, diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 80f69fa3..8f7431e4 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -853,7 +853,7 @@ TrexRpcCmdSetL3::_run(const Json::Value ¶ms, Json::Value &result) { */ trex_rpc_cmd_rc_e TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { - const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status"}, result); + const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status", "remove"}, result); if (cmd == "start") { parse_cmd_start(params, result); @@ -863,6 +863,8 @@ TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { parse_cmd_fetch(params, result); } else if (cmd == "status") { parse_cmd_status(params, result); + } else if (cmd == "remove") { + parse_cmd_remove(params, result); } else { /* can't happen */ assert(0); @@ -878,10 +880,9 @@ TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { void TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &result) { - uint32_t limit = parse_uint32(params, "limit", result); - const Json::Value &tx_json = parse_array(params, "tx", result); - const Json::Value &rx_json = parse_array(params, "rx", result); - + uint32_t limit = parse_uint32(params, "limit", result); + const Json::Value &tx_json = parse_array(params, "tx", result); + const Json::Value &rx_json = parse_array(params, "rx", result); CaptureFilter filter; std::set ports; @@ -909,7 +910,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul static MsgReply reply; reply.reset(); - + TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); get_stateless_obj()->send_msg_to_rx(start_msg); @@ -918,7 +919,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul generate_execute_err(result, rc.get_err()); } - result["result"] = Json::objectValue; + result["result"]["capture_id"] = rc.get_new_id(); } /** @@ -990,7 +991,33 @@ TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &resul generate_execute_err(result, rc.get_err()); } - result["result"]["pkts"] = rc.get_pkt_buffer()->to_json(); + const TrexPktBuffer *pkt_buffer = rc.get_pkt_buffer(); + result["result"]["pending"] = rc.get_pending(); + result["result"]["pkts"] = pkt_buffer->to_json(); + + /* delete the buffer */ + delete pkt_buffer; +} + +void +TrexRpcCmdCapture::parse_cmd_remove(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + + /* generate a remove command */ + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureRemove *remove_msg = new TrexStatelessRxCaptureRemove(capture_id, reply); + get_stateless_obj()->send_msg_to_rx(remove_msg); + + TrexCaptureRCRemove rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"] = Json::objectValue; } diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index bf78ff80..54797bdf 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -165,6 +165,7 @@ TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture, "capture", 1, false, APIClass:: void parse_cmd_stop(const Json::Value &msg, Json::Value &result); void parse_cmd_status(const Json::Value &msg, Json::Value &result); void parse_cmd_fetch(const Json::Value &msg, Json::Value &result); + void parse_cmd_remove(const Json::Value ¶ms, Json::Value &result); ); diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index b9bb1d1c..2452487c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -313,6 +313,19 @@ TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { return true; } +bool +TrexStatelessRxCaptureRemove::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCRemove remove_rc; + + TrexStatelessCaptureMngr::getInstance().remove(m_capture_id, remove_rc); + + /* mark as done */ + m_reply.set_reply(remove_rc); + + return true; +} + bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 4027d075..3535ad4f 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -552,6 +552,21 @@ private: }; + +class TrexStatelessRxCaptureRemove : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureRemove(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + m_capture_id = capture_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + MsgReply &m_reply; +}; + + class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase { public: TrexStatelessRxStartQueue(uint8_t port_id, diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 85be7aef..5d43cede 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -62,7 +62,7 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { return; } - m_pkt_buffer->push(m); + m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX); } @@ -87,7 +87,6 @@ TrexStatelessCapture::to_json() const { default: assert(0); - } return output; @@ -178,10 +177,6 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); return; } - if (capture->is_active()) { - rc.set_err(TrexCaptureRC::RC_CAPTURE_FETCH_UNDER_ACTIVE); - return; - } uint32_t pending = 0; TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); @@ -214,6 +209,8 @@ TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &r /* update global filter */ update_global_filter(); + + rc.set_ok(); } void diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 6cd25a94..4a9efea7 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -27,7 +27,7 @@ limitations under the License. #include "trex_stateless_pkt.h" #include "trex_stateless_capture_msg.h" -typedef int64_t capture_id_t; +typedef int32_t capture_id_t; class TrexCaptureRC { public: -- cgit 1.2.3-korg From 641fed03d8e407b6dca94f5280b9a1b4c768f601 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Jan 2017 13:30:48 +0200 Subject: fine tune Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_capture.py | 174 ++++++++++++++++----- .../stl/trex_stl_lib/trex_stl_client.py | 7 +- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 1 + 3 files changed, 143 insertions(+), 39 deletions(-) (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py index e5708e9b..67b6c08c 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_capture.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -2,9 +2,10 @@ from trex_stl_lib.api import * from trex_stl_lib.utils import parsing_opts, text_tables import threading import tempfile +import select class CaptureMonitorWriter(object): - def init (self): + def init (self, start_ts): raise NotImplementedError def deinit(self): @@ -13,6 +14,9 @@ class CaptureMonitorWriter(object): def handle_pkts (self, pkts): raise NotImplementedError + def periodic_check (self): + raise NotImplementedError + class CaptureMonitorWriterStdout(CaptureMonitorWriter): def __init__ (self, logger, is_brief): @@ -24,21 +28,29 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): self.RX_ARROW = u'\u25c0\u2500\u2500' self.TX_ARROW = u'\u25b6\u2500\u2500' - def init (self): - self.logger.log(format_text("\nStarting capture monitor on selected ports", 'bold')) - self.logger.log(format_text("*** any captured packet will be displayed on screen ***\n")) - self.logger.log(format_text("('capture monitor stop' to abort capturing...)\n", 'bold')) + def init (self, start_ts): + self.start_ts = start_ts + + self.logger.pre_cmd("Starting stdout capture monitor - verbose: '{0}'".format('low' if self.is_brief else 'high')) + self.logger.post_cmd(RC_OK) + + self.logger.log(format_text("\n*** use 'capture monitor stop' to abort capturing... ***\n", 'bold')) def deinit (self): pass + + def periodic_check (self): + return RC_OK() def handle_pkts (self, pkts): for pkt in pkts: self.__handle_pkt(pkt) self.logger.prompt_redraw() + return True + def get_scapy_name (self, pkt_scapy): layer = pkt_scapy @@ -46,6 +58,7 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): layer = layer.payload return layer.name + def format_origin (self, origin): if origin == 'RX': @@ -63,11 +76,10 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): self.byte_count += len(pkt_bin) pkt_scapy = Ether(pkt_bin) - self.logger.log(format_text(u'\n\nPort: {0} {1}\n'.format(pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) - self.logger.log(format_text(' Type: {:}, Size: {:} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts']), 'bold')) + self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(self.pkt_count, pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) + self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold')) - if self.is_brief: self.logger.log(' {0}'.format(pkt_scapy.command())) else: @@ -80,43 +92,74 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): def __init__ (self, logger): self.logger = logger - def init (self): + def init (self, start_ts): + self.start_ts = start_ts self.fifo_name = tempfile.mktemp() try: + self.logger.pre_cmd('Starting pipe capture monitor') os.mkfifo(self.fifo_name) + self.logger.post_cmd(RC_OK) - self.logger.log(format_text("\nPlease run 'wireshark -k -i {0}'".format(self.fifo_name), 'bold')) - self.logger.log('\nWaiting for Wireshark connection...') + self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold')) + self.logger.pre_cmd("Waiting for Wireshark pipe connection") self.fifo = os.open(self.fifo_name, os.O_WRONLY) - self.logger.log('Successfuly connected to Wireshark...') + self.logger.post_cmd(RC_OK()) + self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold')) self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True) self.writer._write_header(None) - + + # register a poller + self.poll = select.poll() + self.poll.register(self.fifo, select.EPOLLERR) + except KeyboardInterrupt as e: - os.unlink(self.fifo_name) + self.logger.post_cmd(RC_ERR("")) raise STLError("*** pipe monitor aborted...cleaning up") except OSError as e: - os.unlink(self.fifo_name) + self.logger.post_cmd(RC_ERR("")) raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e))) def deinit (self): - os.unlink(self.fifo_name) + try: + os.unlink(self.fifo_name) + except OSError: + pass + + def periodic_check (self): + return self.check_pipe() + + + def check_pipe (self): + if self.poll.poll(0): + return RC_ERR('*** pipe has been disconnected - aborting monitoring ***') + + return RC_OK() + def handle_pkts (self, pkts): + rc = self.check_pipe() + if not rc: + return rc + for pkt in pkts: pkt_bin = base64.b64decode(pkt['binary']) + ts = pkt['ts'] - self.start_ts + sec = int(ts) + usec = int( (ts - sec) * 1e6 ) + try: - self.writer._write_packet(pkt_bin, sec = 0, usec = 0) + self.writer._write_packet(pkt_bin, sec = sec, usec = usec) except IOError: - klgjdf - + return RC_ERR("*** failed to write packet to pipe ***") + + return RC_OK() class CaptureMonitor(object): @@ -124,9 +167,9 @@ class CaptureMonitor(object): self.client = client self.cmd_lock = cmd_lock self.active = False - self.capture_id = -1 + self.capture_id = None self.logger = client.logger - + self.writer = None def is_active (self): return self.active @@ -137,10 +180,20 @@ class CaptureMonitor(object): def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): + try: + self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type) + except Exception as e: + self.__stop() + raise e + + def start_internal (self, tx_port_list, rx_port_list, rate_pps, mon_type): # stop any previous monitors if self.active: self.stop() + self.tx_port_list = tx_port_list + self.rx_port_list = rx_port_list + if mon_type == 'compact': self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True) elif mon_type == 'verbose': @@ -152,13 +205,14 @@ class CaptureMonitor(object): with self.logger.supress(): - self.capture_id = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps) - - self.tx_port_list = tx_port_list - self.rx_port_list = rx_port_list + data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps) - self.writer.init() + self.capture_id = data['id'] + self.start_ts = data['ts'] + self.writer.init(self.start_ts) + + self.t = threading.Thread(target = self.__thread_cb) self.t.setDaemon(True) @@ -167,19 +221,43 @@ class CaptureMonitor(object): self.t.start() except Exception as e: self.active = False + self.client.stop_capture(self.capture_id) raise e - + def stop (self): + self.logger.pre_cmd("Stopping capture monitor") + try: + self.__stop() + except Exception as e: + self.logger.post_cmd(RC_ERR("")) + raise e + + self.logger.post_cmd(RC_OK()) + + def __stop (self): + + # shutdown thread if self.active: self.active = False self.t.join() - self.client.stop_capture(self.capture_id, None) - self.capture_id = -1 + # deinit the writer + if self.writer is not None: self.writer.deinit() - + self.writer = None + + # cleanup capture ID + if self.capture_id is not None: + try: + with self.logger.supress(): + self.client.stop_capture(self.capture_id) + self.capture_id = None + except STLError as e: + self.logger.post_cmd(RC_ERR("")) + raise e + def get_mon_row (self): if not self.is_active(): return None @@ -215,8 +293,20 @@ class CaptureMonitor(object): def __unlock (self): self.cmd_lock.release() - + def __thread_cb (self): + try: + rc = self.__thread_main_loop() + finally: + pass + + if not rc: + self.logger.log(str(rc)) + self.logger.log(format_text('\n*** monitor is inactive - please restart the monitor ***\n', 'bold')) + self.logger.prompt_redraw() + + + def __thread_main_loop (self): self.pkt_count = 0 self.byte_count = 0 @@ -225,6 +315,11 @@ class CaptureMonitor(object): if not self.__sleep(): break + # check that the writer is ok + rc = self.writer.periodic_check() + if not rc: + return rc + # try to lock if not self.__lock(): break @@ -232,7 +327,8 @@ class CaptureMonitor(object): try: rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10}) if not rc: - raise STLError(rc) + return rc + finally: self.__unlock() @@ -241,9 +337,13 @@ class CaptureMonitor(object): if not pkts: continue - self.writer.handle_pkts(pkts) + rc = self.writer.handle_pkts(pkts) + if not rc: + return rc - + # graceful shutdown + return RC_OK() + # main class @@ -338,7 +438,8 @@ class CaptureManager(object): elif opts.record_cmd == 'stop': self.parse_record_stop(opts) else: - assert(0) + self.record_parser.formatted_error("too few arguments") + def parse_record_start (self, opts): if not opts.tx_port_list and not opts.rx_port_list: @@ -370,7 +471,8 @@ class CaptureManager(object): elif opts.mon_cmd == 'stop': self.parse_monitor_stop(opts) else: - assert(0) + self.monitor_parser.formatted_error("too few arguments") + def parse_monitor_start (self, opts): mon_type = 'compact' diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index c632ad7c..5435619a 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -3003,7 +3003,8 @@ class STLClient(object): limit - limit how many packets will be written :returns: - the new capture_id + returns a dictionary containing + {'id: , 'ts': } :raises: + :exe:'STLError' @@ -3036,7 +3037,7 @@ class STLClient(object): if not rc: raise STLError(rc) - return rc.data()['capture_id'] + return {'id': rc.data()['capture_id'], 'ts': rc.data()['ts']} @@ -3070,7 +3071,7 @@ class STLClient(object): @__api_check(True) - def stop_capture (self, capture_id, output_filename): + def stop_capture (self, capture_id, output_filename = None): """ Stops an active capture diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 8f7431e4..be261fbb 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -920,6 +920,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul } result["result"]["capture_id"] = rc.get_new_id(); + result["result"]["ts"] = now_sec(); } /** -- cgit 1.2.3-korg From f5f92b068561dcdf8414494e5daf6d285ea24135 Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 22 Jan 2017 15:36:20 +0200 Subject: few tweaks Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_capture.py | 90 +++++++++++++++------- .../trex_control_plane/stl/console/trex_console.py | 29 +++---- .../stl/trex_stl_lib/trex_stl_client.py | 14 +++- .../stl/trex_stl_lib/trex_stl_jsonrpc_client.py | 4 +- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 5 +- src/stateless/rx/trex_stateless_capture.cpp | 19 +++-- src/stateless/rx/trex_stateless_capture.h | 24 ++++-- 7 files changed, 124 insertions(+), 61 deletions(-) (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py index 67b6c08c..dfd7f0a4 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_capture.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -22,8 +22,6 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): def __init__ (self, logger, is_brief): self.logger = logger self.is_brief = is_brief - self.pkt_count = 0 - self.byte_count = 0 self.RX_ARROW = u'\u25c0\u2500\u2500' self.TX_ARROW = u'\u25b6\u2500\u2500' @@ -45,11 +43,13 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): return RC_OK() def handle_pkts (self, pkts): + byte_count = 0 + for pkt in pkts: - self.__handle_pkt(pkt) + byte_count += self.__handle_pkt(pkt) self.logger.prompt_redraw() - return True + return RC_OK(byte_count) def get_scapy_name (self, pkt_scapy): @@ -72,11 +72,8 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): def __handle_pkt (self, pkt): pkt_bin = base64.b64decode(pkt['binary']) - self.pkt_count += 1 - self.byte_count += len(pkt_bin) - pkt_scapy = Ether(pkt_bin) - self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(self.pkt_count, pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) + self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold')) @@ -86,6 +83,7 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): pkt_scapy.show(label_lvl = ' ') self.logger.log('') + return len(pkt_bin) # class CaptureMonitorWriterPipe(CaptureMonitorWriter): @@ -148,18 +146,22 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): if not rc: return rc + byte_count = 0 + for pkt in pkts: pkt_bin = base64.b64decode(pkt['binary']) - ts = pkt['ts'] - self.start_ts - sec = int(ts) - usec = int( (ts - sec) * 1e6 ) + ts = pkt['ts'] + sec = int(ts) + usec = int( (ts - sec) * 1e6 ) try: self.writer._write_packet(pkt_bin, sec = sec, usec = usec) except IOError: return RC_ERR("*** failed to write packet to pipe ***") - - return RC_OK() + + byte_count += len(pkt_bin) + + return RC_OK(byte_count) class CaptureMonitor(object): @@ -170,7 +172,7 @@ class CaptureMonitor(object): self.capture_id = None self.logger = client.logger self.writer = None - + def is_active (self): return self.active @@ -179,7 +181,7 @@ class CaptureMonitor(object): return self.capture_id - def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): + def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): try: self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type) except Exception as e: @@ -221,12 +223,21 @@ class CaptureMonitor(object): self.t.start() except Exception as e: self.active = False - self.client.stop_capture(self.capture_id) + self.stop() raise e - + # entry point stop def stop (self): + + if self.active: + self.stop_logged() + else: + self.__stop() + + # wraps stop with a logging + def stop_logged (self): self.logger.pre_cmd("Stopping capture monitor") + try: self.__stop() except Exception as e: @@ -235,6 +246,7 @@ class CaptureMonitor(object): self.logger.post_cmd(RC_OK()) + # internal stop def __stop (self): # shutdown thread @@ -247,15 +259,28 @@ class CaptureMonitor(object): self.writer.deinit() self.writer = None - # cleanup capture ID - if self.capture_id is not None: - try: - with self.logger.supress(): - self.client.stop_capture(self.capture_id) - self.capture_id = None - except STLError as e: - self.logger.post_cmd(RC_ERR("")) - raise e + # cleanup capture ID if possible + if self.capture_id is None: + return + + capture_id = self.capture_id + self.capture_id = None + + # if we are disconnected - we cannot cleanup the capture + if not self.client.is_connected(): + return + + try: + captures = [x['id'] for x in self.client.get_capture_status()] + if capture_id not in captures: + return + + with self.logger.supress(): + self.client.stop_capture(capture_id) + + except STLError as e: + self.logger.post_cmd(RC_ERR("")) + raise e def get_mon_row (self): @@ -311,6 +336,7 @@ class CaptureMonitor(object): self.byte_count = 0 while self.active: + # sleep if not self.__sleep(): break @@ -325,6 +351,8 @@ class CaptureMonitor(object): break try: + if not self.client.is_connected(): + return RC_ERR('*** client has been disconnected, aborting monitoring ***') rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10}) if not rc: return rc @@ -340,6 +368,9 @@ class CaptureMonitor(object): rc = self.writer.handle_pkts(pkts) if not rc: return rc + + self.pkt_count += len(pkts) + self.byte_count += rc.data() # graceful shutdown return RC_OK() @@ -446,8 +477,11 @@ class CaptureManager(object): self.record_start_parser.formatted_error('please provide either --tx or --rx') return - self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) - + rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + + self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold')) + self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o ' when done ***\n".format(rc['id']), 'bold')) + def parse_record_stop (self, opts): captures = self.c.get_capture_status() diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index bf543045..270ef31c 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -707,20 +707,21 @@ class TRexConsole(TRexGeneralCmd): # a custorm cmdloop wrapper def start(self): - while True: - try: - self.cmdloop() - break - except KeyboardInterrupt as e: - if not readline.get_line_buffer(): - raise KeyboardInterrupt - else: - print("") - self.intro = None - continue - - finally: - self.cap_mngr.stop() + try: + while True: + try: + self.cmdloop() + break + except KeyboardInterrupt as e: + if not readline.get_line_buffer(): + raise KeyboardInterrupt + else: + print("") + self.intro = None + continue + + finally: + self.cap_mngr.stop() if self.terminal: self.terminal.kill() diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 5435619a..c82d77fb 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -3058,13 +3058,19 @@ class STLClient(object): self.logger.post_cmd(rc) raise STLError(rc) - pkts = rc.data()['pkts'] + pkts = rc.data()['pkts'] + pending = rc.data()['pending'] + start_ts = rc.data()['start_ts'] + for pkt in pkts: - ts = pkt['ts'] + ts = pkt['ts'] - start_ts + ts_sec = int(ts) + ts_usec = int( (ts - ts_sec) * 1e6 ) + pkt_bin = base64.b64decode(pkt['binary']) - writer._write_packet(pkt_bin, sec = 0, usec = 0) + writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec) - pending = rc.data()['pending'] + self.logger.post_cmd(rc) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py index 72c9317a..ff07b59a 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py @@ -184,7 +184,7 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 5: + if tries > 0: self.disconnect() return RC_ERR("*** [RPC] - Failed to send message to server") @@ -200,7 +200,7 @@ class JsonRpcClient(object): break except zmq.Again: tries += 1 - if tries > 5: + if tries > 0: self.disconnect() return RC_ERR("*** [RPC] - Failed to get server response from {0}".format(self.transport)) diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index be261fbb..55249fc8 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -994,8 +994,9 @@ TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &resul const TrexPktBuffer *pkt_buffer = rc.get_pkt_buffer(); - result["result"]["pending"] = rc.get_pending(); - result["result"]["pkts"] = pkt_buffer->to_json(); + result["result"]["pending"] = rc.get_pending(); + result["result"]["start_ts"] = rc.get_start_ts(); + result["result"]["pkts"] = pkt_buffer->to_json(); /* delete the buffer */ delete pkt_buffer; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 5d43cede..f0d4e806 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -26,6 +26,8 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); m_filter = filter; m_state = STATE_ACTIVE; + m_start_ts = now_sec(); + m_pkt_index = 0; } TrexStatelessCapture::~TrexStatelessCapture() { @@ -35,7 +37,7 @@ TrexStatelessCapture::~TrexStatelessCapture() { } void -TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { +TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) { if (m_state != STATE_ACTIVE) { delete pkt; @@ -48,6 +50,12 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { return; } + if (pkt->get_ts() < m_start_ts) { + delete pkt; + return; + } + + pkt->set_index(++m_pkt_index); m_pkt_buffer->push(pkt); } @@ -62,7 +70,7 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { return; } - m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX); + m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX, ++m_pkt_index); } @@ -110,7 +118,8 @@ TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { partial->push(pkt); } - pending = m_pkt_buffer->get_element_count(); + pending = m_pkt_buffer->get_element_count(); + return partial; } @@ -181,7 +190,7 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre uint32_t pending = 0; TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); - rc.set_pkt_buffer(pkt_buffer, pending); + rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts()); } void @@ -223,7 +232,7 @@ TrexStatelessCaptureMngr::reset() { } void -TrexStatelessCaptureMngr::handle_pkt_tx(const TrexPkt *pkt) { +TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) { for (TrexStatelessCapture *capture : m_captures) { capture->handle_pkt_tx(pkt); } diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 4a9efea7..bc1b88c5 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -121,10 +121,11 @@ public: m_pending = 0; } - void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) { - m_pkt_buffer = pkt_buffer; - m_pending = pending; - m_rc = RC_OK; + void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_start_ts = start_ts; + m_rc = RC_OK; } const TrexPktBuffer *get_pkt_buffer() const { @@ -135,9 +136,14 @@ public: return m_pending; } + dsec_t get_start_ts() const { + return m_start_ts; + } + private: const TrexPktBuffer *m_pkt_buffer; uint32_t m_pending; + dsec_t m_start_ts; }; class TrexCaptureRCRemove : public TrexCaptureRC { @@ -245,7 +251,7 @@ public: TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); - void handle_pkt_tx(const TrexPkt *pkt); + void handle_pkt_tx(TrexPkt *pkt); void handle_pkt_rx(const rte_mbuf_t *m, int port); ~TrexStatelessCapture(); @@ -274,11 +280,17 @@ public: return m_pkt_buffer->get_element_count(); } + dsec_t get_start_ts() const { + return m_start_ts; + } + private: state_e m_state; TrexPktBuffer *m_pkt_buffer; + dsec_t m_start_ts; CaptureFilter m_filter; uint64_t m_id; + uint64_t m_pkt_index; }; class TrexStatelessCaptureMngr { @@ -341,7 +353,7 @@ public: /** * handle packet from TX */ - void handle_pkt_tx(const TrexPkt *pkt); + void handle_pkt_tx(TrexPkt *pkt); /** * handle packet from RX -- cgit 1.2.3-korg From 19df06349d311377ca1ef10f91ef1f786b41418b Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 24 Jan 2017 14:11:32 +0200 Subject: code review cleanups - C++ Signed-off-by: imarom --- .../stl/trex_stl_lib/trex_stl_client.py | 2 +- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 7 ++- src/stateless/common/trex_stateless_pkt.cpp | 18 +++++-- src/stateless/common/trex_stateless_pkt.h | 58 ++++++++++++++++++---- src/stateless/rx/trex_stateless_capture.cpp | 8 +-- src/stateless/rx/trex_stateless_capture.h | 15 ++++-- src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 2 +- 7 files changed, 83 insertions(+), 27 deletions(-) (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index f7432107..d81765c6 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -3049,7 +3049,7 @@ class STLClient(object): if not rc: raise STLError(rc) - return {'id': rc.data()['capture_id'], 'ts': rc.data()['ts']} + return {'id': rc.data()['capture_id'], 'ts': rc.data()['start_ts']} diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 6f0ab09a..c20c77d4 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -901,12 +901,16 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul /* populate the filter */ for (int i = 0; i < tx_json.size(); i++) { uint8_t tx_port = parse_byte(tx_json, i, result); + validate_port_id(tx_port, result); + filter.add_tx(tx_port); ports.insert(tx_port); } for (int i = 0; i < rx_json.size(); i++) { uint8_t rx_port = parse_byte(rx_json, i, result); + validate_port_id(rx_port, result); + filter.add_rx(rx_port); ports.insert(rx_port); } @@ -922,6 +926,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul static MsgReply reply; reply.reset(); + /* send a start message to RX core */ TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); get_stateless_obj()->send_msg_to_rx(start_msg); @@ -931,7 +936,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul } result["result"]["capture_id"] = rc.get_new_id(); - result["result"]["ts"] = now_sec(); + result["result"]["start_ts"] = rc.get_start_ts(); } /** diff --git a/src/stateless/common/trex_stateless_pkt.cpp b/src/stateless/common/trex_stateless_pkt.cpp index f7d47ec0..14c14462 100644 --- a/src/stateless/common/trex_stateless_pkt.cpp +++ b/src/stateless/common/trex_stateless_pkt.cpp @@ -34,7 +34,7 @@ * * @return uint8_t* */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { +void mbuf_to_buffer(uint8_t *dest, const rte_mbuf_t *m) { int index = 0; for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { @@ -55,7 +55,7 @@ TrexPkt::TrexPkt(const rte_mbuf_t *m, int port, origin_e origin, uint64_t index) m_raw = new uint8_t[m_size]; /* copy data */ - copy_mbuf(m_raw, m); + mbuf_to_buffer(m_raw, m); /* generate a packet timestamp */ m_timestamp = now_sec(); @@ -76,6 +76,12 @@ TrexPkt::TrexPkt(const TrexPkt &other) { m_index = other.m_index; } + +/************************************** + * TRex packet buffer + * + *************************************/ + TrexPktBuffer::TrexPktBuffer(uint64_t size, mode_e mode) { m_mode = mode; m_buffer = nullptr; @@ -117,13 +123,14 @@ TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uin /* push packet */ m_buffer[m_head] = new TrexPkt(m, port, origin, pkt_index); m_bytes += m_buffer[m_head]->get_size(); - - m_head = next(m_head); + /* advance */ + m_head = next(m_head); } /** * packet will be handled internally + * packet pointer is invalid after this call */ void TrexPktBuffer::push(const TrexPkt *pkt) { @@ -140,6 +147,8 @@ TrexPktBuffer::push(const TrexPkt *pkt) { /* push packet */ m_buffer[m_head] = pkt; + m_bytes += pkt->get_size(); + m_head = next(m_head); } @@ -179,4 +188,3 @@ TrexPktBuffer::to_json() const { return output; } - diff --git a/src/stateless/common/trex_stateless_pkt.h b/src/stateless/common/trex_stateless_pkt.h index 1b6bd2f8..573f4950 100644 --- a/src/stateless/common/trex_stateless_pkt.h +++ b/src/stateless/common/trex_stateless_pkt.h @@ -32,33 +32,45 @@ /** * copies MBUF to a flat buffer * - * @author imarom (1/1/2017) - * - * @param dest - * @param m */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m); +void mbuf_to_buffer(uint8_t *dest, const rte_mbuf_t *m); -/** - * describes a single saved packet +/************************************** + * TRex packet * - */ + *************************************/ class TrexPkt { public: + /** + * origin of the created packet + */ enum origin_e { ORIGIN_NONE = 1, ORIGIN_TX, ORIGIN_RX }; + /** + * generate a packet from MBUF + */ TrexPkt(const rte_mbuf_t *m, int port = -1, origin_e origin = ORIGIN_NONE, uint64_t index = 0); + + /** + * duplicate an existing packet + */ TrexPkt(const TrexPkt &other); + + /** + * sets a packet index + * used by a buffer of packets + */ void set_index(uint64_t index) { m_index = index; } + /* slow path and also RVO - pass by value is ok */ Json::Value to_json() const { Json::Value output; @@ -115,6 +127,10 @@ private: }; +/************************************** + * TRex packet buffer + * + *************************************/ class TrexPktBuffer { public: @@ -136,10 +152,23 @@ public: ~TrexPktBuffer(); /** - * push a packet to the buffer - * + * push a packet to the buffer + * packet will be generated from a MBUF + * + */ + void push(const rte_mbuf_t *m, + int port = -1, + TrexPkt::origin_e origin = TrexPkt::ORIGIN_NONE, + uint64_t pkt_index = 0); + + /** + * push an existing packet structure + * packet will *not* be duplicated + * + * after calling this function + * the packet is no longer usable + * from caller prespective */ - void push(const rte_mbuf_t *m, int port = -1, TrexPkt::origin_e origin = TrexPkt::ORIGIN_NONE, uint64_t pkt_index = 0); void push(const TrexPkt *pkt); /** @@ -171,6 +200,10 @@ public: return (m_size - 1); } + /** + * see mode_e + * + */ mode_e get_mode() const { return m_mode; } @@ -180,6 +213,9 @@ public: */ uint32_t get_element_count() const; + /** + * current bytes holded by the buffer + */ uint32_t get_bytes() const { return m_bytes; } diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index f0d4e806..7b020444 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -157,14 +157,14 @@ TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, Tre int new_id = m_id_counter++; - TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter); - m_captures.push_back(new_buffer); + TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter); + m_captures.push_back(new_capture); /* update global filter */ update_global_filter(); /* result */ - rc.set_new_id(new_id); + rc.set_rc(new_id, new_capture->get_start_ts()); } void @@ -176,7 +176,7 @@ TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { } capture->stop(); - rc.set_count(capture->get_pkt_count()); + rc.set_rc(capture->get_pkt_count()); } void diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 0f98fd95..852aee2a 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -83,23 +83,30 @@ public: class TrexCaptureRCStart : public TrexCaptureRC { public: - void set_new_id(capture_id_t new_id) { - m_capture_id = new_id; - m_rc = RC_OK; + void set_rc(capture_id_t new_id, dsec_t start_ts) { + m_capture_id = new_id; + m_start_ts = start_ts; + m_rc = RC_OK; + } capture_id_t get_new_id() const { return m_capture_id; } + dsec_t get_start_ts() const { + return m_start_ts; + } + private: capture_id_t m_capture_id; + dsec_t m_start_ts; }; class TrexCaptureRCStop : public TrexCaptureRC { public: - void set_count(uint32_t pkt_count) { + void set_rc(uint32_t pkt_count) { m_pkt_count = pkt_count; m_rc = RC_OK; } diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index ede86062..b01665ec 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -492,7 +492,7 @@ RXServer::duplicate_mbuf(const rte_mbuf_t *m) { } /* copy data */ - copy_mbuf(dest, m); + mbuf_to_buffer(dest, m); return clone_mbuf; } -- cgit 1.2.3-korg From 3689edf311778c8cb921db61f293db6cd43a9b14 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 25 Jan 2017 13:54:51 +0200 Subject: capture - personal code review Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_capture.py | 18 +- .../stl/trex_stl_lib/trex_stl_client.py | 19 +- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 8 +- src/stateless/common/trex_stateless_pkt.cpp | 43 +++- src/stateless/common/trex_stateless_pkt.h | 23 ++- .../messaging/trex_stateless_messaging.cpp | 4 +- src/stateless/messaging/trex_stateless_messaging.h | 3 + src/stateless/rx/trex_stateless_capture.cpp | 98 ++++++--- src/stateless/rx/trex_stateless_capture.h | 229 +++++++-------------- src/stateless/rx/trex_stateless_capture_rc.h | 195 ++++++++++++++++++ 10 files changed, 420 insertions(+), 220 deletions(-) create mode 100644 src/stateless/rx/trex_stateless_capture_rc.h (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py index dfd7f0a4..aac685a1 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_capture.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -88,7 +88,10 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): # class CaptureMonitorWriterPipe(CaptureMonitorWriter): def __init__ (self, logger): - self.logger = logger + self.logger = logger + self.fifo_name = None + self.fifo = None + self.start_ts = None def init (self, start_ts): self.start_ts = start_ts @@ -125,7 +128,14 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): def deinit (self): try: - os.unlink(self.fifo_name) + if self.fifo: + os.close(self.fifo) + self.fifo = None + + if self.fifo_name: + os.unlink(self.fifo_name) + self.fifo_name = None + except OSError: pass @@ -207,7 +217,7 @@ class CaptureMonitor(object): with self.logger.supress(): - data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps) + data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps, mode = 'cyclic') self.capture_id = data['id'] self.start_ts = data['ts'] @@ -477,7 +487,7 @@ class CaptureManager(object): self.record_start_parser.formatted_error('please provide either --tx or --rx') return - rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + rc = self.c.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit, mode = 'fixed') self.logger.log(format_text("*** Capturing ID is set to '{0}' ***".format(rc['id']), 'bold')) self.logger.log(format_text("*** Please call 'capture record stop --id {0} -o ' when done ***\n".format(rc['id']), 'bold')) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index d81765c6..654ceaf6 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -3005,7 +3005,7 @@ class STLClient(object): @__api_check(True) - def start_capture (self, tx_ports, rx_ports, limit = 1000): + def start_capture (self, tx_ports, rx_ports, limit = 1000, mode = 'fixed'): """ Starts a capture to PCAP on port(s) @@ -3014,6 +3014,11 @@ class STLClient(object): rx_ports - on which ports to capture RX limit - limit how many packets will be written + mode - 'fixed': when full, future packets will be + dropped + 'cyclic: when full, oldest packets will be + dropped + :returns: returns a dictionary containing {'id: , 'ts': } @@ -3023,6 +3028,7 @@ class STLClient(object): """ + # check arguments tx_ports = self._validate_port_list(tx_ports, allow_empty = True) rx_ports = self._validate_port_list(rx_ports, allow_empty = True) merge_ports = set(tx_ports + rx_ports) @@ -3030,28 +3036,29 @@ class STLClient(object): if not merge_ports: raise STLError("start_capture - must get at least one port to capture") - # check arguments validate_type('limit', limit, (int)) if limit <= 0: raise STLError("'limit' must be a positive value") + if mode not in ('fixed', 'cyclic'): + raise STLError("'mode' must be either 'fixed' or 'cyclic'") + + # verify service mode non_service_ports = list_difference(set(tx_ports + rx_ports), self.get_service_enabled_ports()) if non_service_ports: raise STLError("Port(s) {0} are not under service mode. PCAP capturing requires all ports to be in service mode".format(non_service_ports)) + # actual job self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit)) - - rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'tx': tx_ports, 'rx': rx_ports}) + rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'mode': mode, 'tx': tx_ports, 'rx': rx_ports}) self.logger.post_cmd(rc) - if not rc: raise STLError(rc) return {'id': rc.data()['capture_id'], 'ts': rc.data()['start_ts']} - def __fetch_capture_packets (self, capture_id, output_filename, pkt_count): diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index c20c77d4..54798abb 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -892,6 +892,12 @@ void TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &result) { uint32_t limit = parse_uint32(params, "limit", result); + + /* parse mode type */ + const std::string mode_str = parse_choice(params, "mode", {"fixed", "cyclic"}, result); + TrexPktBuffer::mode_e mode = ( (mode_str == "fixed") ? TrexPktBuffer::MODE_DROP_TAIL : TrexPktBuffer::MODE_DROP_HEAD); + + /* parse filters */ const Json::Value &tx_json = parse_array(params, "tx", result); const Json::Value &rx_json = parse_array(params, "rx", result); CaptureFilter filter; @@ -927,7 +933,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul reply.reset(); /* send a start message to RX core */ - TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); + TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, mode, reply); get_stateless_obj()->send_msg_to_rx(start_msg); TrexCaptureRCStart rc = reply.wait_for_reply(); diff --git a/src/stateless/common/trex_stateless_pkt.cpp b/src/stateless/common/trex_stateless_pkt.cpp index 14c14462..43cbbe1c 100644 --- a/src/stateless/common/trex_stateless_pkt.cpp +++ b/src/stateless/common/trex_stateless_pkt.cpp @@ -120,12 +120,7 @@ TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uin } } - /* push packet */ - m_buffer[m_head] = new TrexPkt(m, port, origin, pkt_index); - m_bytes += m_buffer[m_head]->get_size(); - - /* advance */ - m_head = next(m_head); + push_internal(new TrexPkt(m, port, origin, pkt_index)); } /** @@ -133,19 +128,32 @@ TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uin * packet pointer is invalid after this call */ void -TrexPktBuffer::push(const TrexPkt *pkt) { +TrexPktBuffer::push(const TrexPkt *pkt, uint64_t pkt_index) { /* if full - decide by the policy */ if (is_full()) { if (m_mode == MODE_DROP_HEAD) { delete pop(); } else { /* drop the tail (current packet) */ - delete pkt; return; } } - /* push packet */ + /* duplicate packet */ + TrexPkt *dup = new TrexPkt(*pkt); + + /* update packet index if given */ + if (pkt_index != 0) { + dup->set_index(pkt_index); + } + + push_internal(dup); +} + + +void +TrexPktBuffer::push_internal(const TrexPkt *pkt) { + /* push the packet */ m_buffer[m_head] = pkt; m_bytes += pkt->get_size(); @@ -188,3 +196,20 @@ TrexPktBuffer::to_json() const { return output; } +TrexPktBuffer * +TrexPktBuffer::pop_n(uint32_t count) { + /* can't pop more than total */ + assert(count <= get_element_count()); + + // TODO: consider returning NULL if no packets exists + // to avoid mallocing + + TrexPktBuffer *partial = new TrexPktBuffer(count); + + for (int i = 0; i < count; i++) { + const TrexPkt *pkt = pop(); + partial->push_internal(pkt); + } + + return partial; +} diff --git a/src/stateless/common/trex_stateless_pkt.h b/src/stateless/common/trex_stateless_pkt.h index 573f4950..f44355dc 100644 --- a/src/stateless/common/trex_stateless_pkt.h +++ b/src/stateless/common/trex_stateless_pkt.h @@ -70,6 +70,9 @@ public: m_index = index; } + uint64_t get_index() const { + return m_index; + } /* slow path and also RVO - pass by value is ok */ Json::Value to_json() const { @@ -163,13 +166,10 @@ public: /** * push an existing packet structure - * packet will *not* be duplicated - * - * after calling this function - * the packet is no longer usable - * from caller prespective + * packet will be duplicated + * if pkt_index is non zero - it will be updated */ - void push(const TrexPkt *pkt); + void push(const TrexPkt *pkt, uint64_t pkt_index = 0); /** * pops a packet from the buffer @@ -177,6 +177,15 @@ public: */ const TrexPkt * pop(); + /** + * pops N packets from the buffer + * N must be <= get_element_count() + * + * returns a new buffer + */ + TrexPktBuffer * pop_n(uint32_t count); + + /** * generate a JSON output of the queue * @@ -225,6 +234,8 @@ private: return ( (v + 1) % m_size ); } + void push_internal(const TrexPkt *pkt); + mode_e m_mode; int m_head; int m_tail; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 21fe7a13..f89ca343 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -266,7 +266,7 @@ TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) { TrexCaptureRCStart start_rc; - TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc); + TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, m_mode, start_rc); /* mark as done */ m_reply.set_reply(start_rc); @@ -305,7 +305,7 @@ TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { TrexCaptureRCStatus status_rc; - status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json()); + status_rc.set_rc(TrexStatelessCaptureMngr::getInstance().to_json()); /* mark as done */ m_reply.set_reply(status_rc); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index ed14b100..cd79d6e7 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -494,10 +494,12 @@ class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture { public: TrexStatelessRxCaptureStart(const CaptureFilter& filter, uint64_t limit, + TrexPktBuffer::mode_e mode, MsgReply &reply) : m_reply(reply) { m_limit = limit; m_filter = filter; + m_mode = mode; } virtual bool handle(CRxCoreStateless *rx_core); @@ -506,6 +508,7 @@ private: uint8_t m_port_id; uint64_t m_limit; CaptureFilter m_filter; + TrexPktBuffer::mode_e m_mode; MsgReply &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 7b020444..bf7623d5 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -21,9 +21,17 @@ limitations under the License. #include "trex_stateless_capture.h" #include "trex_exception.h" -TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter) { +/************************************** + * Capture + * + * A single instance of a capture + *************************************/ +TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, + uint64_t limit, + const CaptureFilter &filter, + TrexPktBuffer::mode_e mode) { m_id = id; - m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); + m_pkt_buffer = new TrexPktBuffer(limit, mode); m_filter = filter; m_state = STATE_ACTIVE; m_start_ts = now_sec(); @@ -37,26 +45,22 @@ TrexStatelessCapture::~TrexStatelessCapture() { } void -TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) { +TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { if (m_state != STATE_ACTIVE) { - delete pkt; return; } /* if not in filter - back off */ if (!m_filter.in_filter(pkt)) { - delete pkt; return; } if (pkt->get_ts() < m_start_ts) { - delete pkt; return; } - pkt->set_index(++m_pkt_index); - m_pkt_buffer->push(pkt); + m_pkt_buffer->push(pkt, ++m_pkt_index); } void @@ -100,9 +104,13 @@ TrexStatelessCapture::to_json() const { return output; } +/** + * fetch up to 'pkt_limit' from the capture + * + */ TrexPktBuffer * TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { - + /* if the total sum of packets is within the limit range - take it */ if (m_pkt_buffer->get_element_count() <= pkt_limit) { TrexPktBuffer *current = m_pkt_buffer; @@ -111,22 +119,29 @@ TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { return current; } - /* harder part - partial fetch */ - TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit); - for (int i = 0; i < pkt_limit; i++) { - const TrexPkt *pkt = m_pkt_buffer->pop(); - partial->push(pkt); - } - + /* partial fetch - take a partial list */ + TrexPktBuffer *partial = m_pkt_buffer->pop_n(pkt_limit); pending = m_pkt_buffer->get_element_count(); return partial; } + +/************************************** + * Capture Manager + * handles all the captures + * in the system + *************************************/ + +/** + * holds the global filter in the capture manager + * which ports in the entire system are monitored + */ void TrexStatelessCaptureMngr::update_global_filter() { CaptureFilter new_filter; + /* recalculates the global filter */ for (TrexStatelessCapture *capture : m_captures) { new_filter += capture->get_filter(); } @@ -134,6 +149,10 @@ TrexStatelessCaptureMngr::update_global_filter() { m_global_filter = new_filter; } + +/** + * lookup a specific capture by ID + */ TrexStatelessCapture * TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { @@ -147,17 +166,37 @@ TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { return nullptr; } + +int +TrexStatelessCaptureMngr::lookup_index(capture_id_t capture_id) { + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return i; + } + } + return -1; +} + + +/** + * starts a new capture + * + */ void -TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) { +TrexStatelessCaptureMngr::start(const CaptureFilter &filter, + uint64_t limit, + TrexPktBuffer::mode_e mode, + TrexCaptureRCStart &rc) { - if (m_captures.size() > MAX_CAPTURE_SIZE) { + /* check for maximum active captures */ + if (m_captures.size() >= MAX_CAPTURE_SIZE) { rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED); return; } - + /* create a new capture*/ int new_id = m_id_counter++; - TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter); + TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter, mode); m_captures.push_back(new_capture); /* update global filter */ @@ -179,6 +218,7 @@ TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { rc.set_rc(capture->get_pkt_count()); } + void TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) { TrexStatelessCapture *capture = lookup(capture_id); @@ -190,21 +230,14 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre uint32_t pending = 0; TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); - rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts()); + rc.set_rc(pkt_buffer, pending, capture->get_start_ts()); } void TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) { - - int index = -1; - for (int i = 0; i < m_captures.size(); i++) { - if (m_captures[i]->get_id() == capture_id) { - index = i; - break; - } - } - /* does not exist */ + /* lookup index */ + int index = lookup_index(capture_id); if (index == -1) { rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); return; @@ -219,7 +252,7 @@ TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &r /* update global filter */ update_global_filter(); - rc.set_ok(); + rc.set_rc(); } void @@ -228,11 +261,12 @@ TrexStatelessCaptureMngr::reset() { while (m_captures.size() > 0) { remove(m_captures[0]->get_id(), dummy); + assert(!!dummy); } } void -TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) { +TrexStatelessCaptureMngr::handle_pkt_tx_slow_path(const TrexPkt *pkt) { for (TrexStatelessCapture *capture : m_captures) { capture->handle_pkt_tx(pkt); } diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 852aee2a..e4a2e632 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -25,160 +25,14 @@ limitations under the License. #include #include "trex_stateless_pkt.h" +#include "trex_stateless_capture_rc.h" -typedef int32_t capture_id_t; -class TrexCaptureRC { -public: - - TrexCaptureRC() { - m_rc = RC_INVALID; - m_pkt_buffer = NULL; - } - - enum rc_e { - RC_INVALID = 0, - RC_OK = 1, - RC_CAPTURE_NOT_FOUND, - RC_CAPTURE_LIMIT_REACHED, - RC_CAPTURE_FETCH_UNDER_ACTIVE - }; - - bool operator !() const { - return (m_rc != RC_OK); - } - - std::string get_err() const { - assert(m_rc != RC_INVALID); - - switch (m_rc) { - case RC_OK: - return ""; - case RC_CAPTURE_LIMIT_REACHED: - return "capture limit has reached"; - case RC_CAPTURE_NOT_FOUND: - return "capture ID not found"; - case RC_CAPTURE_FETCH_UNDER_ACTIVE: - return "fetch command cannot be executed on an active capture"; - default: - assert(0); - } - } - - void set_err(rc_e rc) { - m_rc = rc; - } - - Json::Value get_json() const { - return m_json_rc; - } - -public: - rc_e m_rc; - capture_id_t m_capture_id; - TrexPktBuffer *m_pkt_buffer; - Json::Value m_json_rc; -}; - -class TrexCaptureRCStart : public TrexCaptureRC { -public: - - void set_rc(capture_id_t new_id, dsec_t start_ts) { - m_capture_id = new_id; - m_start_ts = start_ts; - m_rc = RC_OK; - - } - - capture_id_t get_new_id() const { - return m_capture_id; - } - - dsec_t get_start_ts() const { - return m_start_ts; - } - -private: - capture_id_t m_capture_id; - dsec_t m_start_ts; -}; - - -class TrexCaptureRCStop : public TrexCaptureRC { -public: - void set_rc(uint32_t pkt_count) { - m_pkt_count = pkt_count; - m_rc = RC_OK; - } - - uint32_t get_pkt_count() const { - return m_pkt_count; - } - -private: - uint32_t m_pkt_count; -}; - -class TrexCaptureRCFetch : public TrexCaptureRC { -public: - - TrexCaptureRCFetch() { - m_pkt_buffer = nullptr; - m_pending = 0; - } - - void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { - m_pkt_buffer = pkt_buffer; - m_pending = pending; - m_start_ts = start_ts; - m_rc = RC_OK; - } - - const TrexPktBuffer *get_pkt_buffer() const { - return m_pkt_buffer; - } - - uint32_t get_pending() const { - return m_pending; - } - - dsec_t get_start_ts() const { - return m_start_ts; - } - -private: - const TrexPktBuffer *m_pkt_buffer; - uint32_t m_pending; - dsec_t m_start_ts; -}; - -class TrexCaptureRCRemove : public TrexCaptureRC { -public: - void set_ok() { - m_rc = RC_OK; - } -}; - -class TrexCaptureRCStatus : public TrexCaptureRC { -public: - - void set_status(const Json::Value &json) { - m_json = json; - m_rc = RC_OK; - } - - const Json::Value & get_status() const { - return m_json; - } - -private: - Json::Value m_json; -}; - -/** - * capture filter +/************************************** + * Capture Filter + * * specify which ports to capture and if TX/RX or both - */ + *************************************/ class CaptureFilter { public: CaptureFilter() { @@ -186,10 +40,16 @@ public: m_rx_active = 0; } + /** + * add a port to the active TX port list + */ void add_tx(uint8_t port_id) { m_tx_active |= (1LL << port_id); } + /** + * add a port to the active RX port list + */ void add_rx(uint8_t port_id) { m_rx_active |= (1LL << port_id); } @@ -226,6 +86,10 @@ public: return ( in_tx(port_id) || in_rx(port_id) ); } + /** + * updates the current filter with another filter + * the result is the aggregation of TX /RX active lists + */ CaptureFilter& operator +=(const CaptureFilter &other) { m_tx_active |= other.m_tx_active; m_rx_active |= other.m_rx_active; @@ -248,19 +112,36 @@ private: }; +/************************************** + * Capture + * + * A single instance of a capture + *************************************/ class TrexStatelessCapture { public: + enum state_e { STATE_ACTIVE, STATE_STOPPED, }; - TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); + TrexStatelessCapture(capture_id_t id, + uint64_t limit, + const CaptureFilter &filter, + TrexPktBuffer::mode_e mode); - void handle_pkt_tx(TrexPkt *pkt); + ~TrexStatelessCapture(); + + /** + * handles a packet from the TX side + */ + void handle_pkt_tx(const TrexPkt *pkt); + + /** + * handles a packet from the RX side + */ void handle_pkt_rx(const rte_mbuf_t *m, int port); - ~TrexStatelessCapture(); uint64_t get_id() const { return m_id; @@ -270,8 +151,12 @@ public: return m_filter; } - Json::Value to_json() const; + /** + * stop the capture - from now on all packets will be ignored + * + * @author imarom (1/24/2017) + */ void stop() { m_state = STATE_STOPPED; } @@ -290,6 +175,9 @@ public: return m_start_ts; } + + Json::Value to_json() const; + private: state_e m_state; TrexPktBuffer *m_pkt_buffer; @@ -299,6 +187,14 @@ private: uint64_t m_pkt_index; }; + +/************************************** + * Capture Manager + * Handles all the captures in + * the system + * + * the design is a singleton + *************************************/ class TrexStatelessCaptureMngr { public: @@ -317,7 +213,10 @@ public: /** * starts a new capture */ - void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc); + void start(const CaptureFilter &filter, + uint64_t limit, + TrexPktBuffer::mode_e mode, + TrexCaptureRCStart &rc); /** * stops an existing capture @@ -346,7 +245,8 @@ public: /** - * return true if any filter is active + * return true if any filter is active + * on a specific port * * @author imarom (1/3/2017) * @@ -359,14 +259,20 @@ public: /** * handle packet from TX */ - void handle_pkt_tx(TrexPkt *pkt); + void handle_pkt_tx(const TrexPkt *pkt) { + if (!m_global_filter.in_filter(pkt)) { + return; + } + + handle_pkt_tx_slow_path(pkt); + } /** * handle packet from RX */ void handle_pkt_rx(const rte_mbuf_t *m, int port) { - /* fast path */ - if (!is_active(port)) { + /* fast path - check the global filter */ + if (!m_global_filter.in_rx(port)) { return; } @@ -385,8 +291,11 @@ private: TrexStatelessCapture * lookup(capture_id_t capture_id); + int lookup_index(capture_id_t capture_id); void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); + void handle_pkt_tx_slow_path(const TrexPkt *pkt); + void update_global_filter(); std::vector m_captures; diff --git a/src/stateless/rx/trex_stateless_capture_rc.h b/src/stateless/rx/trex_stateless_capture_rc.h new file mode 100644 index 00000000..12b37c1d --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture_rc.h @@ -0,0 +1,195 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_CAPTURE_RC_H__ +#define __TREX_STATELESS_CAPTURE_RC_H__ + +typedef int32_t capture_id_t; + +/** + * a base class for a capture command RC + * not to be used directly + */ +class TrexCaptureRC { + +protected: + /* cannot instantiate this object from outside */ + TrexCaptureRC() { + m_rc = RC_INVALID; + } + +public: + + /** + * error types for commands + */ + enum rc_e { + RC_INVALID = 0, + RC_OK = 1, + RC_CAPTURE_NOT_FOUND, + RC_CAPTURE_LIMIT_REACHED, + RC_CAPTURE_FETCH_UNDER_ACTIVE + }; + + bool operator !() const { + return (m_rc != RC_OK); + } + + std::string get_err() const { + assert(m_rc != RC_INVALID); + + switch (m_rc) { + case RC_OK: + return ""; + case RC_CAPTURE_LIMIT_REACHED: + return "capture limit has reached"; + case RC_CAPTURE_NOT_FOUND: + return "capture ID not found"; + case RC_CAPTURE_FETCH_UNDER_ACTIVE: + return "fetch command cannot be executed on an active capture"; + case RC_INVALID: + /* should never be called under invalid */ + assert(0); + + default: + assert(0); + } + } + + void set_err(rc_e rc) { + m_rc = rc; + } + + +protected: + rc_e m_rc; +}; + +/** + * return code for executing capture start + */ +class TrexCaptureRCStart : public TrexCaptureRC { +public: + + void set_rc(capture_id_t new_id, dsec_t start_ts) { + m_capture_id = new_id; + m_start_ts = start_ts; + m_rc = RC_OK; + } + + capture_id_t get_new_id() const { + assert(m_rc == RC_OK); + return m_capture_id; + } + + dsec_t get_start_ts() const { + assert(m_rc == RC_OK); + return m_start_ts; + } + +private: + capture_id_t m_capture_id; + dsec_t m_start_ts; +}; + +/** + * return code for exectuing capture stop + */ +class TrexCaptureRCStop : public TrexCaptureRC { +public: + + void set_rc(uint32_t pkt_count) { + m_pkt_count = pkt_count; + m_rc = RC_OK; + } + + uint32_t get_pkt_count() const { + assert(m_rc == RC_OK); + return m_pkt_count; + } + +private: + uint32_t m_pkt_count; +}; + +/** + * return code for executing capture fetch + */ +class TrexCaptureRCFetch : public TrexCaptureRC { +public: + + void set_rc(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_start_ts = start_ts; + m_rc = RC_OK; + } + + const TrexPktBuffer *get_pkt_buffer() const { + assert(m_rc == RC_OK); + return m_pkt_buffer; + } + + uint32_t get_pending() const { + assert(m_rc == RC_OK); + return m_pending; + } + + dsec_t get_start_ts() const { + assert(m_rc == RC_OK); + return m_start_ts; + } + +private: + const TrexPktBuffer *m_pkt_buffer; + uint32_t m_pending; + dsec_t m_start_ts; +}; + + + +class TrexCaptureRCRemove : public TrexCaptureRC { +public: + void set_rc() { + m_rc = RC_OK; + } +}; + + +class TrexCaptureRCStatus : public TrexCaptureRC { +public: + + void set_rc(const Json::Value &json) { + m_json = json; + m_rc = RC_OK; + } + + const Json::Value & get_status() const { + assert(m_rc == RC_OK); + return m_json; + } + +private: + Json::Value m_json; +}; + + +#endif /* __TREX_STATELESS_CAPTURE_RC_H__ */ + -- cgit 1.2.3-korg From acf815dbf67d7a3be8fefd84eea1d25465f71136 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 26 Jan 2017 17:26:00 +0200 Subject: code review - few cleanups Signed-off-by: imarom --- linux/ws_main.py | 5 +- .../trex_control_plane/stl/console/trex_console.py | 5 +- .../stl/trex_stl_lib/trex_stl_client.py | 114 ++++++++++++--------- .../stl/trex_stl_lib/trex_stl_jsonrpc_client.py | 9 ++ .../stl/trex_stl_lib/trex_stl_port.py | 16 --- .../stl/trex_stl_lib/utils/common.py | 5 + .../stl/trex_stl_lib/utils/parsing_opts.py | 11 +- 7 files changed, 96 insertions(+), 69 deletions(-) (limited to 'scripts') diff --git a/linux/ws_main.py b/linux/ws_main.py index 711b4c89..fa62c59d 100755 --- a/linux/ws_main.py +++ b/linux/ws_main.py @@ -174,7 +174,9 @@ stateless_src = SrcGroup(dir='src/stateless/', 'dp/trex_stateless_dp_core.cpp', 'messaging/trex_stateless_messaging.cpp', 'rx/trex_stateless_rx_core.cpp', - 'rx/trex_stateless_rx_port_mngr.cpp' + 'rx/trex_stateless_rx_port_mngr.cpp', + 'rx/trex_stateless_capture.cpp', + 'common/trex_stateless_pkt.cpp' ]) # RPC code rpc_server_src = SrcGroup(dir='src/rpc-server/', @@ -273,6 +275,7 @@ includes_path =''' ../src/pal/linux/ ../src/stateless/cp/ ../src/stateless/dp/ ../src/stateless/rx/ + ../src/stateless/common/ ../src/stateless/messaging/ ../external_libs/json/ ../external_libs/zmq/include/ diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index 83f36820..d36ce7b0 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -175,6 +175,8 @@ class TRexConsole(TRexGeneralCmd): def __init__(self, stateless_client, verbose = False): + # cmd lock is used to make sure background job + # of the console is not done while the user excutes commands self.cmd_lock = Lock() self.stateless_client = stateless_client @@ -721,6 +723,7 @@ class TRexConsole(TRexGeneralCmd): continue finally: + # capture manager is not presistent - kill it before going out self.cap_mngr.stop() if self.terminal: @@ -955,8 +958,6 @@ def main(): stateless_client.disconnect(stop_traffic = False) - if __name__ == '__main__': - main() diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 654ceaf6..571334ee 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -22,13 +22,11 @@ from collections import namedtuple from yaml import YAMLError import time import datetime -import threading import re import random import json import traceback import os.path -import argparse ############################ logger ############################# ############################ ############################# @@ -3007,32 +3005,42 @@ class STLClient(object): @__api_check(True) def start_capture (self, tx_ports, rx_ports, limit = 1000, mode = 'fixed'): """ - Starts a capture to PCAP on port(s) + Starts a low rate packet capturing on the server :parameters: tx_ports - on which ports to capture TX rx_ports - on which ports to capture RX limit - limit how many packets will be written + memory requierment is O(9K * limit) - mode - 'fixed': when full, future packets will be + mode - 'fixed': when full, newer packets will be dropped - 'cyclic: when full, oldest packets will be + + 'cyclic: when full, older packets will be dropped :returns: - returns a dictionary containing + returns a dictionary: {'id: , 'ts': } + where 'id' is the new capture ID for future commands + and 'ts' is that server monotonic timestamp when + the capture was created + :raises: + :exe:'STLError' """ - + # TODO: remove this when TX is implemented + if tx_ports: + raise STLError('TX port capturing is not yet implemented') + # check arguments tx_ports = self._validate_port_list(tx_ports, allow_empty = True) rx_ports = self._validate_port_list(rx_ports, allow_empty = True) merge_ports = set(tx_ports + rx_ports) + # make sure at least one port to capture if not merge_ports: raise STLError("start_capture - must get at least one port to capture") @@ -3044,13 +3052,13 @@ class STLClient(object): raise STLError("'mode' must be either 'fixed' or 'cyclic'") # verify service mode - non_service_ports = list_difference(set(tx_ports + rx_ports), self.get_service_enabled_ports()) + non_service_ports = list_difference(merge_ports, self.get_service_enabled_ports()) if non_service_ports: - raise STLError("Port(s) {0} are not under service mode. PCAP capturing requires all ports to be in service mode".format(non_service_ports)) + raise STLError("Port(s) {0} are not under service mode. packet capturing requires all ports to be in service mode".format(non_service_ports)) # actual job - self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit)) + self.logger.pre_cmd("Starting packet capturing up to {0} packets".format(limit)) rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'mode': mode, 'tx': tx_ports, 'rx': rx_ports}) self.logger.post_cmd(rc) @@ -3059,50 +3067,18 @@ class STLClient(object): return {'id': rc.data()['capture_id'], 'ts': rc.data()['start_ts']} - - - def __fetch_capture_packets (self, capture_id, output_filename, pkt_count): - self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename)) - # create a PCAP file - writer = RawPcapWriter(output_filename, linktype = 1) - writer._write_header(None) - - # fetch - pending = pkt_count - rc = RC_OK() - while pending > 0: - rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50}) - if not rc: - self.logger.post_cmd(rc) - raise STLError(rc) - - pkts = rc.data()['pkts'] - pending = rc.data()['pending'] - start_ts = rc.data()['start_ts'] - - for pkt in pkts: - ts = pkt['ts'] - start_ts - ts_sec = int(ts) - ts_usec = int( (ts - ts_sec) * 1e6 ) - - pkt_bin = base64.b64decode(pkt['binary']) - writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec) - - - - - self.logger.post_cmd(rc) - @__api_check(True) def stop_capture (self, capture_id, output_filename = None): """ - Stops an active capture + Stops an active capture and optionally save it to a PCAP file :parameters: capture_id - an active capture ID to stop output_filename - output filename to save capture + if None all captured packets + will be discarded :raises: + :exe:'STLError' @@ -3116,7 +3092,7 @@ class STLClient(object): # stop - self.logger.pre_cmd("Stopping PCAP capture {0}".format(capture_id)) + self.logger.pre_cmd("Stopping packet capture {0}".format(capture_id)) rc = self._transmit("capture", params = {'command': 'stop', 'capture_id': capture_id}) self.logger.post_cmd(rc) if not rc: @@ -3137,6 +3113,47 @@ class STLClient(object): raise STLError(rc) + + # fetch packets from the server and save them to a file + def __fetch_capture_packets (self, capture_id, output_filename, pkt_count): + self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename)) + + # create a PCAP file + writer = RawPcapWriter(output_filename, linktype = 1) + writer._write_header(None) + + pending = pkt_count + rc = RC_OK() + + # fetch with iteratios - each iteration up to 50 packets + while pending > 0: + rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50}) + if not rc: + self.logger.post_cmd(rc) + raise STLError(rc) + + # make sure we are getting some progress + assert(rc.data()['pending'] < pending) + + pkts = rc.data()['pkts'] + pending = rc.data()['pending'] + start_ts = rc.data()['start_ts'] + + # write packets + for pkt in pkts: + # split the server timestamp relative to the capture start time + ts_sec, ts_usec = sec_split_usec(pkt['ts'] - start_ts) + + pkt_bin = base64.b64decode(pkt['binary']) + writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec) + + + + + self.logger.post_cmd(rc) + + + @__api_check(True) def get_capture_status (self): """ @@ -3145,14 +3162,13 @@ class STLClient(object): info about the capture """ - rc = self._transmit("capture", params = {'command': 'status'}) - if not rc: raise STLError(rc) return rc.data() + @__api_check(True) def remove_all_captures (self): """ @@ -3160,7 +3176,7 @@ class STLClient(object): """ captures = self.get_capture_status() - self.logger.pre_cmd("Removing all PCAP captures from server") + self.logger.pre_cmd("Removing all packet captures from server") for c in captures: # remove diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py index db216532..405f76be 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_jsonrpc_client.py @@ -10,6 +10,7 @@ import struct from .trex_stl_types import * from .utils.common import random_id_gen from .utils.zipmsg import ZippedMsg +from threading import Lock class bcolors: BLUE = '\033[94m' @@ -72,6 +73,8 @@ class JsonRpcClient(object): self.id_gen = random_id_gen() self.zipper = ZippedMsg() + self.lock = Lock() + def get_connection_details (self): rc = {} rc['server'] = self.server @@ -137,6 +140,12 @@ class JsonRpcClient(object): def send_msg (self, msg, retry = 0): + # REQ/RESP pattern in ZMQ requires no interrupts during the send + with self.lock: + return self.__send_msg(msg, retry) + + + def __send_msg (self, msg, retry = 0): # print before if self.logger.check_verbose(self.logger.VERBOSE_HIGH): self.verbose_msg("Sending Request To Server:\n\n" + self.pretty_json(msg) + "\n") diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py index 31d752af..1ef3a8ff 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_port.py @@ -492,22 +492,6 @@ class Port(object): return self.ok() - @owned - def start_capture (self, pcap_filename, mode, limit): - - if mode != 'tx' and not self.is_service_mode_on(): - return self.err('port service mode must be enabled for performing RX capturing. Please enable service mode') - - params = {"handler": self.handler, - "port_id": self.port_id, - "mode": mode, - "limit": limit} - - rc = self.transmit("start_capture", params) - if rc.bad(): - return self.err(rc.err()) - - return self.ok() @writeable def set_l2_mode (self, dst_mac): diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py index c386451b..72d3fa9f 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py @@ -71,6 +71,11 @@ def list_difference (l1, l2): def is_sub_list (l1, l2): return set(l1) <= set(l2) +# splits a timestamp in seconds to sec/usec +def sec_split_usec (ts): + return int(ts), int( (ts - int(ts)) * 1e6 ) + + # a simple passive timer class PassiveTimer(object): diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py index 8d3aedbe..53db533c 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py @@ -689,19 +689,23 @@ class _MergeAction(argparse._AppendAction): class CCmdArgParser(argparse.ArgumentParser): - def __init__(self, stateless_client = None, x = None, *args, **kwargs): + def __init__(self, stateless_client = None, *args, **kwargs): super(CCmdArgParser, self).__init__(*args, **kwargs) self.stateless_client = stateless_client self.cmd_name = kwargs.get('prog') self.register('action', 'merge', _MergeAction) + def add_arg_list (self, *args): populate_parser(self, *args) + + # a simple hook for add subparsers to add stateless client def add_subparsers(self, *args, **kwargs): sub = super(CCmdArgParser, self).add_subparsers(*args, **kwargs) + # save pointer to the original add parser method add_parser = sub.add_parser stateless_client = self.stateless_client @@ -710,13 +714,17 @@ class CCmdArgParser(argparse.ArgumentParser): parser.stateless_client = stateless_client return parser + # override with the hook sub.add_parser = add_parser_hook + return sub + # hook this to the logger def _print_message(self, message, file=None): self.stateless_client.logger.log(message) + def error(self, message): self.print_usage() self._print_message(('%s: error: %s\n') % (self.prog, message)) @@ -783,6 +791,7 @@ class CCmdArgParser(argparse.ArgumentParser): # recover from system exit scenarios, such as "help", or bad arguments. return RC_ERR("'{0}' - {1}".format(self.cmd_name, "no action")) + def formatted_error (self, msg): self.print_usage() self._print_message(('%s: error: %s\n') % (self.prog, msg)) -- cgit 1.2.3-korg From 42664b05103d0f4a7ed272301051d58d6e8f3737 Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 29 Jan 2017 17:49:53 +0200 Subject: code review - cont. Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_capture.py | 357 +++++++++++---------- .../stl/trex_stl_lib/trex_stl_client.py | 2 +- .../stl/trex_stl_lib/trex_stl_types.py | 2 +- .../stl/trex_stl_lib/utils/text_opts.py | 16 +- 4 files changed, 191 insertions(+), 186 deletions(-) (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/console/trex_capture.py b/scripts/automation/trex_control_plane/stl/console/trex_capture.py index aac685a1..2132458e 100644 --- a/scripts/automation/trex_control_plane/stl/console/trex_capture.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_capture.py @@ -4,53 +4,44 @@ import threading import tempfile import select +# defines a generic monitor writer class CaptureMonitorWriter(object): - def init (self, start_ts): - raise NotImplementedError def deinit(self): - raise NotImplementedError + # by default - nothing to deinit + pass def handle_pkts (self, pkts): raise NotImplementedError def periodic_check (self): - raise NotImplementedError - + # by default - nothing to check + pass + +# a stdout monitor class CaptureMonitorWriterStdout(CaptureMonitorWriter): - def __init__ (self, logger, is_brief): + def __init__ (self, logger, is_brief, start_ts): self.logger = logger self.is_brief = is_brief - + self.start_ts = start_ts + + # unicode arrows self.RX_ARROW = u'\u25c0\u2500\u2500' self.TX_ARROW = u'\u25b6\u2500\u2500' - - def init (self, start_ts): - self.start_ts = start_ts + # decode issues with Python 2 + if sys.version_info < (3,0): + self.RX_ARROW = self.RX_ARROW.encode('utf-8') + self.TX_ARROW = self.TX_ARROW.encode('utf-8') + + self.logger.pre_cmd("Starting stdout capture monitor - verbose: '{0}'".format('low' if self.is_brief else 'high')) self.logger.post_cmd(RC_OK) self.logger.log(format_text("\n*** use 'capture monitor stop' to abort capturing... ***\n", 'bold')) - - def deinit (self): - pass - - - def periodic_check (self): - return RC_OK() - - def handle_pkts (self, pkts): - byte_count = 0 - - for pkt in pkts: - byte_count += self.__handle_pkt(pkt) - - self.logger.prompt_redraw() - return RC_OK(byte_count) - + def get_scapy_name (self, pkt_scapy): layer = pkt_scapy @@ -62,9 +53,9 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): def format_origin (self, origin): if origin == 'RX': - return u'{0} {1}'.format(self.RX_ARROW, 'RX') + return '{0} {1}'.format(self.RX_ARROW, 'RX') elif origin == 'TX': - return u'{0} {1}'.format(self.TX_ARROW, 'TX') + return '{0} {1}'.format(self.TX_ARROW, 'TX') else: return '{0}'.format(origin) @@ -73,10 +64,9 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): pkt_bin = base64.b64decode(pkt['binary']) pkt_scapy = Ether(pkt_bin) - self.logger.log(format_text(u'\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) + self.logger.log(format_text('\n\n#{} Port: {} {}\n'.format(pkt['index'], pkt['port'], self.format_origin(pkt['origin'])), 'bold', '')) self.logger.log(format_text(' Type: {}, Size: {} B, TS: {:.2f} [sec]\n'.format(self.get_scapy_name(pkt_scapy), len(pkt_bin), pkt['ts'] - self.start_ts), 'bold')) - if self.is_brief: self.logger.log(' {0}'.format(pkt_scapy.command())) else: @@ -85,16 +75,29 @@ class CaptureMonitorWriterStdout(CaptureMonitorWriter): return len(pkt_bin) -# + + def handle_pkts (self, pkts): + try: + byte_count = 0 + for pkt in pkts: + byte_count += self.__handle_pkt(pkt) + + return byte_count + + finally: + # make sure to restore the logger + self.logger.prompt_redraw() + + +# a pipe based monitor class CaptureMonitorWriterPipe(CaptureMonitorWriter): - def __init__ (self, logger): + def __init__ (self, logger, start_ts): + self.logger = logger - self.fifo_name = None self.fifo = None - self.start_ts = None - - def init (self, start_ts): self.start_ts = start_ts + + # generate a temp fifo pipe self.fifo_name = tempfile.mktemp() try: @@ -105,27 +108,35 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): self.logger.log(format_text("*** Please run 'wireshark -k -i {0}' ***".format(self.fifo_name), 'bold')) self.logger.pre_cmd("Waiting for Wireshark pipe connection") + + # blocks until pipe is connected self.fifo = os.open(self.fifo_name, os.O_WRONLY) self.logger.post_cmd(RC_OK()) self.logger.log(format_text('\n*** Capture monitoring started ***\n', 'bold')) + # open for write using a PCAP writer self.writer = RawPcapWriter(self.fifo_name, linktype = 1, sync = True) self.writer._write_header(None) # register a poller self.poll = select.poll() self.poll.register(self.fifo, select.EPOLLERR) - + + self.is_init = True + except KeyboardInterrupt as e: + self.deinit() self.logger.post_cmd(RC_ERR("")) raise STLError("*** pipe monitor aborted...cleaning up") except OSError as e: + self.deinit() self.logger.post_cmd(RC_ERR("")) raise STLError("failed to create pipe {0}\n{1}".format(self.fifo_name, str(e))) + def deinit (self): try: if self.fifo: @@ -138,141 +149,109 @@ class CaptureMonitorWriterPipe(CaptureMonitorWriter): except OSError: pass + def periodic_check (self): - return self.check_pipe() + self.check_pipe() def check_pipe (self): if self.poll.poll(0): - return RC_ERR('*** pipe has been disconnected - aborting monitoring ***') - - return RC_OK() + raise STLError('pipe has been disconnected') def handle_pkts (self, pkts): - rc = self.check_pipe() - if not rc: - return rc + # first check the pipe is alive + self.check_pipe() + + return self.handle_pkts_internal(pkts) + + + def handle_pkts_internal (self, pkts): byte_count = 0 for pkt in pkts: pkt_bin = base64.b64decode(pkt['binary']) - ts = pkt['ts'] - sec = int(ts) - usec = int( (ts - sec) * 1e6 ) - + ts_sec, ts_usec = sec_split_usec(pkt['ts'] - self.start_ts) + try: - self.writer._write_packet(pkt_bin, sec = sec, usec = usec) - except IOError: - return RC_ERR("*** failed to write packet to pipe ***") - + self.writer._write_packet(pkt_bin, sec = ts_sec, usec = ts_usec) + except Exception as e: + raise STLError('fail to write packets to pipe: {}'.format(str(e))) + byte_count += len(pkt_bin) - return RC_OK(byte_count) + return byte_count +# capture monitor - a live capture class CaptureMonitor(object): - def __init__ (self, client, cmd_lock): + def __init__ (self, client, cmd_lock, tx_port_list, rx_port_list, rate_pps, mon_type): self.client = client - self.cmd_lock = cmd_lock - self.active = False - self.capture_id = None self.logger = client.logger - self.writer = None - - def is_active (self): - return self.active - + self.cmd_lock = cmd_lock - def get_capture_id (self): - return self.capture_id + self.t = None + self.writer = None + self.capture_id = None + self.tx_port_list = tx_port_list + self.rx_port_list = rx_port_list + self.rate_pps = rate_pps + self.mon_type = mon_type - def start (self, tx_port_list, rx_port_list, rate_pps, mon_type): + # try to launch try: - self.start_internal(tx_port_list, rx_port_list, rate_pps, mon_type) + self.__start() except Exception as e: self.__stop() - raise e + raise - def start_internal (self, tx_port_list, rx_port_list, rate_pps, mon_type): - # stop any previous monitors - if self.active: - self.stop() - - self.tx_port_list = tx_port_list - self.rx_port_list = rx_port_list - - if mon_type == 'compact': - self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = True) - elif mon_type == 'verbose': - self.writer = CaptureMonitorWriterStdout(self.logger, is_brief = False) - elif mon_type == 'pipe': - self.writer = CaptureMonitorWriterPipe(self.logger) - else: - raise STLError('unknown writer type') + def __start (self): + # create a capture on the server with self.logger.supress(): - data = self.client.start_capture(tx_port_list, rx_port_list, limit = rate_pps, mode = 'cyclic') - + data = self.client.start_capture(self.tx_port_list, self.rx_port_list, limit = self.rate_pps, mode = 'cyclic') + self.capture_id = data['id'] self.start_ts = data['ts'] - - self.writer.init(self.start_ts) - - self.t = threading.Thread(target = self.__thread_cb) - self.t.setDaemon(True) - - try: - self.active = True - self.t.start() - except Exception as e: - self.active = False - self.stop() - raise e - - # entry point stop - def stop (self): - if self.active: - self.stop_logged() + # create a writer + if self.mon_type == 'compact': + self.writer = CaptureMonitorWriterStdout(self.logger, True, self.start_ts) + elif self.mon_type == 'verbose': + self.writer = CaptureMonitorWriterStdout(self.logger, False, self.start_ts) + elif self.mon_type == 'pipe': + self.writer = CaptureMonitorWriterPipe(self.logger, self.start_ts) else: - self.__stop() - - # wraps stop with a logging - def stop_logged (self): - self.logger.pre_cmd("Stopping capture monitor") + raise STLError('Internal error: unknown writer type') - try: - self.__stop() - except Exception as e: - self.logger.post_cmd(RC_ERR("")) - raise e - - self.logger.post_cmd(RC_OK()) + # start the fetching thread + self.t = threading.Thread(target = self.__thread_cb) + self.t.setDaemon(True) + self.active = True + self.t.start() + # internal stop def __stop (self): - # shutdown thread - if self.active: + # stop the thread + if self.t and self.t.is_alive(): self.active = False self.t.join() + self.t = None # deinit the writer - if self.writer is not None: + if self.writer: self.writer.deinit() self.writer = None - # cleanup capture ID if possible - if self.capture_id is None: - return - + # take the capture ID capture_id = self.capture_id self.capture_id = None @@ -280,31 +259,48 @@ class CaptureMonitor(object): if not self.client.is_connected(): return - try: - captures = [x['id'] for x in self.client.get_capture_status()] - if capture_id not in captures: - return - - with self.logger.supress(): - self.client.stop_capture(capture_id) + # make sure the capture is active on the server + captures = [x['id'] for x in self.client.get_capture_status()] + if capture_id not in captures: + return - except STLError as e: - self.logger.post_cmd(RC_ERR("")) - raise e + # remove the capture + with self.logger.supress(): + self.client.stop_capture(capture_id) - + + # user call for stop (adds log) + def stop (self): + self.logger.pre_cmd("Stopping capture monitor") + + try: + self.__stop() + except Exception as e: + self.logger.post_cmd(RC_ERR("")) + raise + + self.logger.post_cmd(RC_OK()) + + def get_mon_row (self): - if not self.is_active(): - return None return [self.capture_id, + format_text('ACTIVE' if self.t.is_alive() else 'DEAD', 'bold'), self.pkt_count, format_num(self.byte_count, suffix = 'B'), ', '.join([str(x) for x in self.tx_port_list] if self.tx_port_list else '-'), ', '.join([str(x) for x in self.rx_port_list] if self.rx_port_list else '-') ] - + + def is_active (self): + return self.active + + + def get_capture_id (self): + return self.capture_id + + # sleeps with high freq checks for active def __sleep (self): for _ in range(5): @@ -331,13 +327,18 @@ class CaptureMonitor(object): def __thread_cb (self): try: - rc = self.__thread_main_loop() - finally: - pass + self.__thread_main_loop() + + # common errors + except STLError as e: + self.logger.log(format_text("\n\nMonitor has encountered the following error: '{}'\n".format(e.brief()), 'bold')) + self.logger.log(format_text("\n*** monitor is inactive - please restart the monitor ***\n", 'bold')) + self.logger.prompt_redraw() - if not rc: - self.logger.log(str(rc)) - self.logger.log(format_text('\n*** monitor is inactive - please restart the monitor ***\n', 'bold')) + # unexpected errors + except Exception as e: + self.logger.log("\n\n*** A fatal internal error has occurred: '{}'\n".format(str(e))) + self.logger.log(format_text("\n*** monitor is inactive - please restart the monitor ***\n", 'bold')) self.logger.prompt_redraw() @@ -347,54 +348,50 @@ class CaptureMonitor(object): while self.active: - # sleep + # sleep - if interrupt by graceful shutdown - go out if not self.__sleep(): - break + return # check that the writer is ok - rc = self.writer.periodic_check() - if not rc: - return rc + self.writer.periodic_check() - # try to lock + # try to lock - if interrupt by graceful shutdown - go out if not self.__lock(): - break + return try: if not self.client.is_connected(): - return RC_ERR('*** client has been disconnected, aborting monitoring ***') + raise STLError('client has been disconnected') + rc = self.client._transmit("capture", params = {'command': 'fetch', 'capture_id': self.capture_id, 'pkt_limit': 10}) if not rc: - return rc + raise STLError(rc) finally: self.__unlock() + # no packets - skip pkts = rc.data()['pkts'] if not pkts: continue - rc = self.writer.handle_pkts(pkts) - if not rc: - return rc + byte_count = self.writer.handle_pkts(pkts) self.pkt_count += len(pkts) - self.byte_count += rc.data() - - # graceful shutdown - return RC_OK() - - + self.byte_count += byte_count + + + # main class class CaptureManager(object): def __init__ (self, client, cmd_lock): self.c = client self.cmd_lock = cmd_lock - self.monitor = CaptureMonitor(client, cmd_lock) self.logger = client.logger - + self.monitor = None + # install parsers self.parser = parsing_opts.gen_parser(self, "capture", self.parse_line_internal.__doc__) @@ -445,7 +442,9 @@ class CaptureManager(object): def stop (self): - self.monitor.stop() + if self.monitor: + self.monitor.stop() + self.monitor = None # main entry point for parsing commands from console @@ -453,7 +452,7 @@ class CaptureManager(object): try: self.parse_line_internal(line) except STLError as e: - self.logger.log("\nAction has failed with the following error:\n" + format_text(e.brief() + "\n", 'bold')) + self.logger.log("\nAction has failed with the following error:\n\n" + format_text(e.brief() + "\n", 'bold')) return RC_ERR(e.brief()) @@ -497,7 +496,7 @@ class CaptureManager(object): captures = self.c.get_capture_status() ids = [c['id'] for c in captures] - if opts.capture_id == self.monitor.get_capture_id(): + if self.monitor and (opts.capture_id == self.monitor.get_capture_id()): self.record_stop_parser.formatted_error("'{0}' is a monitor, please use 'capture monitor stop'".format(opts.capture_id)) return @@ -530,14 +529,24 @@ class CaptureManager(object): self.monitor_start_parser.formatted_error('please provide either --tx or --rx') return - self.monitor.stop() - self.monitor.start(opts.tx_port_list, opts.rx_port_list, 100, mon_type) + if self.monitor: + self.monitor.stop() + self.monitor = None + + self.monitor = CaptureMonitor(self.c, self.cmd_lock, opts.tx_port_list, opts.rx_port_list, 100, mon_type) + def parse_monitor_stop (self, opts): - self.monitor.stop() + if self.monitor: + self.monitor.stop() + self.monitor = None + def parse_clear (self, opts): - self.monitor.stop() + if self.monitor: + self.monitor.stop() + self.monitor = None + self.c.remove_all_captures() @@ -552,13 +561,13 @@ class CaptureManager(object): # monitor mon_table = text_tables.TRexTextTable() - mon_table.set_cols_align(["c"] * 5) - mon_table.set_cols_width([15] * 5) + mon_table.set_cols_align(["c"] * 6) + mon_table.set_cols_width([15] * 6) for elem in data: id = elem['id'] - if self.monitor.get_capture_id() == id: + if self.monitor and (self.monitor.get_capture_id() == id): row = self.monitor.get_mon_row() mon_table.add_rows([row], header=False) @@ -573,7 +582,7 @@ class CaptureManager(object): cap_table.add_rows([row], header=False) cap_table.header(['ID', 'Status', 'Packets', 'Bytes', 'TX Ports', 'RX Ports']) - mon_table.header(['ID', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports']) + mon_table.header(['ID', 'Status', 'Packets Seen', 'Bytes Seen', 'TX Ports', 'RX Ports']) if cap_table._rows: text_tables.print_table_with_header(cap_table, '\nActive Recorders') diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 571334ee..c46a7d78 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -3296,7 +3296,7 @@ class STLClient(object): try: rc = f(*args) except STLError as e: - client.logger.log("\nAction has failed with the following error:\n" + format_text(e.brief() + "\n", 'bold')) + client.logger.log("\nAction has failed with the following error:\n\n" + format_text(e.brief() + "\n", 'bold')) return RC_ERR(e.brief()) # if got true - print time diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py index 0230db23..7ac508a2 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_types.py @@ -64,7 +64,7 @@ class RC(): err_count += 1 if len(err_list) < show_count: err_list.append(format_text(x, 'bold')) - s = '\n' + s = '' if err_count > show_count: s += format_text('Occurred %s errors, showing first %s:\n' % (err_count, show_count), 'bold') s += '\n'.join(err_list) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py index 3ffd07e2..477d81a6 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/text_opts.py @@ -133,16 +133,12 @@ def underline(text): # apply attribute on each non-empty line def text_attribute(text, attribute): - if isinstance(text, str): - return "{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'], - txt=text, - stop=TEXT_CODES[attribute]['end']) - elif isinstance(text, unicode): - return u"{start}{txt}{stop}".format(start=TEXT_CODES[attribute]['start'], - txt=text, - stop=TEXT_CODES[attribute]['end']) - else: - raise Exception("not a string") + return '\n'.join(['{start}{txt}{end}'.format( + start = TEXT_CODES[attribute]['start'], + txt = line, + end = TEXT_CODES[attribute]['end']) + if line else '' for line in ('%s' % text).split('\n')]) + FUNC_DICT = {'blue': blue, 'bold': bold, -- cgit 1.2.3-korg From 78a3270eda09ba24a7f9f795800df3337f8953bf Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 29 Jan 2017 18:35:19 +0200 Subject: documenation errors Signed-off-by: imarom --- .../stl/trex_stl_lib/trex_stl_client.py | 32 ++++++++++++---------- 1 file changed, 18 insertions(+), 14 deletions(-) (limited to 'scripts') diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index c46a7d78..215c0253 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -3008,16 +3008,18 @@ class STLClient(object): Starts a low rate packet capturing on the server :parameters: - tx_ports - on which ports to capture TX - rx_ports - on which ports to capture RX - limit - limit how many packets will be written - memory requierment is O(9K * limit) - - mode - 'fixed': when full, newer packets will be - dropped - - 'cyclic: when full, older packets will be - dropped + tx_ports: list + on which ports to capture TX + + rx_ports: list + on which ports to capture RX + + limit: int + limit how many packets will be written memory requierment is O(9K * limit) + + mode: str + 'fixed' - when full, newer packets will be dropped + 'cyclic' - when full, older packets will be dropped :returns: returns a dictionary: @@ -3075,10 +3077,12 @@ class STLClient(object): Stops an active capture and optionally save it to a PCAP file :parameters: - capture_id - an active capture ID to stop - output_filename - output filename to save capture - if None all captured packets - will be discarded + capture_id: int + an active capture ID to stop + + output_filename: str + output filename to save capture + if 'None', all captured packets will be discarded :raises: + :exe:'STLError' -- cgit 1.2.3-korg