From ac2e93d4247b2db94cd07301b274336bb08dec46 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 11 Jan 2017 18:19:47 +0200 Subject: capture - draft commit Signed-off-by: imarom --- .../trex_control_plane/stl/console/trex_console.py | 4 +- .../stl/trex_stl_lib/trex_stl_client.py | 143 ++++++++++++-- .../stl/trex_stl_lib/utils/common.py | 16 +- .../stl/trex_stl_lib/utils/parsing_opts.py | 52 +++++- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 118 +++++++++++- src/rpc-server/commands/trex_rpc_cmds.h | 8 +- src/rpc-server/trex_rpc_cmds_table.cpp | 2 +- src/stateless/cp/trex_stateless.cpp | 31 +--- src/stateless/cp/trex_stateless.h | 12 +- src/stateless/cp/trex_stateless_port.h | 15 +- .../messaging/trex_stateless_messaging.cpp | 46 ++++- src/stateless/messaging/trex_stateless_messaging.h | 54 ++++-- src/stateless/rx/trex_stateless_capture.cpp | 142 ++++++++++++-- src/stateless/rx/trex_stateless_capture.h | 205 +++++++++++++++++++-- src/stateless/rx/trex_stateless_rx_core.cpp | 14 -- src/stateless/rx/trex_stateless_rx_core.h | 10 +- 16 files changed, 723 insertions(+), 149 deletions(-) diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py index 38a1fca4..b0ab70e0 100755 --- a/scripts/automation/trex_control_plane/stl/console/trex_console.py +++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py @@ -348,8 +348,8 @@ class TRexConsole(TRexGeneralCmd): @verify_connected def do_capture (self, line): - '''Start PCAP capturing on port''' - self.stateless_client.start_capture_line(line) + '''Manage PCAP captures''' + self.stateless_client.capture_line(line) def help_capture (self): self.do_capture("-h") diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py index 1b57218f..d75c554e 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py @@ -26,6 +26,7 @@ import random import json import traceback import os.path +import argparse ############################ logger ############################# ############################ ############################# @@ -2961,7 +2962,7 @@ class STLClient(object): Resolves ports (ARP resolution) :parameters: - ports - for which ports to apply a unique sniffer (each port gets a unique file) + ports - which ports to resolve retires - how many times to retry on each port (intervals of 100 milliseconds) verbose - log for each request the response :raises: @@ -3022,7 +3023,7 @@ class STLClient(object): self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit)) - rc = self._transmit("start_capture", params = {'limit': limit, 'tx': tx_ports, 'rx': rx_ports}) + rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'tx': tx_ports, 'rx': rx_ports}) self.logger.post_cmd(rc) @@ -3032,24 +3033,82 @@ class STLClient(object): @__api_check(True) - def stop_capture (self, ports = None): + def stop_capture (self, capture_id, output_filename): """ - Removes RX sniffer from port(s) + Stops an active capture + + :parameters: + capture_id - an active capture ID to stop + output_filename - output filename to save capture :raises: + :exe:'STLError' """ - ports = ports if ports is not None else self.get_acquired_ports() - ports = self._validate_port_list(ports) - self.logger.pre_cmd("Removing RX sniffers on port(s) {0}:".format(ports)) - rc = self.__remove_rx_sniffer(ports) + + + # stopping a capture requires: + # 1. stopping + # 2. fetching + # 3. saving to file + + # stop + + self.logger.pre_cmd("Stopping PCAP capture {0}".format(capture_id)) + rc = self._transmit("capture", params = {'command': 'stop', 'capture_id': capture_id}) self.logger.post_cmd(rc) + if not rc: + raise STLError(rc) + + # pkt count + pkt_count = rc.data()['pkt_count'] + + if not output_filename or pkt_count == 0: + return + + self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename)) + + # create a PCAP file + writer = RawPcapWriter(output_filename, linktype = 1) + writer._write_header(None) + + # fetch + while True: + rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50}) + if not rc: + self.logger.post_cmd(rc) + raise STLError(rc) + + pkts = rc.data()['pkts'] + for pkt in pkts: + ts = pkt['ts'] + pkt_bin = base64.b64decode(pkt['binary']) + writer._write_packet(pkt_bin, sec = 0, usec = 0) + + if rc.data()['pending'] == 0: + break + + self.logger.post_cmd(rc) + + + # get capture status + @__api_check(True) + def get_capture_status (self): + """ + returns a list of all active captures + each element in the list is an object containing + info about the capture + + """ + + rc = self._transmit("capture", params = {'command': 'status'}) if not rc: raise STLError(rc) + return rc.data() + @__api_check(True) def set_rx_queue (self, ports = None, size = 1000): @@ -3766,23 +3825,71 @@ class STLClient(object): @__console - def start_capture_line (self, line): - '''Starts PCAP recorder on port(s)''' + def capture_line (self, line): + '''Manage PCAP recorders''' - parser = parsing_opts.gen_parser(self, - "capture", - self.start_capture_line.__doc__, - parsing_opts.TX_PORT_LIST, - parsing_opts.RX_PORT_LIST, - parsing_opts.LIMIT) + # default + if not line: + line = "show" + + parser = parsing_opts.gen_parser(self, "capture", self.capture_line.__doc__) + subparsers = parser.add_subparsers(title = "commands", dest="commands") + + # start + start_parser = subparsers.add_parser('start', help = "starts a new capture") + start_parser.add_arg_list(parsing_opts.TX_PORT_LIST, + parsing_opts.RX_PORT_LIST, + parsing_opts.LIMIT) + + # stop + stop_parser = subparsers.add_parser('stop', help = "stops an active capture") + stop_parser.add_arg_list(parsing_opts.CAPTURE_ID, + parsing_opts.OUTPUT_FILENAME) + + # show + show_parser = subparsers.add_parser('show', help = "show all active captures") + + opts = parser.parse_args(line.split()) - opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True) if not opts: return opts - self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + # start + if opts.commands == 'start': + if not opts.tx_port_list and not opts.rx_port_list: + start_parser.formatted_error('please provide either --tx or --rx') + return + + self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit) + + # stop + elif opts.commands == 'stop': + self.stop_capture(opts.capture_id, opts.output_filename) + + # show + else: + data = self.get_capture_status() + + stats_table = text_tables.TRexTextTable() + stats_table.set_cols_align(["c"] * 6) + stats_table.set_cols_width([15] * 6) + + for elem in data: + row = [elem['id'], + elem['state'], + '[{0}/{1}]'.format(elem['count'], elem['limit']), + format_num(elem['bytes'], suffix = 'B'), + bitfield_to_str(elem['filter']['tx']), + bitfield_to_str(elem['filter']['rx'])] + + stats_table.add_rows([row], header=False) + + stats_table.header(['ID', 'Status', 'Count', 'Bytes', 'TX Ports', 'RX Ports']) + text_tables.print_table_with_header(stats_table, "Captures") + return RC_OK() + @__console diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py index cbbacb27..c386451b 100644 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py @@ -107,4 +107,18 @@ def list_remove_dup (l): return tmp - +def bitfield_to_list (bf): + rc = [] + bitpos = 0 + + while bf > 0: + if bf & 0x1: + rc.append(bitpos) + bitpos += 1 + bf = bf >> 1 + + return rc + +def bitfield_to_str (bf): + lst = bitfield_to_list(bf) + return "-" if not lst else ', '.join([str(x) for x in lst]) diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py index 265c43fb..cb594ef4 100755 --- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py +++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py @@ -69,6 +69,8 @@ RX_PORT_LIST SRC_IPV4 DST_IPV4 +CAPTURE_ID + GLOBAL_STATS PORT_STATS PORT_STATUS @@ -81,12 +83,14 @@ EXTENDED_INC_ZERO_STATS STREAMS_MASK CORE_MASK_GROUP +CAPTURE_PORTS_GROUP # ALL_STREAMS # STREAM_LIST_WITH_ALL # list of ArgumentGroup types MUTEX +NON_MUTEX ''' @@ -392,7 +396,6 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], {'help': 'Output PCAP filename', 'dest': 'output_filename', 'default': None, - 'required': True, 'type': str}), @@ -612,6 +615,12 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], 'help': 'A list of ports to capture on the RX side', 'default': []}), + CAPTURE_ID: ArgumentPack(['-i', '--id'], + {'help': "capture ID to remove", + 'dest': "capture_id", + 'type': int, + 'required': True}), + # advanced options PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST, ALL_PORTS], @@ -636,6 +645,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'], CORE_MASK], {'required': False}), + CAPTURE_PORTS_GROUP: ArgumentGroup(NON_MUTEX, [TX_PORT_LIST, RX_PORT_LIST], {}), } class _MergeAction(argparse._AppendAction): @@ -654,12 +664,30 @@ class _MergeAction(argparse._AppendAction): class CCmdArgParser(argparse.ArgumentParser): - def __init__(self, stateless_client, *args, **kwargs): + def __init__(self, stateless_client = None, x = None, *args, **kwargs): super(CCmdArgParser, self).__init__(*args, **kwargs) self.stateless_client = stateless_client self.cmd_name = kwargs.get('prog') self.register('action', 'merge', _MergeAction) + + def add_arg_list (self, *args): + populate_parser(self, *args) + + def add_subparsers(self, *args, **kwargs): + sub = super(CCmdArgParser, self).add_subparsers(*args, **kwargs) + + add_parser = sub.add_parser + stateless_client = self.stateless_client + + def add_parser_hook (self, *args, **kwargs): + parser = add_parser(self, *args, **kwargs) + parser.stateless_client = stateless_client + return parser + + sub.add_parser = add_parser_hook + return sub + # hook this to the logger def _print_message(self, message, file=None): self.stateless_client.logger.log(message) @@ -730,13 +758,15 @@ class CCmdArgParser(argparse.ArgumentParser): # recover from system exit scenarios, such as "help", or bad arguments. return RC_ERR("'{0}' - {1}".format(self.cmd_name, "no action")) + def formatted_error (self, msg): + self.print_usage() + self.stateless_client.logger.log(msg) + def get_flags (opt): return OPTIONS_DB[opt].name_or_flags -def gen_parser(stateless_client, op_name, description, *args): - parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve', - description=description) +def populate_parser (parser, *args): for param in args: try: @@ -752,6 +782,12 @@ def gen_parser(stateless_client, op_name, description, *args): for sub_argument in argument.args: group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags, **OPTIONS_DB[sub_argument].options) + + elif argument.type == NON_MUTEX: + group = parser.add_argument_group(**argument.options) + for sub_argument in argument.args: + group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags, + **OPTIONS_DB[sub_argument].options) else: # ignore invalid objects continue @@ -764,6 +800,12 @@ def gen_parser(stateless_client, op_name, description, *args): except KeyError as e: cause = e.args[0] raise KeyError("The attribute '{0}' is missing as a field of the {1} option.\n".format(cause, param)) + +def gen_parser(stateless_client, op_name, description, *args): + parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve', + description=description) + + populate_parser(parser, *args) return parser diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index ec5c3158..80f69fa3 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -29,6 +29,7 @@ limitations under the License. #include "trex_stateless_rx_core.h" #include "trex_stateless_capture.h" +#include "trex_stateless_messaging.h" #include #include @@ -844,13 +845,38 @@ TrexRpcCmdSetL3::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } - + + /** - * starts PCAP capturing + * capture command tree * */ trex_rpc_cmd_rc_e -TrexRpcCmdStartCapture::_run(const Json::Value ¶ms, Json::Value &result) { +TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { + const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status"}, result); + + if (cmd == "start") { + parse_cmd_start(params, result); + } else if (cmd == "stop") { + parse_cmd_stop(params, result); + } else if (cmd == "fetch") { + parse_cmd_fetch(params, result); + } else if (cmd == "status") { + parse_cmd_status(params, result); + } else { + /* can't happen */ + assert(0); + } + + return TREX_RPC_CMD_OK; +} + +/** + * starts PCAP capturing + * + */ +void +TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &result) { uint32_t limit = parse_uint32(params, "limit", result); const Json::Value &tx_json = parse_array(params, "tx", result); @@ -881,8 +907,90 @@ TrexRpcCmdStartCapture::_run(const Json::Value ¶ms, Json::Value &result) { } } - get_stateless_obj()->start_capture(filter, limit); + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); + get_stateless_obj()->send_msg_to_rx(start_msg); + + TrexCaptureRCStart rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } result["result"] = Json::objectValue; - return (TREX_RPC_CMD_OK); } + +/** + * stops PCAP capturing + * + */ +void +TrexRpcCmdCapture::parse_cmd_stop(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStop *stop_msg = new TrexStatelessRxCaptureStop(capture_id, reply); + get_stateless_obj()->send_msg_to_rx(stop_msg); + + TrexCaptureRCStop rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"]["pkt_count"] = rc.get_pkt_count(); +} + +/** + * gets the status of all captures in the system + * + */ +void +TrexRpcCmdCapture::parse_cmd_status(const Json::Value ¶ms, Json::Value &result) { + + /* generate a status command */ + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStatus *status_msg = new TrexStatelessRxCaptureStatus(reply); + get_stateless_obj()->send_msg_to_rx(status_msg); + + TrexCaptureRCStatus rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"] = rc.get_status(); +} + +/** + * fetch packets from a capture + * + */ +void +TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + uint32_t pkt_limit = parse_uint32(params, "pkt_limit", result); + + /* generate a fetch command */ + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureFetch *fetch_msg = new TrexStatelessRxCaptureFetch(capture_id, pkt_limit, reply); + get_stateless_obj()->send_msg_to_rx(fetch_msg); + + TrexCaptureRCFetch rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"]["pkts"] = rc.get_pkt_buffer()->to_json(); + result["result"]["pending"] = rc.get_pending(); +} + diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 1ea63cc7..bf78ff80 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -160,7 +160,13 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIC TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdStartCapture, "start_capture", 3, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture, "capture", 1, false, APIClass::API_CLASS_TYPE_CORE, + void parse_cmd_start(const Json::Value &msg, Json::Value &result); + void parse_cmd_stop(const Json::Value &msg, Json::Value &result); + void parse_cmd_status(const Json::Value &msg, Json::Value &result); + void parse_cmd_fetch(const Json::Value &msg, Json::Value &result); +); + #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 3d4d5a23..2af9f4f5 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -79,7 +79,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdSetL2()); register_command(new TrexRpcCmdSetL3()); - register_command(new TrexRpcCmdStartCapture()); + register_command(new TrexRpcCmdCapture()); } diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 32babbf7..6ab9b417 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -19,7 +19,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -//#include #include #include @@ -142,35 +141,11 @@ TrexStateless::get_dp_core_count() { return m_platform_api->get_dp_core_count(); } -capture_id_t -TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) { - static MsgReply reply; - - reply.reset(); - - CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); - TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply); - - ring->Enqueue((CGenNode *)msg); - - capture_id_t new_id = reply.wait_for_reply(); - - return (new_id); -} +void +TrexStateless::send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const { -capture_id_t -TrexStateless::stop_capture(capture_id_t capture_id) { - static MsgReply reply; - - reply.reset(); - CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); - TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply); - ring->Enqueue((CGenNode *)msg); - - capture_id_t rc = reply.wait_for_reply(); - - return (rc); } + diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 33f16ce9..87d227f6 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -102,7 +102,6 @@ public: * defines the TRex stateless operation mode * */ -class CaptureFilter; class TrexStateless { public: @@ -133,16 +132,9 @@ public: /** - * starts a capture on a 'filter' of ports - * with a limit of packets + * send a message to the RX core */ - capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit); - - /** - * stops an active capture - * - */ - capture_id_t stop_capture(capture_id_t capture_id); + void send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const; /** * shutdown the server diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 2cc1b9ca..0ef8ae60 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -140,7 +140,7 @@ public: } if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { - throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists"); + throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists"); } m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW); @@ -439,19 +439,6 @@ public: void get_pci_info(std::string &pci_addr, int &numa_node); - - /** - * starts capturing packets - * - */ - void start_capture(capture_mode_e mode, uint64_t limit); - - /** - * stops capturing packets - * - */ - void stop_capture(); - /** * start RX queueing of packets * diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index f441c692..b9bb1d1c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -262,24 +262,58 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool -TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter); +TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStart start_rc; + + TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc); + + /* mark as done */ + m_reply.set_reply(start_rc); + + return true; +} + +bool +TrexStatelessRxCaptureStop::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStop stop_rc; + + TrexStatelessCaptureMngr::getInstance().stop(m_capture_id, stop_rc); + + /* mark as done */ + m_reply.set_reply(stop_rc); + + return true; +} +bool +TrexStatelessRxCaptureFetch::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCFetch fetch_rc; + + TrexStatelessCaptureMngr::getInstance().fetch(m_capture_id, m_pkt_limit, fetch_rc); + /* mark as done */ - m_reply.set_reply(capture_id); + m_reply.set_reply(fetch_rc); return true; } bool -TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { - capture_id_t rc = rx_core->stop_capture(m_capture_id); +TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStatus status_rc; - m_reply.set_reply(rc); + status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json()); + + /* mark as done */ + m_reply.set_reply(status_rc); return true; } + bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { rx_core->start_queue(m_port_id, m_size); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 5f4978f5..4027d075 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -485,12 +485,16 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { }; +class TrexStatelessRxCapture : public TrexStatelessCpToRxMsgBase { +public: + virtual bool handle (CRxCoreStateless *rx_core) = 0; +}; -class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture { public: - TrexStatelessRxStartCapture(const CaptureFilter& filter, + TrexStatelessRxCaptureStart(const CaptureFilter& filter, uint64_t limit, - MsgReply &reply) : m_reply(reply) { + MsgReply &reply) : m_reply(reply) { m_limit = limit; m_filter = filter; @@ -499,24 +503,52 @@ public: virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - uint64_t m_limit; - CaptureFilter m_filter; - MsgReply &m_reply; + uint8_t m_port_id; + uint64_t m_limit; + CaptureFilter m_filter; + MsgReply &m_reply; +}; + + +class TrexStatelessRxCaptureStop : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStop(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + m_capture_id = capture_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + MsgReply &m_reply; }; -class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureFetch : public TrexStatelessRxCapture { public: - TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + TrexStatelessRxCaptureFetch(capture_id_t capture_id, uint32_t pkt_limit, MsgReply &reply) : m_reply(reply) { m_capture_id = capture_id; + m_pkt_limit = pkt_limit; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + uint32_t m_pkt_limit; + MsgReply &m_reply; +}; + + +class TrexStatelessRxCaptureStatus : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStatus(MsgReply &reply) : m_reply(reply) { } virtual bool handle(CRxCoreStateless *rx_core); private: - capture_id_t m_capture_id; - MsgReply &m_reply; + MsgReply &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 4ed126cc..85be7aef 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -25,6 +25,7 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons m_id = id; m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); m_filter = filter; + m_state = STATE_ACTIVE; } TrexStatelessCapture::~TrexStatelessCapture() { @@ -35,9 +36,15 @@ TrexStatelessCapture::~TrexStatelessCapture() { void TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { + + if (m_state != STATE_ACTIVE) { + delete pkt; + return; + } /* if not in filter - back off */ if (!m_filter.in_filter(pkt)) { + delete pkt; return; } @@ -46,6 +53,11 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { void TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { + + if (m_state != STATE_ACTIVE) { + return; + } + if (!m_filter.in_rx(port)) { return; } @@ -53,6 +65,56 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { m_pkt_buffer->push(m); } + +Json::Value +TrexStatelessCapture::to_json() const { + Json::Value output = Json::objectValue; + + output["id"] = Json::UInt64(m_id); + output["filter"] = m_filter.to_json(); + output["count"] = m_pkt_buffer->get_element_count(); + output["bytes"] = m_pkt_buffer->get_bytes(); + output["limit"] = m_pkt_buffer->get_capacity(); + + switch (m_state) { + case STATE_ACTIVE: + output["state"] = "ACTIVE"; + break; + + case STATE_STOPPED: + output["state"] = "STOPPED"; + break; + + default: + assert(0); + + } + + return output; +} + +TrexPktBuffer * +TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { + + /* if the total sum of packets is within the limit range - take it */ + if (m_pkt_buffer->get_element_count() <= pkt_limit) { + TrexPktBuffer *current = m_pkt_buffer; + m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode()); + pending = 0; + return current; + } + + /* harder part - partial fetch */ + TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit); + for (int i = 0; i < pkt_limit; i++) { + const TrexPkt *pkt = m_pkt_buffer->pop(); + partial->push(pkt); + } + + pending = m_pkt_buffer->get_element_count(); + return partial; +} + void TrexStatelessCaptureMngr::update_global_filter() { CaptureFilter new_filter; @@ -64,11 +126,25 @@ TrexStatelessCaptureMngr::update_global_filter() { m_global_filter = new_filter; } -capture_id_t -TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { +TrexStatelessCapture * +TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return m_captures[i]; + } + } + + /* does not exist */ + return nullptr; +} + +void +TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) { + if (m_captures.size() > MAX_CAPTURE_SIZE) { - return CAPTURE_TOO_MANY_CAPTURES; + rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED); + return; } @@ -79,15 +155,46 @@ TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { /* update global filter */ update_global_filter(); - return new_id; + /* result */ + rc.set_new_id(new_id); +} + +void +TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + capture->stop(); + rc.set_count(capture->get_pkt_count()); } -capture_id_t -TrexStatelessCaptureMngr::remove(capture_id_t id) { +void +TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + if (capture->is_active()) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_FETCH_UNDER_ACTIVE); + return; + } + uint32_t pending = 0; + TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); + + rc.set_pkt_buffer(pkt_buffer, pending); +} + +void +TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) { + int index = -1; for (int i = 0; i < m_captures.size(); i++) { - if (m_captures[i]->get_id() == id) { + if (m_captures[i]->get_id() == capture_id) { index = i; break; } @@ -95,24 +202,26 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) { /* does not exist */ if (index == -1) { - return CAPTURE_ID_NOT_FOUND; + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; } TrexStatelessCapture *capture = m_captures[index]; m_captures.erase(m_captures.begin() + index); + /* free memory */ delete capture; /* update global filter */ update_global_filter(); - - return id; } void TrexStatelessCaptureMngr::reset() { + TrexCaptureRCRemove dummy; + while (m_captures.size() > 0) { - remove(m_captures[0]->get_id()); + remove(m_captures[0]->get_id(), dummy); } } @@ -130,3 +239,14 @@ TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port) } } +Json::Value +TrexStatelessCaptureMngr::to_json() const { + Json::Value lst = Json::arrayValue; + + for (TrexStatelessCapture *capture : m_captures) { + lst.append(capture->to_json()); + } + + return lst; +} + diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 4d0b6a78..6cd25a94 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -22,7 +22,146 @@ limitations under the License. #define __TREX_STATELESS_CAPTURE_H__ #include +#include + #include "trex_stateless_pkt.h" +#include "trex_stateless_capture_msg.h" + +typedef int64_t capture_id_t; + +class TrexCaptureRC { +public: + + TrexCaptureRC() { + m_rc = RC_INVALID; + m_pkt_buffer = NULL; + } + + enum rc_e { + RC_INVALID = 0, + RC_OK = 1, + RC_CAPTURE_NOT_FOUND, + RC_CAPTURE_LIMIT_REACHED, + RC_CAPTURE_FETCH_UNDER_ACTIVE + }; + + bool operator !() const { + return (m_rc != RC_OK); + } + + std::string get_err() const { + assert(m_rc != RC_INVALID); + + switch (m_rc) { + case RC_OK: + return ""; + case RC_CAPTURE_LIMIT_REACHED: + return "capture limit has reached"; + case RC_CAPTURE_NOT_FOUND: + return "capture ID not found"; + case RC_CAPTURE_FETCH_UNDER_ACTIVE: + return "fetch command cannot be executed on an active capture"; + default: + assert(0); + } + } + + void set_err(rc_e rc) { + m_rc = rc; + } + + Json::Value get_json() const { + return m_json_rc; + } + +public: + rc_e m_rc; + capture_id_t m_capture_id; + TrexPktBuffer *m_pkt_buffer; + Json::Value m_json_rc; +}; + +class TrexCaptureRCStart : public TrexCaptureRC { +public: + + void set_new_id(capture_id_t new_id) { + m_capture_id = new_id; + m_rc = RC_OK; + } + + capture_id_t get_new_id() const { + return m_capture_id; + } + +private: + capture_id_t m_capture_id; +}; + + +class TrexCaptureRCStop : public TrexCaptureRC { +public: + void set_count(uint32_t pkt_count) { + m_pkt_count = pkt_count; + m_rc = RC_OK; + } + + uint32_t get_pkt_count() const { + return m_pkt_count; + } + +private: + uint32_t m_pkt_count; +}; + +class TrexCaptureRCFetch : public TrexCaptureRC { +public: + + TrexCaptureRCFetch() { + m_pkt_buffer = nullptr; + m_pending = 0; + } + + void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_rc = RC_OK; + } + + const TrexPktBuffer *get_pkt_buffer() const { + return m_pkt_buffer; + } + + uint32_t get_pending() const { + return m_pending; + } + +private: + const TrexPktBuffer *m_pkt_buffer; + uint32_t m_pending; +}; + +class TrexCaptureRCRemove : public TrexCaptureRC { +public: + void set_ok() { + m_rc = RC_OK; + } +}; + +class TrexCaptureRCStatus : public TrexCaptureRC { +public: + + void set_status(const Json::Value &json) { + m_json = json; + m_rc = RC_OK; + } + + const Json::Value & get_status() const { + return m_json; + } + +private: + Json::Value m_json; +}; /** * capture filter @@ -82,20 +221,27 @@ public: return *this; } + Json::Value to_json() const { + Json::Value output = Json::objectValue; + output["tx"] = Json::UInt64(m_tx_active); + output["rx"] = Json::UInt64(m_rx_active); + + return output; + } + private: uint64_t m_tx_active; uint64_t m_rx_active; }; -typedef int64_t capture_id_t; -enum { - CAPTURE_ID_NOT_FOUND = -1, - CAPTURE_TOO_MANY_CAPTURES = -2, -}; class TrexStatelessCapture { public: + enum state_e { + STATE_ACTIVE, + STATE_STOPPED, + }; TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); @@ -112,7 +258,24 @@ public: return m_filter; } + Json::Value to_json() const; + + void stop() { + m_state = STATE_STOPPED; + } + + TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending); + + bool is_active() const { + return m_state == STATE_ACTIVE; + } + + uint32_t get_pkt_count() const { + return m_pkt_buffer->get_element_count(); + } + private: + state_e m_state; TrexPktBuffer *m_pkt_buffer; CaptureFilter m_filter; uint64_t m_id; @@ -134,18 +297,28 @@ public: } /** - * adds a capture buffer - * returns ID + * starts a new capture */ - capture_id_t add(uint64_t limit, const CaptureFilter &filter); + void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc); - /** - * stops capture mode - * on success, will return the ID of the removed one - * o.w it will be an error + * stops an existing capture + * */ - capture_id_t remove(capture_id_t id); + void stop(capture_id_t capture_id, TrexCaptureRCStop &rc); + + /** + * fetch packets from an existing capture + * + */ + void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc); + + /** + * removes an existing capture + * all packets captured will be detroyed + */ + void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc); + /** * removes all captures @@ -153,6 +326,7 @@ public: */ void reset(); + /** * return true if any filter is active * @@ -182,6 +356,8 @@ public: handle_pkt_rx_slow_path(m, port); } + Json::Value to_json() const; + private: TrexStatelessCaptureMngr() { @@ -189,6 +365,9 @@ private: m_id_counter = 1; } + + TrexStatelessCapture * lookup(capture_id_t capture_id); + void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); void update_global_filter(); diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f1ba303a..00c18082 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -270,10 +270,6 @@ void CRxCoreStateless::start() { m_monitor.disable(); } -void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) { - -} - int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) { int total_pkts = 0; @@ -332,16 +328,6 @@ double CRxCoreStateless::get_cpu_util() { } -capture_id_t -CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) { - return TrexStatelessCaptureMngr::getInstance().add(limit, filter); -} - -capture_id_t -CRxCoreStateless::stop_capture(capture_id_t capture_id) { - return TrexStatelessCaptureMngr::getInstance().remove(capture_id); -} - void CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) { m_rx_port_mngr[port_id].start_queue(size); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 21ed51ba..954a5f04 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -131,14 +131,7 @@ class CRxCoreStateless { const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } - - /** - * start capturing packets - * - */ - capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter); - capture_id_t stop_capture(capture_id_t capture_id); - + /** * start RX queueing of packets * @@ -175,7 +168,6 @@ class CRxCoreStateless { void recalculate_next_state(); bool are_any_features_active(); - void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); void handle_grat_arp(); -- cgit 1.2.3-korg