summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2017-01-11 18:19:47 +0200
committerimarom <imarom@cisco.com>2017-01-11 18:19:47 +0200
commitac2e93d4247b2db94cd07301b274336bb08dec46 (patch)
tree8dfe8250526cd797ab9af46f4b54cfbec0832fc0
parent5257dbb8253fe5b70b75f9c064c4593ca7aee99f (diff)
capture - draft commit
Signed-off-by: imarom <imarom@cisco.com>
-rwxr-xr-xscripts/automation/trex_control_plane/stl/console/trex_console.py4
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py143
-rw-r--r--scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py16
-rwxr-xr-xscripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py52
-rw-r--r--src/rpc-server/commands/trex_rpc_cmd_general.cpp118
-rw-r--r--src/rpc-server/commands/trex_rpc_cmds.h8
-rw-r--r--src/rpc-server/trex_rpc_cmds_table.cpp2
-rw-r--r--src/stateless/cp/trex_stateless.cpp31
-rw-r--r--src/stateless/cp/trex_stateless.h12
-rw-r--r--src/stateless/cp/trex_stateless_port.h15
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp46
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h54
-rw-r--r--src/stateless/rx/trex_stateless_capture.cpp142
-rw-r--r--src/stateless/rx/trex_stateless_capture.h205
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp14
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h10
16 files changed, 723 insertions, 149 deletions
diff --git a/scripts/automation/trex_control_plane/stl/console/trex_console.py b/scripts/automation/trex_control_plane/stl/console/trex_console.py
index 38a1fca4..b0ab70e0 100755
--- a/scripts/automation/trex_control_plane/stl/console/trex_console.py
+++ b/scripts/automation/trex_control_plane/stl/console/trex_console.py
@@ -348,8 +348,8 @@ class TRexConsole(TRexGeneralCmd):
@verify_connected
def do_capture (self, line):
- '''Start PCAP capturing on port'''
- self.stateless_client.start_capture_line(line)
+ '''Manage PCAP captures'''
+ self.stateless_client.capture_line(line)
def help_capture (self):
self.do_capture("-h")
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
index 1b57218f..d75c554e 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/trex_stl_client.py
@@ -26,6 +26,7 @@ import random
import json
import traceback
import os.path
+import argparse
############################ logger #############################
############################ #############################
@@ -2961,7 +2962,7 @@ class STLClient(object):
Resolves ports (ARP resolution)
:parameters:
- ports - for which ports to apply a unique sniffer (each port gets a unique file)
+ ports - which ports to resolve
retires - how many times to retry on each port (intervals of 100 milliseconds)
verbose - log for each request the response
:raises:
@@ -3022,7 +3023,7 @@ class STLClient(object):
self.logger.pre_cmd("Starting PCAP capturing up to {0} packets".format(limit))
- rc = self._transmit("start_capture", params = {'limit': limit, 'tx': tx_ports, 'rx': rx_ports})
+ rc = self._transmit("capture", params = {'command': 'start', 'limit': limit, 'tx': tx_ports, 'rx': rx_ports})
self.logger.post_cmd(rc)
@@ -3032,24 +3033,82 @@ class STLClient(object):
@__api_check(True)
- def stop_capture (self, ports = None):
+ def stop_capture (self, capture_id, output_filename):
"""
- Removes RX sniffer from port(s)
+ Stops an active capture
+
+ :parameters:
+ capture_id - an active capture ID to stop
+ output_filename - output filename to save capture
:raises:
+ :exe:'STLError'
"""
- ports = ports if ports is not None else self.get_acquired_ports()
- ports = self._validate_port_list(ports)
- self.logger.pre_cmd("Removing RX sniffers on port(s) {0}:".format(ports))
- rc = self.__remove_rx_sniffer(ports)
+
+
+ # stopping a capture requires:
+ # 1. stopping
+ # 2. fetching
+ # 3. saving to file
+
+ # stop
+
+ self.logger.pre_cmd("Stopping PCAP capture {0}".format(capture_id))
+ rc = self._transmit("capture", params = {'command': 'stop', 'capture_id': capture_id})
self.logger.post_cmd(rc)
+ if not rc:
+ raise STLError(rc)
+
+ # pkt count
+ pkt_count = rc.data()['pkt_count']
+
+ if not output_filename or pkt_count == 0:
+ return
+
+ self.logger.pre_cmd("Writing {0} packets to '{1}'".format(pkt_count, output_filename))
+
+ # create a PCAP file
+ writer = RawPcapWriter(output_filename, linktype = 1)
+ writer._write_header(None)
+
+ # fetch
+ while True:
+ rc = self._transmit("capture", params = {'command': 'fetch', 'capture_id': capture_id, 'pkt_limit': 50})
+ if not rc:
+ self.logger.post_cmd(rc)
+ raise STLError(rc)
+
+ pkts = rc.data()['pkts']
+ for pkt in pkts:
+ ts = pkt['ts']
+ pkt_bin = base64.b64decode(pkt['binary'])
+ writer._write_packet(pkt_bin, sec = 0, usec = 0)
+
+ if rc.data()['pending'] == 0:
+ break
+
+ self.logger.post_cmd(rc)
+
+
+ # get capture status
+ @__api_check(True)
+ def get_capture_status (self):
+ """
+ returns a list of all active captures
+ each element in the list is an object containing
+ info about the capture
+
+ """
+
+ rc = self._transmit("capture", params = {'command': 'status'})
if not rc:
raise STLError(rc)
+ return rc.data()
+
@__api_check(True)
def set_rx_queue (self, ports = None, size = 1000):
@@ -3766,23 +3825,71 @@ class STLClient(object):
@__console
- def start_capture_line (self, line):
- '''Starts PCAP recorder on port(s)'''
+ def capture_line (self, line):
+ '''Manage PCAP recorders'''
- parser = parsing_opts.gen_parser(self,
- "capture",
- self.start_capture_line.__doc__,
- parsing_opts.TX_PORT_LIST,
- parsing_opts.RX_PORT_LIST,
- parsing_opts.LIMIT)
+ # default
+ if not line:
+ line = "show"
+
+ parser = parsing_opts.gen_parser(self, "capture", self.capture_line.__doc__)
+ subparsers = parser.add_subparsers(title = "commands", dest="commands")
+
+ # start
+ start_parser = subparsers.add_parser('start', help = "starts a new capture")
+ start_parser.add_arg_list(parsing_opts.TX_PORT_LIST,
+ parsing_opts.RX_PORT_LIST,
+ parsing_opts.LIMIT)
+
+ # stop
+ stop_parser = subparsers.add_parser('stop', help = "stops an active capture")
+ stop_parser.add_arg_list(parsing_opts.CAPTURE_ID,
+ parsing_opts.OUTPUT_FILENAME)
+
+ # show
+ show_parser = subparsers.add_parser('show', help = "show all active captures")
+
+ opts = parser.parse_args(line.split())
- opts = parser.parse_args(line.split(), default_ports = self.get_acquired_ports(), verify_acquired = True)
if not opts:
return opts
- self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
+ # start
+ if opts.commands == 'start':
+ if not opts.tx_port_list and not opts.rx_port_list:
+ start_parser.formatted_error('please provide either --tx or --rx')
+ return
+
+ self.start_capture(opts.tx_port_list, opts.rx_port_list, opts.limit)
+
+ # stop
+ elif opts.commands == 'stop':
+ self.stop_capture(opts.capture_id, opts.output_filename)
+
+ # show
+ else:
+ data = self.get_capture_status()
+
+ stats_table = text_tables.TRexTextTable()
+ stats_table.set_cols_align(["c"] * 6)
+ stats_table.set_cols_width([15] * 6)
+
+ for elem in data:
+ row = [elem['id'],
+ elem['state'],
+ '[{0}/{1}]'.format(elem['count'], elem['limit']),
+ format_num(elem['bytes'], suffix = 'B'),
+ bitfield_to_str(elem['filter']['tx']),
+ bitfield_to_str(elem['filter']['rx'])]
+
+ stats_table.add_rows([row], header=False)
+
+ stats_table.header(['ID', 'Status', 'Count', 'Bytes', 'TX Ports', 'RX Ports'])
+ text_tables.print_table_with_header(stats_table, "Captures")
+
return RC_OK()
+
@__console
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
index cbbacb27..c386451b 100644
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/common.py
@@ -107,4 +107,18 @@ def list_remove_dup (l):
return tmp
-
+def bitfield_to_list (bf):
+ rc = []
+ bitpos = 0
+
+ while bf > 0:
+ if bf & 0x1:
+ rc.append(bitpos)
+ bitpos += 1
+ bf = bf >> 1
+
+ return rc
+
+def bitfield_to_str (bf):
+ lst = bitfield_to_list(bf)
+ return "-" if not lst else ', '.join([str(x) for x in lst])
diff --git a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
index 265c43fb..cb594ef4 100755
--- a/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
+++ b/scripts/automation/trex_control_plane/stl/trex_stl_lib/utils/parsing_opts.py
@@ -69,6 +69,8 @@ RX_PORT_LIST
SRC_IPV4
DST_IPV4
+CAPTURE_ID
+
GLOBAL_STATS
PORT_STATS
PORT_STATUS
@@ -81,12 +83,14 @@ EXTENDED_INC_ZERO_STATS
STREAMS_MASK
CORE_MASK_GROUP
+CAPTURE_PORTS_GROUP
# ALL_STREAMS
# STREAM_LIST_WITH_ALL
# list of ArgumentGroup types
MUTEX
+NON_MUTEX
'''
@@ -392,7 +396,6 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
{'help': 'Output PCAP filename',
'dest': 'output_filename',
'default': None,
- 'required': True,
'type': str}),
@@ -612,6 +615,12 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
'help': 'A list of ports to capture on the RX side',
'default': []}),
+ CAPTURE_ID: ArgumentPack(['-i', '--id'],
+ {'help': "capture ID to remove",
+ 'dest': "capture_id",
+ 'type': int,
+ 'required': True}),
+
# advanced options
PORT_LIST_WITH_ALL: ArgumentGroup(MUTEX, [PORT_LIST,
ALL_PORTS],
@@ -636,6 +645,7 @@ OPTIONS_DB = {MULTIPLIER: ArgumentPack(['-m', '--multiplier'],
CORE_MASK],
{'required': False}),
+ CAPTURE_PORTS_GROUP: ArgumentGroup(NON_MUTEX, [TX_PORT_LIST, RX_PORT_LIST], {}),
}
class _MergeAction(argparse._AppendAction):
@@ -654,12 +664,30 @@ class _MergeAction(argparse._AppendAction):
class CCmdArgParser(argparse.ArgumentParser):
- def __init__(self, stateless_client, *args, **kwargs):
+ def __init__(self, stateless_client = None, x = None, *args, **kwargs):
super(CCmdArgParser, self).__init__(*args, **kwargs)
self.stateless_client = stateless_client
self.cmd_name = kwargs.get('prog')
self.register('action', 'merge', _MergeAction)
+
+ def add_arg_list (self, *args):
+ populate_parser(self, *args)
+
+ def add_subparsers(self, *args, **kwargs):
+ sub = super(CCmdArgParser, self).add_subparsers(*args, **kwargs)
+
+ add_parser = sub.add_parser
+ stateless_client = self.stateless_client
+
+ def add_parser_hook (self, *args, **kwargs):
+ parser = add_parser(self, *args, **kwargs)
+ parser.stateless_client = stateless_client
+ return parser
+
+ sub.add_parser = add_parser_hook
+ return sub
+
# hook this to the logger
def _print_message(self, message, file=None):
self.stateless_client.logger.log(message)
@@ -730,13 +758,15 @@ class CCmdArgParser(argparse.ArgumentParser):
# recover from system exit scenarios, such as "help", or bad arguments.
return RC_ERR("'{0}' - {1}".format(self.cmd_name, "no action"))
+ def formatted_error (self, msg):
+ self.print_usage()
+ self.stateless_client.logger.log(msg)
+
def get_flags (opt):
return OPTIONS_DB[opt].name_or_flags
-def gen_parser(stateless_client, op_name, description, *args):
- parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve',
- description=description)
+def populate_parser (parser, *args):
for param in args:
try:
@@ -752,6 +782,12 @@ def gen_parser(stateless_client, op_name, description, *args):
for sub_argument in argument.args:
group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags,
**OPTIONS_DB[sub_argument].options)
+
+ elif argument.type == NON_MUTEX:
+ group = parser.add_argument_group(**argument.options)
+ for sub_argument in argument.args:
+ group.add_argument(*OPTIONS_DB[sub_argument].name_or_flags,
+ **OPTIONS_DB[sub_argument].options)
else:
# ignore invalid objects
continue
@@ -764,6 +800,12 @@ def gen_parser(stateless_client, op_name, description, *args):
except KeyError as e:
cause = e.args[0]
raise KeyError("The attribute '{0}' is missing as a field of the {1} option.\n".format(cause, param))
+
+def gen_parser(stateless_client, op_name, description, *args):
+ parser = CCmdArgParser(stateless_client, prog=op_name, conflict_handler='resolve',
+ description=description)
+
+ populate_parser(parser, *args)
return parser
diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
index ec5c3158..80f69fa3 100644
--- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp
+++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp
@@ -29,6 +29,7 @@ limitations under the License.
#include "trex_stateless_rx_core.h"
#include "trex_stateless_capture.h"
+#include "trex_stateless_messaging.h"
#include <fstream>
#include <iostream>
@@ -844,13 +845,38 @@ TrexRpcCmdSetL3::_run(const Json::Value &params, Json::Value &result) {
return (TREX_RPC_CMD_OK);
}
-
+
+
/**
- * starts PCAP capturing
+ * capture command tree
*
*/
trex_rpc_cmd_rc_e
-TrexRpcCmdStartCapture::_run(const Json::Value &params, Json::Value &result) {
+TrexRpcCmdCapture::_run(const Json::Value &params, Json::Value &result) {
+ const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status"}, result);
+
+ if (cmd == "start") {
+ parse_cmd_start(params, result);
+ } else if (cmd == "stop") {
+ parse_cmd_stop(params, result);
+ } else if (cmd == "fetch") {
+ parse_cmd_fetch(params, result);
+ } else if (cmd == "status") {
+ parse_cmd_status(params, result);
+ } else {
+ /* can't happen */
+ assert(0);
+ }
+
+ return TREX_RPC_CMD_OK;
+}
+
+/**
+ * starts PCAP capturing
+ *
+ */
+void
+TrexRpcCmdCapture::parse_cmd_start(const Json::Value &params, Json::Value &result) {
uint32_t limit = parse_uint32(params, "limit", result);
const Json::Value &tx_json = parse_array(params, "tx", result);
@@ -881,8 +907,90 @@ TrexRpcCmdStartCapture::_run(const Json::Value &params, Json::Value &result) {
}
}
- get_stateless_obj()->start_capture(filter, limit);
+ static MsgReply<TrexCaptureRCStart> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply);
+ get_stateless_obj()->send_msg_to_rx(start_msg);
+
+ TrexCaptureRCStart rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
result["result"] = Json::objectValue;
- return (TREX_RPC_CMD_OK);
}
+
+/**
+ * stops PCAP capturing
+ *
+ */
+void
+TrexRpcCmdCapture::parse_cmd_stop(const Json::Value &params, Json::Value &result) {
+
+ uint32_t capture_id = parse_uint32(params, "capture_id", result);
+
+ static MsgReply<TrexCaptureRCStop> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureStop *stop_msg = new TrexStatelessRxCaptureStop(capture_id, reply);
+ get_stateless_obj()->send_msg_to_rx(stop_msg);
+
+ TrexCaptureRCStop rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
+
+ result["result"]["pkt_count"] = rc.get_pkt_count();
+}
+
+/**
+ * gets the status of all captures in the system
+ *
+ */
+void
+TrexRpcCmdCapture::parse_cmd_status(const Json::Value &params, Json::Value &result) {
+
+ /* generate a status command */
+
+ static MsgReply<TrexCaptureRCStatus> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureStatus *status_msg = new TrexStatelessRxCaptureStatus(reply);
+ get_stateless_obj()->send_msg_to_rx(status_msg);
+
+ TrexCaptureRCStatus rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
+
+ result["result"] = rc.get_status();
+}
+
+/**
+ * fetch packets from a capture
+ *
+ */
+void
+TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value &params, Json::Value &result) {
+
+ uint32_t capture_id = parse_uint32(params, "capture_id", result);
+ uint32_t pkt_limit = parse_uint32(params, "pkt_limit", result);
+
+ /* generate a fetch command */
+
+ static MsgReply<TrexCaptureRCFetch> reply;
+ reply.reset();
+
+ TrexStatelessRxCaptureFetch *fetch_msg = new TrexStatelessRxCaptureFetch(capture_id, pkt_limit, reply);
+ get_stateless_obj()->send_msg_to_rx(fetch_msg);
+
+ TrexCaptureRCFetch rc = reply.wait_for_reply();
+ if (!rc) {
+ generate_execute_err(result, rc.get_err());
+ }
+
+ result["result"]["pkts"] = rc.get_pkt_buffer()->to_json();
+ result["result"]["pending"] = rc.get_pending();
+}
+
diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h
index 1ea63cc7..bf78ff80 100644
--- a/src/rpc-server/commands/trex_rpc_cmds.h
+++ b/src/rpc-server/commands/trex_rpc_cmds.h
@@ -160,7 +160,13 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIC
TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE);
-TREX_RPC_CMD_DEFINE(TrexRpcCmdStartCapture, "start_capture", 3, false, APIClass::API_CLASS_TYPE_CORE);
+TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture, "capture", 1, false, APIClass::API_CLASS_TYPE_CORE,
+ void parse_cmd_start(const Json::Value &msg, Json::Value &result);
+ void parse_cmd_stop(const Json::Value &msg, Json::Value &result);
+ void parse_cmd_status(const Json::Value &msg, Json::Value &result);
+ void parse_cmd_fetch(const Json::Value &msg, Json::Value &result);
+);
+
#endif /* __TREX_RPC_CMD_H__ */
diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp
index 3d4d5a23..2af9f4f5 100644
--- a/src/rpc-server/trex_rpc_cmds_table.cpp
+++ b/src/rpc-server/trex_rpc_cmds_table.cpp
@@ -79,7 +79,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() {
register_command(new TrexRpcCmdSetL2());
register_command(new TrexRpcCmdSetL3());
- register_command(new TrexRpcCmdStartCapture());
+ register_command(new TrexRpcCmdCapture());
}
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 32babbf7..6ab9b417 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -19,7 +19,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-//#include <sched.h>
#include <iostream>
#include <unistd.h>
@@ -142,35 +141,11 @@ TrexStateless::get_dp_core_count() {
return m_platform_api->get_dp_core_count();
}
-capture_id_t
-TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) {
- static MsgReply<capture_id_t> reply;
-
- reply.reset();
-
- CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
- TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply);
-
- ring->Enqueue((CGenNode *)msg);
-
- capture_id_t new_id = reply.wait_for_reply();
-
- return (new_id);
-}
+void
+TrexStateless::send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const {
-capture_id_t
-TrexStateless::stop_capture(capture_id_t capture_id) {
- static MsgReply<capture_id_t> reply;
-
- reply.reset();
-
CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
- TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply);
-
ring->Enqueue((CGenNode *)msg);
-
- capture_id_t rc = reply.wait_for_reply();
-
- return (rc);
}
+
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 33f16ce9..87d227f6 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -102,7 +102,6 @@ public:
* defines the TRex stateless operation mode
*
*/
-class CaptureFilter;
class TrexStateless {
public:
@@ -133,16 +132,9 @@ public:
/**
- * starts a capture on a 'filter' of ports
- * with a limit of packets
+ * send a message to the RX core
*/
- capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit);
-
- /**
- * stops an active capture
- *
- */
- capture_id_t stop_capture(capture_id_t capture_id);
+ void send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const;
/**
* shutdown the server
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 2cc1b9ca..0ef8ae60 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -140,7 +140,7 @@ public:
}
if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) {
- throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists");
+ throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists");
}
m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW);
@@ -439,19 +439,6 @@ public:
void get_pci_info(std::string &pci_addr, int &numa_node);
-
- /**
- * starts capturing packets
- *
- */
- void start_capture(capture_mode_e mode, uint64_t limit);
-
- /**
- * stops capturing packets
- *
- */
- void stop_capture();
-
/**
* start RX queueing of packets
*
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index f441c692..b9bb1d1c 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -262,24 +262,58 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
bool
-TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
- capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter);
+TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCStart start_rc;
+
+ TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc);
+
+ /* mark as done */
+ m_reply.set_reply(start_rc);
+
+ return true;
+}
+
+bool
+TrexStatelessRxCaptureStop::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCStop stop_rc;
+
+ TrexStatelessCaptureMngr::getInstance().stop(m_capture_id, stop_rc);
+
+ /* mark as done */
+ m_reply.set_reply(stop_rc);
+
+ return true;
+}
+bool
+TrexStatelessRxCaptureFetch::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCFetch fetch_rc;
+
+ TrexStatelessCaptureMngr::getInstance().fetch(m_capture_id, m_pkt_limit, fetch_rc);
+
/* mark as done */
- m_reply.set_reply(capture_id);
+ m_reply.set_reply(fetch_rc);
return true;
}
bool
-TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
- capture_id_t rc = rx_core->stop_capture(m_capture_id);
+TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) {
+
+ TrexCaptureRCStatus status_rc;
- m_reply.set_reply(rc);
+ status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json());
+
+ /* mark as done */
+ m_reply.set_reply(status_rc);
return true;
}
+
bool
TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
rx_core->start_queue(m_port_id, m_size);
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 5f4978f5..4027d075 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -485,12 +485,16 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
};
+class TrexStatelessRxCapture : public TrexStatelessCpToRxMsgBase {
+public:
+ virtual bool handle (CRxCoreStateless *rx_core) = 0;
+};
-class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture {
public:
- TrexStatelessRxStartCapture(const CaptureFilter& filter,
+ TrexStatelessRxCaptureStart(const CaptureFilter& filter,
uint64_t limit,
- MsgReply<capture_id_t> &reply) : m_reply(reply) {
+ MsgReply<TrexCaptureRCStart> &reply) : m_reply(reply) {
m_limit = limit;
m_filter = filter;
@@ -499,24 +503,52 @@ public:
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
- uint64_t m_limit;
- CaptureFilter m_filter;
- MsgReply<capture_id_t> &m_reply;
+ uint8_t m_port_id;
+ uint64_t m_limit;
+ CaptureFilter m_filter;
+ MsgReply<TrexCaptureRCStart> &m_reply;
+};
+
+
+class TrexStatelessRxCaptureStop : public TrexStatelessRxCapture {
+public:
+ TrexStatelessRxCaptureStop(capture_id_t capture_id, MsgReply<TrexCaptureRCStop> &reply) : m_reply(reply) {
+ m_capture_id = capture_id;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ capture_id_t m_capture_id;
+ MsgReply<TrexCaptureRCStop> &m_reply;
};
-class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxCaptureFetch : public TrexStatelessRxCapture {
public:
- TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply<capture_id_t> &reply) : m_reply(reply) {
+ TrexStatelessRxCaptureFetch(capture_id_t capture_id, uint32_t pkt_limit, MsgReply<TrexCaptureRCFetch> &reply) : m_reply(reply) {
m_capture_id = capture_id;
+ m_pkt_limit = pkt_limit;
+ }
+
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ capture_id_t m_capture_id;
+ uint32_t m_pkt_limit;
+ MsgReply<TrexCaptureRCFetch> &m_reply;
+};
+
+
+class TrexStatelessRxCaptureStatus : public TrexStatelessRxCapture {
+public:
+ TrexStatelessRxCaptureStatus(MsgReply<TrexCaptureRCStatus> &reply) : m_reply(reply) {
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
- capture_id_t m_capture_id;
- MsgReply<capture_id_t> &m_reply;
+ MsgReply<TrexCaptureRCStatus> &m_reply;
};
diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp
index 4ed126cc..85be7aef 100644
--- a/src/stateless/rx/trex_stateless_capture.cpp
+++ b/src/stateless/rx/trex_stateless_capture.cpp
@@ -25,6 +25,7 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons
m_id = id;
m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL);
m_filter = filter;
+ m_state = STATE_ACTIVE;
}
TrexStatelessCapture::~TrexStatelessCapture() {
@@ -35,9 +36,15 @@ TrexStatelessCapture::~TrexStatelessCapture() {
void
TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) {
+
+ if (m_state != STATE_ACTIVE) {
+ delete pkt;
+ return;
+ }
/* if not in filter - back off */
if (!m_filter.in_filter(pkt)) {
+ delete pkt;
return;
}
@@ -46,6 +53,11 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) {
void
TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) {
+
+ if (m_state != STATE_ACTIVE) {
+ return;
+ }
+
if (!m_filter.in_rx(port)) {
return;
}
@@ -53,6 +65,56 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) {
m_pkt_buffer->push(m);
}
+
+Json::Value
+TrexStatelessCapture::to_json() const {
+ Json::Value output = Json::objectValue;
+
+ output["id"] = Json::UInt64(m_id);
+ output["filter"] = m_filter.to_json();
+ output["count"] = m_pkt_buffer->get_element_count();
+ output["bytes"] = m_pkt_buffer->get_bytes();
+ output["limit"] = m_pkt_buffer->get_capacity();
+
+ switch (m_state) {
+ case STATE_ACTIVE:
+ output["state"] = "ACTIVE";
+ break;
+
+ case STATE_STOPPED:
+ output["state"] = "STOPPED";
+ break;
+
+ default:
+ assert(0);
+
+ }
+
+ return output;
+}
+
+TrexPktBuffer *
+TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) {
+
+ /* if the total sum of packets is within the limit range - take it */
+ if (m_pkt_buffer->get_element_count() <= pkt_limit) {
+ TrexPktBuffer *current = m_pkt_buffer;
+ m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode());
+ pending = 0;
+ return current;
+ }
+
+ /* harder part - partial fetch */
+ TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit);
+ for (int i = 0; i < pkt_limit; i++) {
+ const TrexPkt *pkt = m_pkt_buffer->pop();
+ partial->push(pkt);
+ }
+
+ pending = m_pkt_buffer->get_element_count();
+ return partial;
+}
+
void
TrexStatelessCaptureMngr::update_global_filter() {
CaptureFilter new_filter;
@@ -64,11 +126,25 @@ TrexStatelessCaptureMngr::update_global_filter() {
m_global_filter = new_filter;
}
-capture_id_t
-TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) {
+TrexStatelessCapture *
+TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) {
+ for (int i = 0; i < m_captures.size(); i++) {
+ if (m_captures[i]->get_id() == capture_id) {
+ return m_captures[i];
+ }
+ }
+
+ /* does not exist */
+ return nullptr;
+}
+
+void
+TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) {
+
if (m_captures.size() > MAX_CAPTURE_SIZE) {
- return CAPTURE_TOO_MANY_CAPTURES;
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED);
+ return;
}
@@ -79,15 +155,46 @@ TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) {
/* update global filter */
update_global_filter();
- return new_id;
+ /* result */
+ rc.set_new_id(new_id);
+}
+
+void
+TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) {
+ TrexStatelessCapture *capture = lookup(capture_id);
+ if (!capture) {
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+ return;
+ }
+
+ capture->stop();
+ rc.set_count(capture->get_pkt_count());
}
-capture_id_t
-TrexStatelessCaptureMngr::remove(capture_id_t id) {
+void
+TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) {
+ TrexStatelessCapture *capture = lookup(capture_id);
+ if (!capture) {
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+ return;
+ }
+ if (capture->is_active()) {
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_FETCH_UNDER_ACTIVE);
+ return;
+ }
+ uint32_t pending = 0;
+ TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending);
+
+ rc.set_pkt_buffer(pkt_buffer, pending);
+}
+
+void
+TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) {
+
int index = -1;
for (int i = 0; i < m_captures.size(); i++) {
- if (m_captures[i]->get_id() == id) {
+ if (m_captures[i]->get_id() == capture_id) {
index = i;
break;
}
@@ -95,24 +202,26 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) {
/* does not exist */
if (index == -1) {
- return CAPTURE_ID_NOT_FOUND;
+ rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND);
+ return;
}
TrexStatelessCapture *capture = m_captures[index];
m_captures.erase(m_captures.begin() + index);
+ /* free memory */
delete capture;
/* update global filter */
update_global_filter();
-
- return id;
}
void
TrexStatelessCaptureMngr::reset() {
+ TrexCaptureRCRemove dummy;
+
while (m_captures.size() > 0) {
- remove(m_captures[0]->get_id());
+ remove(m_captures[0]->get_id(), dummy);
}
}
@@ -130,3 +239,14 @@ TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port)
}
}
+Json::Value
+TrexStatelessCaptureMngr::to_json() const {
+ Json::Value lst = Json::arrayValue;
+
+ for (TrexStatelessCapture *capture : m_captures) {
+ lst.append(capture->to_json());
+ }
+
+ return lst;
+}
+
diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h
index 4d0b6a78..6cd25a94 100644
--- a/src/stateless/rx/trex_stateless_capture.h
+++ b/src/stateless/rx/trex_stateless_capture.h
@@ -22,7 +22,146 @@ limitations under the License.
#define __TREX_STATELESS_CAPTURE_H__
#include <stdint.h>
+#include <assert.h>
+
#include "trex_stateless_pkt.h"
+#include "trex_stateless_capture_msg.h"
+
+typedef int64_t capture_id_t;
+
+class TrexCaptureRC {
+public:
+
+ TrexCaptureRC() {
+ m_rc = RC_INVALID;
+ m_pkt_buffer = NULL;
+ }
+
+ enum rc_e {
+ RC_INVALID = 0,
+ RC_OK = 1,
+ RC_CAPTURE_NOT_FOUND,
+ RC_CAPTURE_LIMIT_REACHED,
+ RC_CAPTURE_FETCH_UNDER_ACTIVE
+ };
+
+ bool operator !() const {
+ return (m_rc != RC_OK);
+ }
+
+ std::string get_err() const {
+ assert(m_rc != RC_INVALID);
+
+ switch (m_rc) {
+ case RC_OK:
+ return "";
+ case RC_CAPTURE_LIMIT_REACHED:
+ return "capture limit has reached";
+ case RC_CAPTURE_NOT_FOUND:
+ return "capture ID not found";
+ case RC_CAPTURE_FETCH_UNDER_ACTIVE:
+ return "fetch command cannot be executed on an active capture";
+ default:
+ assert(0);
+ }
+ }
+
+ void set_err(rc_e rc) {
+ m_rc = rc;
+ }
+
+ Json::Value get_json() const {
+ return m_json_rc;
+ }
+
+public:
+ rc_e m_rc;
+ capture_id_t m_capture_id;
+ TrexPktBuffer *m_pkt_buffer;
+ Json::Value m_json_rc;
+};
+
+class TrexCaptureRCStart : public TrexCaptureRC {
+public:
+
+ void set_new_id(capture_id_t new_id) {
+ m_capture_id = new_id;
+ m_rc = RC_OK;
+ }
+
+ capture_id_t get_new_id() const {
+ return m_capture_id;
+ }
+
+private:
+ capture_id_t m_capture_id;
+};
+
+
+class TrexCaptureRCStop : public TrexCaptureRC {
+public:
+ void set_count(uint32_t pkt_count) {
+ m_pkt_count = pkt_count;
+ m_rc = RC_OK;
+ }
+
+ uint32_t get_pkt_count() const {
+ return m_pkt_count;
+ }
+
+private:
+ uint32_t m_pkt_count;
+};
+
+class TrexCaptureRCFetch : public TrexCaptureRC {
+public:
+
+ TrexCaptureRCFetch() {
+ m_pkt_buffer = nullptr;
+ m_pending = 0;
+ }
+
+ void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) {
+ m_pkt_buffer = pkt_buffer;
+ m_pending = pending;
+ m_rc = RC_OK;
+ }
+
+ const TrexPktBuffer *get_pkt_buffer() const {
+ return m_pkt_buffer;
+ }
+
+ uint32_t get_pending() const {
+ return m_pending;
+ }
+
+private:
+ const TrexPktBuffer *m_pkt_buffer;
+ uint32_t m_pending;
+};
+
+class TrexCaptureRCRemove : public TrexCaptureRC {
+public:
+ void set_ok() {
+ m_rc = RC_OK;
+ }
+};
+
+class TrexCaptureRCStatus : public TrexCaptureRC {
+public:
+
+ void set_status(const Json::Value &json) {
+ m_json = json;
+ m_rc = RC_OK;
+ }
+
+ const Json::Value & get_status() const {
+ return m_json;
+ }
+
+private:
+ Json::Value m_json;
+};
/**
* capture filter
@@ -82,20 +221,27 @@ public:
return *this;
}
+ Json::Value to_json() const {
+ Json::Value output = Json::objectValue;
+ output["tx"] = Json::UInt64(m_tx_active);
+ output["rx"] = Json::UInt64(m_rx_active);
+
+ return output;
+ }
+
private:
uint64_t m_tx_active;
uint64_t m_rx_active;
};
-typedef int64_t capture_id_t;
-enum {
- CAPTURE_ID_NOT_FOUND = -1,
- CAPTURE_TOO_MANY_CAPTURES = -2,
-};
class TrexStatelessCapture {
public:
+ enum state_e {
+ STATE_ACTIVE,
+ STATE_STOPPED,
+ };
TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter);
@@ -112,7 +258,24 @@ public:
return m_filter;
}
+ Json::Value to_json() const;
+
+ void stop() {
+ m_state = STATE_STOPPED;
+ }
+
+ TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending);
+
+ bool is_active() const {
+ return m_state == STATE_ACTIVE;
+ }
+
+ uint32_t get_pkt_count() const {
+ return m_pkt_buffer->get_element_count();
+ }
+
private:
+ state_e m_state;
TrexPktBuffer *m_pkt_buffer;
CaptureFilter m_filter;
uint64_t m_id;
@@ -134,18 +297,28 @@ public:
}
/**
- * adds a capture buffer
- * returns ID
+ * starts a new capture
*/
- capture_id_t add(uint64_t limit, const CaptureFilter &filter);
+ void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc);
-
/**
- * stops capture mode
- * on success, will return the ID of the removed one
- * o.w it will be an error
+ * stops an existing capture
+ *
*/
- capture_id_t remove(capture_id_t id);
+ void stop(capture_id_t capture_id, TrexCaptureRCStop &rc);
+
+ /**
+ * fetch packets from an existing capture
+ *
+ */
+ void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc);
+
+ /**
+ * removes an existing capture
+ * all packets captured will be detroyed
+ */
+ void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc);
+
/**
* removes all captures
@@ -153,6 +326,7 @@ public:
*/
void reset();
+
/**
* return true if any filter is active
*
@@ -182,6 +356,8 @@ public:
handle_pkt_rx_slow_path(m, port);
}
+ Json::Value to_json() const;
+
private:
TrexStatelessCaptureMngr() {
@@ -189,6 +365,9 @@ private:
m_id_counter = 1;
}
+
+ TrexStatelessCapture * lookup(capture_id_t capture_id);
+
void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port);
void update_global_filter();
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index f1ba303a..00c18082 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -270,10 +270,6 @@ void CRxCoreStateless::start() {
m_monitor.disable();
}
-void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) {
-
-}
-
int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) {
int total_pkts = 0;
@@ -332,16 +328,6 @@ double CRxCoreStateless::get_cpu_util() {
}
-capture_id_t
-CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) {
- return TrexStatelessCaptureMngr::getInstance().add(limit, filter);
-}
-
-capture_id_t
-CRxCoreStateless::stop_capture(capture_id_t capture_id) {
- return TrexStatelessCaptureMngr::getInstance().remove(capture_id);
-}
-
void
CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
m_rx_port_mngr[port_id].start_queue(size);
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 21ed51ba..954a5f04 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -131,14 +131,7 @@ class CRxCoreStateless {
const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) {
return m_rx_port_mngr[port_id].get_pkt_buffer();
}
-
- /**
- * start capturing packets
- *
- */
- capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter);
- capture_id_t stop_capture(capture_id_t capture_id);
-
+
/**
* start RX queueing of packets
*
@@ -175,7 +168,6 @@ class CRxCoreStateless {
void recalculate_next_state();
bool are_any_features_active();
- void capture_pkt(rte_mbuf_t *m);
void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
void handle_work_stage();
void handle_grat_arp();