From ea10422c22479c8e498d8efb5cb19882e70db9ff Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 4 Jan 2017 09:38:19 +0200 Subject: added new files (capture) Signed-off-by: imarom --- src/stateless/rx/trex_stateless_capture.cpp | 112 +++++++++++++++++ src/stateless/rx/trex_stateless_capture.h | 181 ++++++++++++++++++++++++++++ 2 files changed, 293 insertions(+) create mode 100644 src/stateless/rx/trex_stateless_capture.cpp create mode 100644 src/stateless/rx/trex_stateless_capture.h (limited to 'src') diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp new file mode 100644 index 00000000..83bb2d38 --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -0,0 +1,112 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include "trex_stateless_capture.h" +#include "trex_exception.h" + +TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter) { + m_id = id; + m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); + m_filter = filter; +} + +TrexStatelessCapture::~TrexStatelessCapture() { + if (m_pkt_buffer) { + delete m_pkt_buffer; + } +} + +void +TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { + + /* if not in filter - back off */ + if (!m_filter.in_filter(pkt)) { + return; + } + + m_pkt_buffer->push(pkt); +} + +void +TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { + if (!m_filter.in_rx(port)) { + return; + } + + m_pkt_buffer->push(m); +} + +capture_id_t +TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { + + if (m_captures.size() > MAX_CAPTURE_SIZE) { + throw TrexException(TrexException::T_CAPTURE_MAX_INSTANCES); + } + + + int new_id = m_id_counter++; + TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter); + m_captures.push_back(new_buffer); + + return new_id; +} + +void +TrexStatelessCaptureMngr::remove(capture_id_t id) { + + int index = -1; + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == id) { + index = i; + break; + } + } + + /* does not exist */ + if (index == -1) { + throw TrexException(TrexException::T_CAPTURE_INVALID_ID); + } + + TrexStatelessCapture *capture = m_captures[index]; + m_captures.erase(m_captures.begin() + index); + delete capture; +} + +void +TrexStatelessCaptureMngr::reset() { + while (m_captures.size() > 0) { + remove(m_captures[0]->get_id()); + } +} + +void +TrexStatelessCaptureMngr::handle_pkt_tx(const TrexPkt *pkt) { + for (TrexStatelessCapture *capture : m_captures) { + capture->handle_pkt_tx(pkt); + } +} + +void +TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port) { + for (TrexStatelessCapture *capture : m_captures) { + capture->handle_pkt_rx(m, port); + } +} + diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h new file mode 100644 index 00000000..f7cd451f --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture.h @@ -0,0 +1,181 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_CAPTURE_H__ +#define __TREX_STATELESS_CAPTURE_H__ + +#include +#include "trex_stateless_pkt.h" + +/** + * capture filter + * specify which ports to capture and if TX/RX or both + */ +class CaptureFilter { +public: + CaptureFilter() { + tx_active = 0; + rx_active = 0; + } + + void add_tx(uint8_t port_id) { + tx_active |= (1LL << port_id); + } + + void add_rx(uint8_t port_id) { + rx_active |= (1LL << port_id); + } + + void add(uint8_t port_id) { + add_tx(port_id); + add_rx(port_id); + } + + bool in_filter(const TrexPkt *pkt) { + switch (pkt->get_origin()) { + case TrexPkt::ORIGIN_TX: + return in_tx(pkt->get_port()); + + case TrexPkt::ORIGIN_RX: + return in_rx(pkt->get_port()); + + default: + return false; + } + } + + bool in_rx(uint8_t port_id) const { + uint64_t bit = (1LL << port_id); + return ((rx_active & bit) == bit); + } + + bool in_tx(uint8_t port_id) const { + uint64_t bit = (1LL << port_id); + return ((tx_active & bit) == bit); + } + +private: + + uint64_t tx_active; + uint64_t rx_active; +}; + +typedef uint64_t capture_id_t; + +class TrexStatelessCapture { +public: + + TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); + + void handle_pkt_tx(const TrexPkt *pkt); + void handle_pkt_rx(const rte_mbuf_t *m, int port); + + ~TrexStatelessCapture(); + + uint64_t get_id() const { + return m_id; + } + +private: + TrexPktBuffer *m_pkt_buffer; + CaptureFilter m_filter; + uint64_t m_id; +}; + +class TrexStatelessCaptureMngr { + +public: + + static TrexStatelessCaptureMngr& getInstance() { + static TrexStatelessCaptureMngr instance; + + return instance; + } + + + ~TrexStatelessCaptureMngr() { + reset(); + } + + /** + * adds a capture buffer + * returns ID + */ + capture_id_t add(uint64_t limit, const CaptureFilter &filter); + + + /** + * stops capture mode + */ + void remove(capture_id_t id); + + /** + * removes all captures + * + */ + void reset(); + + /** + * return true if any filter is active + * + * @author imarom (1/3/2017) + * + * @return bool + */ + bool is_active() const { + return (m_captures.size() != 0); + } + + /** + * handle packet from TX + */ + void handle_pkt_tx(const TrexPkt *pkt); + + /** + * handle packet from RX + */ + void handle_pkt_rx(const rte_mbuf_t *m, int port) { + /* fast path */ + if (!is_active()) { + return; + } + + /* slow path */ + handle_pkt_rx_slow_path(m, port); + } + +private: + + TrexStatelessCaptureMngr() { + /* init this to 1 */ + m_id_counter = 1; + } + + void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); + + std::vector m_captures; + + capture_id_t m_id_counter; + + static const int MAX_CAPTURE_SIZE = 10; +}; + +#endif /* __TREX_STATELESS_CAPTURE_H__ */ + -- cgit From 5257dbb8253fe5b70b75f9c064c4593ca7aee99f Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 4 Jan 2017 18:46:45 +0200 Subject: draft - unreviewed Signed-off-by: imarom --- src/main_dpdk.cpp | 5 +- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 143 +++++++------ src/rpc-server/commands/trex_rpc_cmds.h | 15 +- src/rpc-server/trex_rpc_cmds_table.cpp | 3 + src/stateless/cp/trex_stateless.cpp | 83 +++----- src/stateless/cp/trex_stateless.h | 30 +-- src/stateless/cp/trex_stateless_port.cpp | 28 +-- src/stateless/cp/trex_stateless_port.h | 95 ++++++++- .../messaging/trex_stateless_messaging.cpp | 12 +- src/stateless/messaging/trex_stateless_messaging.h | 32 +-- src/stateless/rx/trex_stateless_capture.cpp | 28 ++- src/stateless/rx/trex_stateless_capture.h | 53 +++-- src/stateless/rx/trex_stateless_rx_core.cpp | 65 +++--- src/stateless/rx/trex_stateless_rx_core.h | 17 +- src/stateless/rx/trex_stateless_rx_defs.h | 2 + src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 227 +-------------------- src/stateless/rx/trex_stateless_rx_port_mngr.h | 149 +------------- 17 files changed, 383 insertions(+), 604 deletions(-) (limited to 'src') diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 9cc0e612..36fe1804 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2169,6 +2169,8 @@ int CCoreEthIF::send_burst(CCorePerPort * lp_port, uint16_t len, CVirtualIFPerSideStats * lp_stats){ + //assert(m_ring_to_rx->Enqueue((CGenNode *)0x0) == 0); + uint16_t ret = lp_port->m_port->tx_burst(lp_port->m_tx_queue_id,lp_port->m_table,len); #ifdef DELAY_IF_NEEDED while ( unlikely( retget_num_crc_fix_bytes(); - + if ( get_vm_one_queue_enable() ) { /* vm mode, indirect queues */ for (i=0; i < m_max_ports; i++) { diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index d4854a79..ec5c3158 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -28,6 +28,7 @@ limitations under the License. #include #include "trex_stateless_rx_core.h" +#include "trex_stateless_capture.h" #include #include @@ -339,24 +340,6 @@ TrexRpcCmdGetSysInfo::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } - -int -TrexRpcCmdSetPortAttr::parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result) { - const std::string type = parse_choice(msg, "mode", {"hw", "all"}, result); - - rx_filter_mode_e filter_mode; - if (type == "hw") { - filter_mode = RX_FILTER_MODE_HW; - } else if (type == "all") { - filter_mode = RX_FILTER_MODE_ALL; - } else { - /* can't happen - parsed choice */ - assert(0); - } - - return get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_rx_filter_mode(filter_mode); -} - /** * set port commands * @@ -399,11 +382,6 @@ TrexRpcCmdSetPortAttr::_run(const Json::Value ¶ms, Json::Value &result) { ret = get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->set_flow_ctrl(mode); } - else if (name == "rx_filter_mode") { - const Json::Value &rx = parse_object(attr, name, result); - ret = parse_rx_filter_mode(rx, port_id, result); - } - /* unknown attribute */ else { generate_execute_err(result, "unknown attribute type: '" + name + "'"); @@ -588,7 +566,8 @@ TrexRpcCmdGetPortStatus::_run(const Json::Value ¶ms, Json::Value &result) { result["result"]["owner"] = (port->get_owner().is_free() ? "" : port->get_owner().get_name()); result["result"]["state"] = port->get_state_as_string(); result["result"]["max_stream_id"] = port->get_max_stream_id(); - + result["result"]["service"] = port->is_service_mode_on(); + /* attributes */ get_stateless_obj()->get_platform_api()->getPortAttrObj(port_id)->to_json(result["result"]["attr"]); @@ -660,6 +639,27 @@ TrexRpcCmdPushRemote::_run(const Json::Value ¶ms, Json::Value &result) { } + +/** + * set service mode on/off + * + */ +trex_rpc_cmd_rc_e +TrexRpcCmdSetServiceMode::_run(const Json::Value ¶ms, Json::Value &result) { + uint8_t port_id = parse_port(params, result); + bool enabled = parse_bool(params, "enabled", result); + + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + try { + port->set_service_mode(enabled); + } catch (TrexException &ex) { + generate_execute_err(result, ex.what()); + } + + result["result"] = Json::objectValue; + return (TREX_RPC_CMD_OK); +} + /** * set on/off RX software receive mode * @@ -670,12 +670,14 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_port(params, result); TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + if (!port->is_service_mode_on()) { + generate_execute_err(result, "rx_feature - available only under service mode"); + } + /* decide which feature is being set */ - const std::string type = parse_choice(params, "type", {"capture", "queue", "server"}, result); + const std::string type = parse_choice(params, "type", {"queue", "server"}, result); - if (type == "capture") { - parse_capture_msg(params, port, result); - } else if (type == "queue") { + if (type == "queue") { parse_queue_msg(params, port, result); } else if (type == "server") { parse_server_msg(params, port, result); @@ -688,38 +690,6 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value ¶ms, Json::Value &result) { } -void -TrexRpcCmdSetRxFeature::parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { - - bool enabled = parse_bool(msg, "enabled", result); - - if (enabled) { - - std::string pcap_filename = parse_string(msg, "pcap_filename", result); - uint64_t limit = parse_uint32(msg, "limit", result); - - if (limit == 0) { - generate_parse_err(result, "limit cannot be zero"); - } - - try { - port->start_rx_capture(pcap_filename, limit); - } catch (const TrexException &ex) { - generate_execute_err(result, ex.what()); - } - - } else { - - try { - port->stop_rx_capture(); - } catch (const TrexException &ex) { - generate_execute_err(result, ex.what()); - } - - } - -} - void TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result) { bool enabled = parse_bool(msg, "enabled", result); @@ -762,8 +732,13 @@ TrexRpcCmdGetRxQueuePkts::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + if (!port->is_service_mode_on()) { + generate_execute_err(result, "get_rx_queue_pkts - available only under service mode"); + } + + try { - const RXPacketBuffer *pkt_buffer = port->get_rx_queue_pkts(); + const TrexPktBuffer *pkt_buffer = port->get_rx_queue_pkts(); if (pkt_buffer) { result["result"]["pkts"] = pkt_buffer->to_json(); delete pkt_buffer; @@ -806,6 +781,7 @@ TrexRpcCmdSetL2::_run(const Json::Value ¶ms, Json::Value &result) { generate_execute_err(result, ex.what()); } + result["result"] = Json::objectValue; return (TREX_RPC_CMD_OK); } @@ -863,7 +839,50 @@ TrexRpcCmdSetL3::_run(const Json::Value ¶ms, Json::Value &result) { } } - + + result["result"] = Json::objectValue; return (TREX_RPC_CMD_OK); } + +/** + * starts PCAP capturing + * + */ +trex_rpc_cmd_rc_e +TrexRpcCmdStartCapture::_run(const Json::Value ¶ms, Json::Value &result) { + + uint32_t limit = parse_uint32(params, "limit", result); + const Json::Value &tx_json = parse_array(params, "tx", result); + const Json::Value &rx_json = parse_array(params, "rx", result); + + CaptureFilter filter; + + std::set ports; + + /* populate the filter */ + for (int i = 0; i < tx_json.size(); i++) { + uint8_t tx_port = parse_byte(tx_json, i, result); + filter.add_tx(tx_port); + ports.insert(tx_port); + } + + for (int i = 0; i < rx_json.size(); i++) { + uint8_t rx_port = parse_byte(rx_json, i, result); + filter.add_rx(rx_port); + ports.insert(rx_port); + } + + /* check that all ports are under service mode */ + for (uint8_t port_id : ports) { + TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); + if (!port->is_service_mode_on()) { + generate_parse_err(result, "start_capture is available only under service mode"); + } + } + + get_stateless_obj()->start_capture(filter, limit); + + result["result"] = Json::objectValue; + return (TREX_RPC_CMD_OK); +} diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 6639be7b..1ea63cc7 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -94,8 +94,6 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsValues, "get_port_xstats_values", 1, TREX_RPC_CMD_DEFINE(TrexRpcCmdGetPortXStatsNames, "get_port_xstats_names", 1, false, APIClass::API_CLASS_TYPE_CORE); TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetPortAttr, "set_port_attr", 2, true, APIClass::API_CLASS_TYPE_CORE, - - int parse_rx_filter_mode(const Json::Value &msg, uint8_t port_id, Json::Value &result); ); @@ -150,16 +148,19 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdPushRemote, "push_remote", 6, true, APIClass::API_ TREX_RPC_CMD_DEFINE(TrexRpcCmdShutdown, "shutdown", 2, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, false, APIClass::API_CLASS_TYPE_CORE, - void parse_capture_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdSetRxFeature, "set_rx_feature", 3, true, APIClass::API_CLASS_TYPE_CORE, void parse_queue_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); void parse_server_msg(const Json::Value &msg, TrexStatelessPort *port, Json::Value &result); ); -TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL2, "set_l2", 2, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL3, "set_l3", 3, false, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 2, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL2, "set_l2", 2, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdSetL3, "set_l3", 3, true, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIClass::API_CLASS_TYPE_CORE); + +TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE); + +TREX_RPC_CMD_DEFINE(TrexRpcCmdStartCapture, "start_capture", 3, false, APIClass::API_CLASS_TYPE_CORE); #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 94a3e1b9..3d4d5a23 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -75,8 +75,11 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdSetRxFeature()); register_command(new TrexRpcCmdGetRxQueuePkts()); + register_command(new TrexRpcCmdSetServiceMode()); register_command(new TrexRpcCmdSetL2()); register_command(new TrexRpcCmdSetL3()); + + register_command(new TrexRpcCmdStartCapture()); } diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index c31ba0a5..32babbf7 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -18,13 +18,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include -#include -#include +//#include #include #include +#include "trex_stateless.h" +#include "trex_stateless_port.h" +#include "trex_stateless_messaging.h" + using namespace std; /*********************************************************** @@ -140,54 +142,35 @@ TrexStateless::get_dp_core_count() { return m_platform_api->get_dp_core_count(); } -void -TrexStateless::encode_stats(Json::Value &global) { - - TrexPlatformGlobalStats stats; - m_platform_api->get_global_stats(stats); - - global["cpu_util"] = stats.m_stats.m_cpu_util; - global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util; - - global["tx_bps"] = stats.m_stats.m_tx_bps; - global["rx_bps"] = stats.m_stats.m_rx_bps; - - global["tx_pps"] = stats.m_stats.m_tx_pps; - global["rx_pps"] = stats.m_stats.m_rx_pps; - - global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); - global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); - - global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); - global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); - - global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); - - for (uint8_t i = 0; i < m_port_count; i++) { - std::stringstream ss; - - ss << "port " << i; - Json::Value &port_section = global[ss.str()]; - - m_ports[i]->encode_stats(port_section); - } +capture_id_t +TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) { + static MsgReply reply; + + reply.reset(); + + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply); + + ring->Enqueue((CGenNode *)msg); + + capture_id_t new_id = reply.wait_for_reply(); + + return (new_id); } -/** - * generate a snapshot for publish (async publish) - * - */ -void -TrexStateless::generate_publish_snapshot(std::string &snapshot) { - Json::FastWriter writer; - Json::Value root; - - root["name"] = "trex-stateless-info"; - root["type"] = 0; - - /* stateless specific info goes here */ - root["data"] = Json::nullValue; - - snapshot = writer.write(root); +capture_id_t +TrexStateless::stop_capture(capture_id_t capture_id) { + static MsgReply reply; + + reply.reset(); + + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply); + + ring->Enqueue((CGenNode *)msg); + + capture_id_t rc = reply.wait_for_reply(); + + return (rc); } diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 3a1a2c24..33f16ce9 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -102,6 +102,7 @@ public: * defines the TRex stateless operation mode * */ +class CaptureFilter; class TrexStateless { public: @@ -132,32 +133,21 @@ public: /** - * shutdown the server - */ - void shutdown(); - - /** - * fetch xstats names (keys of dict) - * - */ - void encode_xstats_names(Json::Value &global); - - /** - * fetch xstats values - * + * starts a capture on a 'filter' of ports + * with a limit of packets */ - void encode_xstats_values(Json::Value &global); - + capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit); + /** - * fetch all the stats + * stops an active capture * */ - void encode_stats(Json::Value &global); - + capture_id_t stop_capture(capture_id_t capture_id); + /** - * generate a snapshot for publish + * shutdown the server */ - void generate_publish_snapshot(std::string &snapshot); + void shutdown(); const TrexPlatformApi * get_platform_api() { return (m_platform_api); diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 7d331c6e..9cf048b0 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -162,7 +162,7 @@ private: * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this), m_service_mode(port_id, api) { std::vector> core_pair_list; m_port_id = port_id; @@ -948,24 +948,6 @@ TrexStatelessPort::remove_and_delete_all_streams() { } } -void -TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) { - static MsgReply reply; - - reply.reset(); - - TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply); - send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg); - - /* as below, must wait for ACK from RX core before returning ACK */ - reply.wait_for_reply(); -} - -void -TrexStatelessPort::stop_rx_capture() { - TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id); - send_message_to_rx(msg); -} void TrexStatelessPort::start_rx_queue(uint64_t size) { @@ -980,18 +962,22 @@ TrexStatelessPort::start_rx_queue(uint64_t size) { this might cause the user to lose some packets from the queue */ reply.wait_for_reply(); + + m_service_mode.set_rx_queue(); } void TrexStatelessPort::stop_rx_queue() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); send_message_to_rx(msg); + + m_service_mode.unset_rx_queue(); } -const RXPacketBuffer * +const TrexPktBuffer * TrexStatelessPort::get_rx_queue_pkts() { - static MsgReply reply; + static MsgReply reply; reply.reset(); diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index d4ac4018..2cc1b9ca 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -26,12 +26,14 @@ limitations under the License. #include "trex_dp_port_events.h" #include "trex_stateless_rx_defs.h" #include "trex_stream.h" +#include "trex_exception.h" +#include "trex_stateless_capture.h" class TrexStatelessCpToDpMsgBase; class TrexStatelessCpToRxMsgBase; class TrexStreamsGraphObj; class TrexPortMultiplier; -class RXPacketBuffer; +class TrexPktBuffer; /** @@ -113,6 +115,56 @@ private: static const std::string g_unowned_handler; }; +/** + * enforces in/out from service mode + * + * @author imarom (1/4/2017) + */ +class TrexServiceMode { +public: + TrexServiceMode(uint8_t port_id, const TrexPlatformApi *api) { + m_is_enabled = false; + m_has_rx_queue = false; + m_port_id = port_id; + m_port_attr = api->getPortAttrObj(port_id); + } + + void enable() { + m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_ALL); + m_is_enabled = true; + } + + void disable() { + if (m_has_rx_queue) { + throw TrexException("unable to disable service mode - please remove RX queue"); + } + + if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { + throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists"); + } + + m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW); + m_is_enabled = false; + } + + bool is_enabled() const { + return m_is_enabled; + } + + void set_rx_queue() { + m_has_rx_queue = true; + } + + void unset_rx_queue() { + m_has_rx_queue = false; + } + +private: + bool m_is_enabled; + bool m_has_rx_queue; + TRexPortAttr *m_port_attr; + uint8_t m_port_id; +}; class AsyncStopEvent; @@ -150,7 +202,15 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - + /** + * port capture mode + */ + enum capture_mode_e { + PORT_CAPTURE_NONE = 0, + PORT_CAPTURE_RX, + PORT_CAPTURE_ALL + }; + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); ~TrexStatelessPort(); @@ -227,6 +287,20 @@ public: double duration, bool is_dual); + /** + * moves port to / out service mode + */ + void set_service_mode(bool enabled) { + if (enabled) { + m_service_mode.enable(); + } else { + m_service_mode.disable(); + } + } + bool is_service_mode_on() const { + return m_service_mode.is_enabled(); + } + /** * get the port state * @@ -367,16 +441,16 @@ public: /** - * enable RX capture on port + * starts capturing packets * */ - void start_rx_capture(const std::string &pcap_filename, uint64_t limit); + void start_capture(capture_mode_e mode, uint64_t limit); /** - * disable RX capture if on + * stops capturing packets * */ - void stop_rx_capture(); + void stop_capture(); /** * start RX queueing of packets @@ -398,7 +472,7 @@ public: * fetch the RX queue packets from the queue * */ - const RXPacketBuffer *get_rx_queue_pkts(); + const TrexPktBuffer *get_rx_queue_pkts(); /** * configures port for L2 mode @@ -429,7 +503,9 @@ public: } private: - + void set_service_mode_on(); + void set_service_mode_off(); + bool is_core_active(int core_id); const std::vector get_core_id_list () { @@ -514,6 +590,9 @@ private: TrexPortOwner m_owner; int m_pending_async_stop_event; + + TrexServiceMode m_service_mode; + static const uint32_t MAX_STREAMS = 20000; }; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index aeb1e677..f441c692 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -263,18 +263,20 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit); + capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter); /* mark as done */ - m_reply.set_reply(true); + m_reply.set_reply(capture_id); return true; } bool TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { - rx_core->stop_recorder(m_port_id); - + capture_id_t rc = rx_core->stop_capture(m_capture_id); + + m_reply.set_reply(rc); + return true; } @@ -299,7 +301,7 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) { - const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); + const TrexPktBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); /* set the reply */ m_reply.set_reply(pkt_buffer); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 72b92d11..5f4978f5 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -28,12 +28,13 @@ limitations under the License. #include "trex_stateless_rx_defs.h" #include "os_time.h" #include "utl_ip.h" +#include "trex_stateless_capture.h" class TrexStatelessDpCore; class CRxCoreStateless; class TrexStreamsCompiledObj; class CFlowGenListPerThread; -class RXPacketBuffer; +class TrexPktBuffer; /** * Generic message reply object @@ -487,36 +488,35 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxStartCapture(uint8_t port_id, - const std::string &pcap_filename, + TrexStatelessRxStartCapture(const CaptureFilter& filter, uint64_t limit, - MsgReply &reply) : m_reply(reply) { + MsgReply &reply) : m_reply(reply) { - m_port_id = port_id; - m_limit = limit; - m_pcap_filename = pcap_filename; + m_limit = limit; + m_filter = filter; } virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - std::string m_pcap_filename; - uint64_t m_limit; - MsgReply &m_reply; + uint8_t m_port_id; + uint64_t m_limit; + CaptureFilter m_filter; + MsgReply &m_reply; }; class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxStopCapture(uint8_t port_id) { - m_port_id = port_id; + TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + m_capture_id = capture_id; } virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; + capture_id_t m_capture_id; + MsgReply &m_reply; }; @@ -556,7 +556,7 @@ private: class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply &reply) : m_reply(reply) { + TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply &reply) : m_reply(reply) { m_port_id = port_id; } @@ -568,7 +568,7 @@ public: private: uint8_t m_port_id; - MsgReply &m_reply; + MsgReply &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 83bb2d38..4ed126cc 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -53,22 +53,36 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { m_pkt_buffer->push(m); } +void +TrexStatelessCaptureMngr::update_global_filter() { + CaptureFilter new_filter; + + for (TrexStatelessCapture *capture : m_captures) { + new_filter += capture->get_filter(); + } + + m_global_filter = new_filter; +} + capture_id_t TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { if (m_captures.size() > MAX_CAPTURE_SIZE) { - throw TrexException(TrexException::T_CAPTURE_MAX_INSTANCES); + return CAPTURE_TOO_MANY_CAPTURES; } int new_id = m_id_counter++; TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter); m_captures.push_back(new_buffer); + + /* update global filter */ + update_global_filter(); return new_id; } -void +capture_id_t TrexStatelessCaptureMngr::remove(capture_id_t id) { int index = -1; @@ -81,12 +95,18 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) { /* does not exist */ if (index == -1) { - throw TrexException(TrexException::T_CAPTURE_INVALID_ID); + return CAPTURE_ID_NOT_FOUND; } TrexStatelessCapture *capture = m_captures[index]; m_captures.erase(m_captures.begin() + index); - delete capture; + + delete capture; + + /* update global filter */ + update_global_filter(); + + return id; } void diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index f7cd451f..4d0b6a78 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -31,16 +31,16 @@ limitations under the License. class CaptureFilter { public: CaptureFilter() { - tx_active = 0; - rx_active = 0; + m_tx_active = 0; + m_rx_active = 0; } void add_tx(uint8_t port_id) { - tx_active |= (1LL << port_id); + m_tx_active |= (1LL << port_id); } void add_rx(uint8_t port_id) { - rx_active |= (1LL << port_id); + m_rx_active |= (1LL << port_id); } void add(uint8_t port_id) { @@ -63,21 +63,36 @@ public: bool in_rx(uint8_t port_id) const { uint64_t bit = (1LL << port_id); - return ((rx_active & bit) == bit); + return ((m_rx_active & bit) == bit); } bool in_tx(uint8_t port_id) const { uint64_t bit = (1LL << port_id); - return ((tx_active & bit) == bit); + return ((m_tx_active & bit) == bit); + } + + bool in_any(uint8_t port_id) const { + return ( in_tx(port_id) || in_rx(port_id) ); + } + + CaptureFilter& operator +=(const CaptureFilter &other) { + m_tx_active |= other.m_tx_active; + m_rx_active |= other.m_rx_active; + + return *this; } private: - uint64_t tx_active; - uint64_t rx_active; + uint64_t m_tx_active; + uint64_t m_rx_active; }; -typedef uint64_t capture_id_t; +typedef int64_t capture_id_t; +enum { + CAPTURE_ID_NOT_FOUND = -1, + CAPTURE_TOO_MANY_CAPTURES = -2, +}; class TrexStatelessCapture { public: @@ -93,6 +108,10 @@ public: return m_id; } + const CaptureFilter & get_filter() const { + return m_filter; + } + private: TrexPktBuffer *m_pkt_buffer; CaptureFilter m_filter; @@ -122,9 +141,11 @@ public: /** - * stops capture mode + * stops capture mode + * on success, will return the ID of the removed one + * o.w it will be an error */ - void remove(capture_id_t id); + capture_id_t remove(capture_id_t id); /** * removes all captures @@ -139,8 +160,8 @@ public: * * @return bool */ - bool is_active() const { - return (m_captures.size() != 0); + bool is_active(uint8_t port) const { + return m_global_filter.in_any(port); } /** @@ -153,7 +174,7 @@ public: */ void handle_pkt_rx(const rte_mbuf_t *m, int port) { /* fast path */ - if (!is_active()) { + if (!is_active(port)) { return; } @@ -169,11 +190,15 @@ private: } void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); + void update_global_filter(); std::vector m_captures; capture_id_t m_id_counter; + /* a union of all the filters curently active */ + CaptureFilter m_global_filter; + static const int MAX_CAPTURE_SIZE = 10; }; diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index d27485de..f1ba303a 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -69,11 +69,11 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) { void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_capture = false; m_max_ports = cfg.m_max_ports; - + m_tx_cores = cfg.m_tx_cores; + CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx(); m_ring_from_cp = cp_rx->getRingCpToDp(0); - m_ring_to_cp = cp_rx->getRingDpToCp(0); m_state = STATE_IDLE; for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { @@ -130,6 +130,36 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() { } +void +CRxCoreStateless::periodic_check_for_dp_messages() { + + for (int i = 0; i < m_tx_cores; i++) { + periodic_check_for_dp_messages_core(i); + } + +} + +void +CRxCoreStateless::periodic_check_for_dp_messages_core(uint32_t core_id) { + + CNodeRing *ring = CMsgIns::Ins()->getRxDp()->getRingDpToCp(core_id); + + /* fast path */ + if ( likely ( ring->isEmpty() ) ) { + return; + } + + while (true) { + CGenNode *node = NULL; + + if (ring->Dequeue(node) != 0) { + break; + } + + //assert(node); + } +} + void CRxCoreStateless::recalculate_next_state() { if (m_state == STATE_QUIT) { return; @@ -175,16 +205,6 @@ void CRxCoreStateless::idle_state_loop() { } } -/** - * for each port give a tick (for flushing if needed) - * - */ -void CRxCoreStateless::port_manager_tick() { - for (int i = 0; i < m_max_ports; i++) { - m_rx_port_mngr[i].tick(); - } -} - /** * for each port handle the grat ARP mechansim * @@ -199,7 +219,6 @@ void CRxCoreStateless::handle_work_stage() { /* set the next sync time to */ dsec_t sync_time_sec = now_sec() + (1.0 / 1000); - dsec_t tick_time_sec = now_sec() + 1.0; dsec_t grat_arp_sec = now_sec() + (double)CGlobalInfo::m_options.m_arp_ref_per; while (m_state == STATE_WORKING) { @@ -211,14 +230,10 @@ void CRxCoreStateless::handle_work_stage() { if ( (now - sync_time_sec) > 0 ) { periodic_check_for_cp_messages(); + //periodic_check_for_dp_messages(); sync_time_sec = now + (1.0 / 1000); } - if ( (now - tick_time_sec) > 0) { - port_manager_tick(); - tick_time_sec = now + 1.0; - } - if ( (now - grat_arp_sec) > 0) { handle_grat_arp(); grat_arp_sec = now + (double)CGlobalInfo::m_options.m_arp_ref_per; @@ -317,16 +332,14 @@ double CRxCoreStateless::get_cpu_util() { } -void -CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) { - m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit); - recalculate_next_state(); +capture_id_t +CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) { + return TrexStatelessCaptureMngr::getInstance().add(limit, filter); } -void -CRxCoreStateless::stop_recorder(uint8_t port_id) { - m_rx_port_mngr[port_id].stop_recorder(); - recalculate_next_state(); +capture_id_t +CRxCoreStateless::stop_capture(capture_id_t capture_id) { + return TrexStatelessCaptureMngr::getInstance().remove(capture_id); } void diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 4eed59a1..21ed51ba 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -27,6 +27,7 @@ #include "pal/linux/sanb_atomic.h" #include "utl_cpuu.h" #include "trex_stateless_rx_port_mngr.h" +#include "trex_stateless_capture.h" class TrexStatelessCpToRxMsgBase; @@ -127,16 +128,16 @@ class CRxCoreStateless { double get_cpu_util(); void update_cpu_util(); - const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { + const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } /** - * start capturing of RX packets on a specific port + * start capturing packets * */ - void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit); - void stop_recorder(uint8_t port_id); + capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter); + capture_id_t stop_capture(capture_id_t capture_id); /** * start RX queueing of packets @@ -162,7 +163,12 @@ class CRxCoreStateless { private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); + bool periodic_check_for_cp_messages(); + + void periodic_check_for_dp_messages(); + void periodic_check_for_dp_messages_core(uint32_t core_id); + void tickle(); void idle_state_loop(); @@ -172,7 +178,6 @@ class CRxCoreStateless { void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); - void port_manager_tick(); void handle_grat_arp(); int process_all_pending_pkts(bool flush_rx = false); @@ -186,10 +191,10 @@ class CRxCoreStateless { private: TrexMonitor m_monitor; uint32_t m_max_ports; + uint32_t m_tx_cores; bool m_capture; state_e m_state; CNodeRing *m_ring_from_cp; - CNodeRing *m_ring_to_cp; CCpuUtlDp m_cpu_dp_u; CCpuUtlCp m_cpu_cp_u; diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h index aefcc133..367cf4e3 100644 --- a/src/stateless/rx/trex_stateless_rx_defs.h +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -38,10 +38,12 @@ class CRxSlCfg { m_max_ports = 0; m_cps = 0.0; m_num_crc_fix_bytes = 0; + m_tx_cores = 0; } public: uint32_t m_max_ports; + uint32_t m_tx_cores; double m_cps; CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; uint8_t m_num_crc_fix_bytes; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index e16b3d0c..d2e0b4e8 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -20,48 +20,10 @@ */ #include "bp_sim.h" #include "trex_stateless_rx_port_mngr.h" -#include "common/captureFile.h" #include "trex_stateless_rx_core.h" #include "common/Network/Packet/Arp.h" #include "pkt_gen.h" - -/** - * copy MBUF to a flat buffer - * - * @author imarom (12/20/2016) - * - * @param dest - buffer with at least rte_pktmbuf_pkt_len(m) - * bytes - * @param m - MBUF to copy - * - * @return uint8_t* - */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { - - int index = 0; - for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { - const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *); - memcpy(dest + index, src, it->data_len); - index += it->data_len; - } -} - -/************************************** - * RX packet - * - *************************************/ -RXPacket::RXPacket(const rte_mbuf_t *m) { - - /* allocate buffer */ - m_size = m->pkt_len; - m_raw = new uint8_t[m_size]; - - /* copy data */ - copy_mbuf(m_raw, m); - - /* generate a packet timestamp */ - m_timestamp = now_sec(); -} +#include "trex_stateless_capture.h" /************************************** * latency RX feature @@ -231,79 +193,12 @@ RXLatency::to_json() const { * *************************************/ -RXPacketBuffer::RXPacketBuffer(uint64_t size) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_size = (size + 1); // for the empty/full difference 1 slot reserved - - /* generate queue */ - m_buffer = new RXPacket*[m_size](); // zeroed -} - -RXPacketBuffer::~RXPacketBuffer() { - assert(m_buffer); - - while (!is_empty()) { - RXPacket *pkt = pop(); - delete pkt; - } - delete [] m_buffer; -} - -void -RXPacketBuffer::push(const rte_mbuf_t *m) { - /* if full - pop the oldest */ - if (is_full()) { - delete pop(); - } - - /* push packet */ - m_buffer[m_head] = new RXPacket(m); - m_head = next(m_head); -} - -RXPacket * -RXPacketBuffer::pop() { - assert(!is_empty()); - - RXPacket *pkt = m_buffer[m_tail]; - m_tail = next(m_tail); - - return pkt; -} - -uint64_t -RXPacketBuffer::get_element_count() const { - if (m_head >= m_tail) { - return (m_head - m_tail); - } else { - return ( get_capacity() - (m_tail - m_head - 1) ); - } -} - -Json::Value -RXPacketBuffer::to_json() const { - - Json::Value output = Json::arrayValue; - - int tmp = m_tail; - while (tmp != m_head) { - RXPacket *pkt = m_buffer[tmp]; - output.append(pkt->to_json()); - tmp = next(tmp); - } - - return output; -} - - void RXQueue::start(uint64_t size) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RXPacketBuffer(size); + m_pkt_buffer = new TrexPktBuffer(size, TrexPktBuffer::MODE_DROP_HEAD); } void @@ -314,7 +209,7 @@ RXQueue::stop() { } } -const RXPacketBuffer * +const TrexPktBuffer * RXQueue::fetch() { /* if no buffer or the buffer is empty - give a NULL one */ @@ -323,10 +218,10 @@ RXQueue::fetch() { } /* hold a pointer to the old one */ - RXPacketBuffer *old_buffer = m_pkt_buffer; + TrexPktBuffer *old_buffer = m_pkt_buffer; /* replace the old one with a new one and freeze the old */ - m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity()); + m_pkt_buffer = new TrexPktBuffer(old_buffer->get_capacity(), old_buffer->get_mode()); return old_buffer; } @@ -348,97 +243,6 @@ RXQueue::to_json() const { return output; } -/************************************** - * RX feature recorder - * - *************************************/ - -RXPacketRecorder::RXPacketRecorder() { - m_writer = NULL; - m_count = 0; - m_limit = 0; - m_epoch = -1; - - m_pending_flush = false; -} - -void -RXPacketRecorder::start(const std::string &pcap, uint64_t limit) { - m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); - if (m_writer == NULL) { - std::stringstream ss; - ss << "unable to create PCAP file: " << pcap; - throw TrexException(ss.str()); - } - - assert(limit > 0); - - m_limit = limit; - m_count = 0; - m_pending_flush = false; - m_pcap_filename = pcap; -} - -void -RXPacketRecorder::stop() { - if (!m_writer) { - return; - } - - delete m_writer; - m_writer = NULL; -} - -void -RXPacketRecorder::flush_to_disk() { - - if (m_writer && m_pending_flush) { - m_writer->flush_to_disk(); - m_pending_flush = false; - } -} - -void -RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { - if (!m_writer) { - return; - } - - dsec_t now = now_sec(); - if (m_epoch < 0) { - m_epoch = now; - } - - dsec_t dt = now - m_epoch; - - CPktNsecTimeStamp t_c(dt); - m_pkt.time_nsec = t_c.m_time_nsec; - m_pkt.time_sec = t_c.m_time_sec; - - copy_mbuf((uint8_t *)m_pkt.raw, m); - m_pkt.pkt_len = m->pkt_len; - - m_writer->write_packet(&m_pkt); - m_count++; - m_pending_flush = true; - - if (m_count == m_limit) { - stop(); - } - -} - -Json::Value -RXPacketRecorder::to_json() const { - Json::Value output = Json::objectValue; - - output["pcap_filename"] = m_pcap_filename; - output["limit"] = Json::UInt64(m_limit); - output["count"] = Json::UInt64(m_count); - - return output; -} - /************************************** * RX feature server (ARP, ICMP) and etc. @@ -786,10 +590,6 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { m_latency.handle_pkt(m); } - if (is_feature_set(RECORDER)) { - m_recorder.handle_pkt(m); - } - if (is_feature_set(QUEUE)) { m_queue.handle_pkt(m); } @@ -797,6 +597,9 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { if (is_feature_set(SERVER)) { m_server.handle_pkt(m); } + + /* capture */ + TrexStatelessCaptureMngr::getInstance().handle_pkt_rx(m, m_port_id); } int RXPortManager::process_all_pending_pkts(bool flush_rx) { @@ -834,13 +637,6 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) { return cnt_p; } -void -RXPortManager::tick() { - if (is_feature_set(RECORDER)) { - m_recorder.flush_to_disk(); - } -} - void RXPortManager::send_next_grat_arp() { if (is_feature_set(GRAT_ARP)) { @@ -887,13 +683,6 @@ RXPortManager::to_json() const { output["latency"]["is_active"] = false; } - if (is_feature_set(RECORDER)) { - output["sniffer"] = m_recorder.to_json(); - output["sniffer"]["is_active"] = true; - } else { - output["sniffer"]["is_active"] = false; - } - if (is_feature_set(QUEUE)) { output["queue"] = m_queue.to_json(); output["queue"]["is_active"] = true; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 6efdae64..0cc60716 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -25,8 +25,7 @@ #include #include "common/base64.h" -#include "common/captureFile.h" - +#include "trex_stateless_pkt.h" class CPortLatencyHWBase; class CRFC2544Info; @@ -80,97 +79,12 @@ public: CRxCoreErrCntrs *m_err_cntrs; }; -/** - * describes a single saved RX packet - * - */ -class RXPacket { -public: - - RXPacket(const rte_mbuf_t *m); - - /* slow path and also RVO - pass by value is ok */ - Json::Value to_json() { - Json::Value output; - output["ts"] = m_timestamp; - output["binary"] = base64_encode(m_raw, m_size); - return output; - } - - ~RXPacket() { - if (m_raw) { - delete [] m_raw; - } - } - -private: - - uint8_t *m_raw; - uint16_t m_size; - dsec_t m_timestamp; -}; - /************************************** * RX feature queue * *************************************/ -class RXPacketBuffer { -public: - - RXPacketBuffer(uint64_t size); - ~RXPacketBuffer(); - - /** - * push a packet to the buffer - * - */ - void push(const rte_mbuf_t *m); - - /** - * generate a JSON output of the queue - * - */ - Json::Value to_json() const; - - - bool is_empty() const { - return (m_head == m_tail); - } - - bool is_full() const { - return ( next(m_head) == m_tail); - } - - /** - * return the total amount of space possible - */ - uint64_t get_capacity() const { - /* one slot is used for diff between full/empty */ - return (m_size - 1); - } - - /** - * returns how many elements are in the queue - */ - uint64_t get_element_count() const; - -private: - int next(int v) const { - return ( (v + 1) % m_size ); - } - - /* pop in case of full queue - internal usage */ - RXPacket * pop(); - - int m_head; - int m_tail; - int m_size; - RXPacket **m_buffer; -}; - - class RXQueue { public: RXQueue() { @@ -191,7 +105,7 @@ public: * fetch the current buffer * return NULL if no packets */ - const RXPacketBuffer * fetch(); + const TrexPktBuffer * fetch(); /** * stop RX queue @@ -204,42 +118,7 @@ public: Json::Value to_json() const; private: - RXPacketBuffer *m_pkt_buffer; -}; - -/************************************** - * RX feature PCAP recorder - * - *************************************/ - -class RXPacketRecorder { -public: - RXPacketRecorder(); - - ~RXPacketRecorder() { - stop(); - } - - void start(const std::string &pcap, uint64_t limit); - void stop(); - void handle_pkt(const rte_mbuf_t *m); - - /** - * flush any cached packets to disk - * - */ - void flush_to_disk(); - - Json::Value to_json() const; - -private: - CFileWriterBase *m_writer; - std::string m_pcap_filename; - CCapPktRaw m_pkt; - dsec_t m_epoch; - uint64_t m_limit; - uint64_t m_count; - bool m_pending_flush; + TrexPktBuffer *m_pkt_buffer; }; @@ -311,7 +190,6 @@ public: enum feature_t { NO_FEATURES = 0x0, LATENCY = 0x1, - RECORDER = 0x2, QUEUE = 0x4, SERVER = 0x8, GRAT_ARP = 0x10, @@ -354,17 +232,6 @@ public: unset_feature(LATENCY); } - /* recorder */ - void start_recorder(const std::string &pcap, uint64_t limit_pkts) { - m_recorder.start(pcap, limit_pkts); - set_feature(RECORDER); - } - - void stop_recorder() { - m_recorder.stop(); - unset_feature(RECORDER); - } - /* queue */ void start_queue(uint32_t size) { m_queue.start(size); @@ -376,7 +243,7 @@ public: unset_feature(QUEUE); } - const RXPacketBuffer *get_pkt_buffer() { + const TrexPktBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { return nullptr; } @@ -414,13 +281,6 @@ public: */ void handle_pkt(const rte_mbuf_t *m); - /** - * maintenance - * - * @author imarom (11/24/2016) - */ - void tick(); - /** * send next grat arp (if on) * @@ -482,7 +342,6 @@ private: uint32_t m_features; uint8_t m_port_id; RXLatency m_latency; - RXPacketRecorder m_recorder; RXQueue m_queue; RXServer m_server; RXGratARP m_grat_arp; -- cgit From ac2e93d4247b2db94cd07301b274336bb08dec46 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 11 Jan 2017 18:19:47 +0200 Subject: capture - draft commit Signed-off-by: imarom --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 118 +++++++++++- src/rpc-server/commands/trex_rpc_cmds.h | 8 +- src/rpc-server/trex_rpc_cmds_table.cpp | 2 +- src/stateless/cp/trex_stateless.cpp | 31 +--- src/stateless/cp/trex_stateless.h | 12 +- src/stateless/cp/trex_stateless_port.h | 15 +- .../messaging/trex_stateless_messaging.cpp | 46 ++++- src/stateless/messaging/trex_stateless_messaging.h | 54 ++++-- src/stateless/rx/trex_stateless_capture.cpp | 142 ++++++++++++-- src/stateless/rx/trex_stateless_capture.h | 205 +++++++++++++++++++-- src/stateless/rx/trex_stateless_rx_core.cpp | 14 -- src/stateless/rx/trex_stateless_rx_core.h | 10 +- 12 files changed, 534 insertions(+), 123 deletions(-) (limited to 'src') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index ec5c3158..80f69fa3 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -29,6 +29,7 @@ limitations under the License. #include "trex_stateless_rx_core.h" #include "trex_stateless_capture.h" +#include "trex_stateless_messaging.h" #include #include @@ -844,13 +845,38 @@ TrexRpcCmdSetL3::_run(const Json::Value ¶ms, Json::Value &result) { return (TREX_RPC_CMD_OK); } - + + /** - * starts PCAP capturing + * capture command tree * */ trex_rpc_cmd_rc_e -TrexRpcCmdStartCapture::_run(const Json::Value ¶ms, Json::Value &result) { +TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { + const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status"}, result); + + if (cmd == "start") { + parse_cmd_start(params, result); + } else if (cmd == "stop") { + parse_cmd_stop(params, result); + } else if (cmd == "fetch") { + parse_cmd_fetch(params, result); + } else if (cmd == "status") { + parse_cmd_status(params, result); + } else { + /* can't happen */ + assert(0); + } + + return TREX_RPC_CMD_OK; +} + +/** + * starts PCAP capturing + * + */ +void +TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &result) { uint32_t limit = parse_uint32(params, "limit", result); const Json::Value &tx_json = parse_array(params, "tx", result); @@ -881,8 +907,90 @@ TrexRpcCmdStartCapture::_run(const Json::Value ¶ms, Json::Value &result) { } } - get_stateless_obj()->start_capture(filter, limit); + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); + get_stateless_obj()->send_msg_to_rx(start_msg); + + TrexCaptureRCStart rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } result["result"] = Json::objectValue; - return (TREX_RPC_CMD_OK); } + +/** + * stops PCAP capturing + * + */ +void +TrexRpcCmdCapture::parse_cmd_stop(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStop *stop_msg = new TrexStatelessRxCaptureStop(capture_id, reply); + get_stateless_obj()->send_msg_to_rx(stop_msg); + + TrexCaptureRCStop rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"]["pkt_count"] = rc.get_pkt_count(); +} + +/** + * gets the status of all captures in the system + * + */ +void +TrexRpcCmdCapture::parse_cmd_status(const Json::Value ¶ms, Json::Value &result) { + + /* generate a status command */ + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureStatus *status_msg = new TrexStatelessRxCaptureStatus(reply); + get_stateless_obj()->send_msg_to_rx(status_msg); + + TrexCaptureRCStatus rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"] = rc.get_status(); +} + +/** + * fetch packets from a capture + * + */ +void +TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + uint32_t pkt_limit = parse_uint32(params, "pkt_limit", result); + + /* generate a fetch command */ + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureFetch *fetch_msg = new TrexStatelessRxCaptureFetch(capture_id, pkt_limit, reply); + get_stateless_obj()->send_msg_to_rx(fetch_msg); + + TrexCaptureRCFetch rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"]["pkts"] = rc.get_pkt_buffer()->to_json(); + result["result"]["pending"] = rc.get_pending(); +} + diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index 1ea63cc7..bf78ff80 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -160,7 +160,13 @@ TREX_RPC_CMD_DEFINE(TrexRpcCmdGetRxQueuePkts, "get_rx_queue_pkts", 1, true, APIC TREX_RPC_CMD_DEFINE(TrexRpcCmdSetServiceMode, "service", 2, true, APIClass::API_CLASS_TYPE_CORE); -TREX_RPC_CMD_DEFINE(TrexRpcCmdStartCapture, "start_capture", 3, false, APIClass::API_CLASS_TYPE_CORE); +TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture, "capture", 1, false, APIClass::API_CLASS_TYPE_CORE, + void parse_cmd_start(const Json::Value &msg, Json::Value &result); + void parse_cmd_stop(const Json::Value &msg, Json::Value &result); + void parse_cmd_status(const Json::Value &msg, Json::Value &result); + void parse_cmd_fetch(const Json::Value &msg, Json::Value &result); +); + #endif /* __TREX_RPC_CMD_H__ */ diff --git a/src/rpc-server/trex_rpc_cmds_table.cpp b/src/rpc-server/trex_rpc_cmds_table.cpp index 3d4d5a23..2af9f4f5 100644 --- a/src/rpc-server/trex_rpc_cmds_table.cpp +++ b/src/rpc-server/trex_rpc_cmds_table.cpp @@ -79,7 +79,7 @@ TrexRpcCommandsTable::TrexRpcCommandsTable() { register_command(new TrexRpcCmdSetL2()); register_command(new TrexRpcCmdSetL3()); - register_command(new TrexRpcCmdStartCapture()); + register_command(new TrexRpcCmdCapture()); } diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 32babbf7..6ab9b417 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -19,7 +19,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -//#include #include #include @@ -142,35 +141,11 @@ TrexStateless::get_dp_core_count() { return m_platform_api->get_dp_core_count(); } -capture_id_t -TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) { - static MsgReply reply; - - reply.reset(); - - CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); - TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply); - - ring->Enqueue((CGenNode *)msg); - - capture_id_t new_id = reply.wait_for_reply(); - - return (new_id); -} +void +TrexStateless::send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const { -capture_id_t -TrexStateless::stop_capture(capture_id_t capture_id) { - static MsgReply reply; - - reply.reset(); - CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); - TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply); - ring->Enqueue((CGenNode *)msg); - - capture_id_t rc = reply.wait_for_reply(); - - return (rc); } + diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 33f16ce9..87d227f6 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -102,7 +102,6 @@ public: * defines the TRex stateless operation mode * */ -class CaptureFilter; class TrexStateless { public: @@ -133,16 +132,9 @@ public: /** - * starts a capture on a 'filter' of ports - * with a limit of packets + * send a message to the RX core */ - capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit); - - /** - * stops an active capture - * - */ - capture_id_t stop_capture(capture_id_t capture_id); + void send_msg_to_rx(TrexStatelessCpToRxMsgBase *msg) const; /** * shutdown the server diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 2cc1b9ca..0ef8ae60 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -140,7 +140,7 @@ public: } if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { - throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists"); + throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists"); } m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW); @@ -439,19 +439,6 @@ public: void get_pci_info(std::string &pci_addr, int &numa_node); - - /** - * starts capturing packets - * - */ - void start_capture(capture_mode_e mode, uint64_t limit); - - /** - * stops capturing packets - * - */ - void stop_capture(); - /** * start RX queueing of packets * diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index f441c692..b9bb1d1c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -262,24 +262,58 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool -TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter); +TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStart start_rc; + + TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc); + + /* mark as done */ + m_reply.set_reply(start_rc); + + return true; +} + +bool +TrexStatelessRxCaptureStop::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStop stop_rc; + + TrexStatelessCaptureMngr::getInstance().stop(m_capture_id, stop_rc); + + /* mark as done */ + m_reply.set_reply(stop_rc); + + return true; +} +bool +TrexStatelessRxCaptureFetch::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCFetch fetch_rc; + + TrexStatelessCaptureMngr::getInstance().fetch(m_capture_id, m_pkt_limit, fetch_rc); + /* mark as done */ - m_reply.set_reply(capture_id); + m_reply.set_reply(fetch_rc); return true; } bool -TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { - capture_id_t rc = rx_core->stop_capture(m_capture_id); +TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCStatus status_rc; - m_reply.set_reply(rc); + status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json()); + + /* mark as done */ + m_reply.set_reply(status_rc); return true; } + bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { rx_core->start_queue(m_port_id, m_size); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 5f4978f5..4027d075 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -485,12 +485,16 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { }; +class TrexStatelessRxCapture : public TrexStatelessCpToRxMsgBase { +public: + virtual bool handle (CRxCoreStateless *rx_core) = 0; +}; -class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture { public: - TrexStatelessRxStartCapture(const CaptureFilter& filter, + TrexStatelessRxCaptureStart(const CaptureFilter& filter, uint64_t limit, - MsgReply &reply) : m_reply(reply) { + MsgReply &reply) : m_reply(reply) { m_limit = limit; m_filter = filter; @@ -499,24 +503,52 @@ public: virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - uint64_t m_limit; - CaptureFilter m_filter; - MsgReply &m_reply; + uint8_t m_port_id; + uint64_t m_limit; + CaptureFilter m_filter; + MsgReply &m_reply; +}; + + +class TrexStatelessRxCaptureStop : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStop(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + m_capture_id = capture_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + MsgReply &m_reply; }; -class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxCaptureFetch : public TrexStatelessRxCapture { public: - TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + TrexStatelessRxCaptureFetch(capture_id_t capture_id, uint32_t pkt_limit, MsgReply &reply) : m_reply(reply) { m_capture_id = capture_id; + m_pkt_limit = pkt_limit; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + uint32_t m_pkt_limit; + MsgReply &m_reply; +}; + + +class TrexStatelessRxCaptureStatus : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureStatus(MsgReply &reply) : m_reply(reply) { } virtual bool handle(CRxCoreStateless *rx_core); private: - capture_id_t m_capture_id; - MsgReply &m_reply; + MsgReply &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 4ed126cc..85be7aef 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -25,6 +25,7 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons m_id = id; m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); m_filter = filter; + m_state = STATE_ACTIVE; } TrexStatelessCapture::~TrexStatelessCapture() { @@ -35,9 +36,15 @@ TrexStatelessCapture::~TrexStatelessCapture() { void TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { + + if (m_state != STATE_ACTIVE) { + delete pkt; + return; + } /* if not in filter - back off */ if (!m_filter.in_filter(pkt)) { + delete pkt; return; } @@ -46,6 +53,11 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { void TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { + + if (m_state != STATE_ACTIVE) { + return; + } + if (!m_filter.in_rx(port)) { return; } @@ -53,6 +65,56 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { m_pkt_buffer->push(m); } + +Json::Value +TrexStatelessCapture::to_json() const { + Json::Value output = Json::objectValue; + + output["id"] = Json::UInt64(m_id); + output["filter"] = m_filter.to_json(); + output["count"] = m_pkt_buffer->get_element_count(); + output["bytes"] = m_pkt_buffer->get_bytes(); + output["limit"] = m_pkt_buffer->get_capacity(); + + switch (m_state) { + case STATE_ACTIVE: + output["state"] = "ACTIVE"; + break; + + case STATE_STOPPED: + output["state"] = "STOPPED"; + break; + + default: + assert(0); + + } + + return output; +} + +TrexPktBuffer * +TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { + + /* if the total sum of packets is within the limit range - take it */ + if (m_pkt_buffer->get_element_count() <= pkt_limit) { + TrexPktBuffer *current = m_pkt_buffer; + m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode()); + pending = 0; + return current; + } + + /* harder part - partial fetch */ + TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit); + for (int i = 0; i < pkt_limit; i++) { + const TrexPkt *pkt = m_pkt_buffer->pop(); + partial->push(pkt); + } + + pending = m_pkt_buffer->get_element_count(); + return partial; +} + void TrexStatelessCaptureMngr::update_global_filter() { CaptureFilter new_filter; @@ -64,11 +126,25 @@ TrexStatelessCaptureMngr::update_global_filter() { m_global_filter = new_filter; } -capture_id_t -TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { +TrexStatelessCapture * +TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return m_captures[i]; + } + } + + /* does not exist */ + return nullptr; +} + +void +TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) { + if (m_captures.size() > MAX_CAPTURE_SIZE) { - return CAPTURE_TOO_MANY_CAPTURES; + rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED); + return; } @@ -79,15 +155,46 @@ TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { /* update global filter */ update_global_filter(); - return new_id; + /* result */ + rc.set_new_id(new_id); +} + +void +TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + capture->stop(); + rc.set_count(capture->get_pkt_count()); } -capture_id_t -TrexStatelessCaptureMngr::remove(capture_id_t id) { +void +TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + if (capture->is_active()) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_FETCH_UNDER_ACTIVE); + return; + } + uint32_t pending = 0; + TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); + + rc.set_pkt_buffer(pkt_buffer, pending); +} + +void +TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) { + int index = -1; for (int i = 0; i < m_captures.size(); i++) { - if (m_captures[i]->get_id() == id) { + if (m_captures[i]->get_id() == capture_id) { index = i; break; } @@ -95,24 +202,26 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) { /* does not exist */ if (index == -1) { - return CAPTURE_ID_NOT_FOUND; + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; } TrexStatelessCapture *capture = m_captures[index]; m_captures.erase(m_captures.begin() + index); + /* free memory */ delete capture; /* update global filter */ update_global_filter(); - - return id; } void TrexStatelessCaptureMngr::reset() { + TrexCaptureRCRemove dummy; + while (m_captures.size() > 0) { - remove(m_captures[0]->get_id()); + remove(m_captures[0]->get_id(), dummy); } } @@ -130,3 +239,14 @@ TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port) } } +Json::Value +TrexStatelessCaptureMngr::to_json() const { + Json::Value lst = Json::arrayValue; + + for (TrexStatelessCapture *capture : m_captures) { + lst.append(capture->to_json()); + } + + return lst; +} + diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 4d0b6a78..6cd25a94 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -22,7 +22,146 @@ limitations under the License. #define __TREX_STATELESS_CAPTURE_H__ #include +#include + #include "trex_stateless_pkt.h" +#include "trex_stateless_capture_msg.h" + +typedef int64_t capture_id_t; + +class TrexCaptureRC { +public: + + TrexCaptureRC() { + m_rc = RC_INVALID; + m_pkt_buffer = NULL; + } + + enum rc_e { + RC_INVALID = 0, + RC_OK = 1, + RC_CAPTURE_NOT_FOUND, + RC_CAPTURE_LIMIT_REACHED, + RC_CAPTURE_FETCH_UNDER_ACTIVE + }; + + bool operator !() const { + return (m_rc != RC_OK); + } + + std::string get_err() const { + assert(m_rc != RC_INVALID); + + switch (m_rc) { + case RC_OK: + return ""; + case RC_CAPTURE_LIMIT_REACHED: + return "capture limit has reached"; + case RC_CAPTURE_NOT_FOUND: + return "capture ID not found"; + case RC_CAPTURE_FETCH_UNDER_ACTIVE: + return "fetch command cannot be executed on an active capture"; + default: + assert(0); + } + } + + void set_err(rc_e rc) { + m_rc = rc; + } + + Json::Value get_json() const { + return m_json_rc; + } + +public: + rc_e m_rc; + capture_id_t m_capture_id; + TrexPktBuffer *m_pkt_buffer; + Json::Value m_json_rc; +}; + +class TrexCaptureRCStart : public TrexCaptureRC { +public: + + void set_new_id(capture_id_t new_id) { + m_capture_id = new_id; + m_rc = RC_OK; + } + + capture_id_t get_new_id() const { + return m_capture_id; + } + +private: + capture_id_t m_capture_id; +}; + + +class TrexCaptureRCStop : public TrexCaptureRC { +public: + void set_count(uint32_t pkt_count) { + m_pkt_count = pkt_count; + m_rc = RC_OK; + } + + uint32_t get_pkt_count() const { + return m_pkt_count; + } + +private: + uint32_t m_pkt_count; +}; + +class TrexCaptureRCFetch : public TrexCaptureRC { +public: + + TrexCaptureRCFetch() { + m_pkt_buffer = nullptr; + m_pending = 0; + } + + void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_rc = RC_OK; + } + + const TrexPktBuffer *get_pkt_buffer() const { + return m_pkt_buffer; + } + + uint32_t get_pending() const { + return m_pending; + } + +private: + const TrexPktBuffer *m_pkt_buffer; + uint32_t m_pending; +}; + +class TrexCaptureRCRemove : public TrexCaptureRC { +public: + void set_ok() { + m_rc = RC_OK; + } +}; + +class TrexCaptureRCStatus : public TrexCaptureRC { +public: + + void set_status(const Json::Value &json) { + m_json = json; + m_rc = RC_OK; + } + + const Json::Value & get_status() const { + return m_json; + } + +private: + Json::Value m_json; +}; /** * capture filter @@ -82,20 +221,27 @@ public: return *this; } + Json::Value to_json() const { + Json::Value output = Json::objectValue; + output["tx"] = Json::UInt64(m_tx_active); + output["rx"] = Json::UInt64(m_rx_active); + + return output; + } + private: uint64_t m_tx_active; uint64_t m_rx_active; }; -typedef int64_t capture_id_t; -enum { - CAPTURE_ID_NOT_FOUND = -1, - CAPTURE_TOO_MANY_CAPTURES = -2, -}; class TrexStatelessCapture { public: + enum state_e { + STATE_ACTIVE, + STATE_STOPPED, + }; TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); @@ -112,7 +258,24 @@ public: return m_filter; } + Json::Value to_json() const; + + void stop() { + m_state = STATE_STOPPED; + } + + TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending); + + bool is_active() const { + return m_state == STATE_ACTIVE; + } + + uint32_t get_pkt_count() const { + return m_pkt_buffer->get_element_count(); + } + private: + state_e m_state; TrexPktBuffer *m_pkt_buffer; CaptureFilter m_filter; uint64_t m_id; @@ -134,18 +297,28 @@ public: } /** - * adds a capture buffer - * returns ID + * starts a new capture */ - capture_id_t add(uint64_t limit, const CaptureFilter &filter); + void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc); - /** - * stops capture mode - * on success, will return the ID of the removed one - * o.w it will be an error + * stops an existing capture + * */ - capture_id_t remove(capture_id_t id); + void stop(capture_id_t capture_id, TrexCaptureRCStop &rc); + + /** + * fetch packets from an existing capture + * + */ + void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc); + + /** + * removes an existing capture + * all packets captured will be detroyed + */ + void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc); + /** * removes all captures @@ -153,6 +326,7 @@ public: */ void reset(); + /** * return true if any filter is active * @@ -182,6 +356,8 @@ public: handle_pkt_rx_slow_path(m, port); } + Json::Value to_json() const; + private: TrexStatelessCaptureMngr() { @@ -189,6 +365,9 @@ private: m_id_counter = 1; } + + TrexStatelessCapture * lookup(capture_id_t capture_id); + void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); void update_global_filter(); diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index f1ba303a..00c18082 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -270,10 +270,6 @@ void CRxCoreStateless::start() { m_monitor.disable(); } -void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) { - -} - int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) { int total_pkts = 0; @@ -332,16 +328,6 @@ double CRxCoreStateless::get_cpu_util() { } -capture_id_t -CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) { - return TrexStatelessCaptureMngr::getInstance().add(limit, filter); -} - -capture_id_t -CRxCoreStateless::stop_capture(capture_id_t capture_id) { - return TrexStatelessCaptureMngr::getInstance().remove(capture_id); -} - void CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) { m_rx_port_mngr[port_id].start_queue(size); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 21ed51ba..954a5f04 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -131,14 +131,7 @@ class CRxCoreStateless { const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } - - /** - * start capturing packets - * - */ - capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter); - capture_id_t stop_capture(capture_id_t capture_id); - + /** * start RX queueing of packets * @@ -175,7 +168,6 @@ class CRxCoreStateless { void recalculate_next_state(); bool are_any_features_active(); - void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); void handle_grat_arp(); -- cgit From 951b09ef1b892594840f091f861f11ad274541ec Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 18 Jan 2017 13:08:41 +0200 Subject: many capture modes in Python console Signed-off-by: imarom --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 43 ++++++++++++++++++---- src/rpc-server/commands/trex_rpc_cmds.h | 1 + .../messaging/trex_stateless_messaging.cpp | 13 +++++++ src/stateless/messaging/trex_stateless_messaging.h | 15 ++++++++ src/stateless/rx/trex_stateless_capture.cpp | 9 ++--- src/stateless/rx/trex_stateless_capture.h | 2 +- 6 files changed, 68 insertions(+), 15 deletions(-) (limited to 'src') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 80f69fa3..8f7431e4 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -853,7 +853,7 @@ TrexRpcCmdSetL3::_run(const Json::Value ¶ms, Json::Value &result) { */ trex_rpc_cmd_rc_e TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { - const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status"}, result); + const std::string cmd = parse_choice(params, "command", {"start", "stop", "fetch", "status", "remove"}, result); if (cmd == "start") { parse_cmd_start(params, result); @@ -863,6 +863,8 @@ TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { parse_cmd_fetch(params, result); } else if (cmd == "status") { parse_cmd_status(params, result); + } else if (cmd == "remove") { + parse_cmd_remove(params, result); } else { /* can't happen */ assert(0); @@ -878,10 +880,9 @@ TrexRpcCmdCapture::_run(const Json::Value ¶ms, Json::Value &result) { void TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &result) { - uint32_t limit = parse_uint32(params, "limit", result); - const Json::Value &tx_json = parse_array(params, "tx", result); - const Json::Value &rx_json = parse_array(params, "rx", result); - + uint32_t limit = parse_uint32(params, "limit", result); + const Json::Value &tx_json = parse_array(params, "tx", result); + const Json::Value &rx_json = parse_array(params, "rx", result); CaptureFilter filter; std::set ports; @@ -909,7 +910,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul static MsgReply reply; reply.reset(); - + TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); get_stateless_obj()->send_msg_to_rx(start_msg); @@ -918,7 +919,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul generate_execute_err(result, rc.get_err()); } - result["result"] = Json::objectValue; + result["result"]["capture_id"] = rc.get_new_id(); } /** @@ -990,7 +991,33 @@ TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &resul generate_execute_err(result, rc.get_err()); } - result["result"]["pkts"] = rc.get_pkt_buffer()->to_json(); + const TrexPktBuffer *pkt_buffer = rc.get_pkt_buffer(); + result["result"]["pending"] = rc.get_pending(); + result["result"]["pkts"] = pkt_buffer->to_json(); + + /* delete the buffer */ + delete pkt_buffer; +} + +void +TrexRpcCmdCapture::parse_cmd_remove(const Json::Value ¶ms, Json::Value &result) { + + uint32_t capture_id = parse_uint32(params, "capture_id", result); + + /* generate a remove command */ + + static MsgReply reply; + reply.reset(); + + TrexStatelessRxCaptureRemove *remove_msg = new TrexStatelessRxCaptureRemove(capture_id, reply); + get_stateless_obj()->send_msg_to_rx(remove_msg); + + TrexCaptureRCRemove rc = reply.wait_for_reply(); + if (!rc) { + generate_execute_err(result, rc.get_err()); + } + + result["result"] = Json::objectValue; } diff --git a/src/rpc-server/commands/trex_rpc_cmds.h b/src/rpc-server/commands/trex_rpc_cmds.h index bf78ff80..54797bdf 100644 --- a/src/rpc-server/commands/trex_rpc_cmds.h +++ b/src/rpc-server/commands/trex_rpc_cmds.h @@ -165,6 +165,7 @@ TREX_RPC_CMD_DEFINE_EXTENDED(TrexRpcCmdCapture, "capture", 1, false, APIClass:: void parse_cmd_stop(const Json::Value &msg, Json::Value &result); void parse_cmd_status(const Json::Value &msg, Json::Value &result); void parse_cmd_fetch(const Json::Value &msg, Json::Value &result); + void parse_cmd_remove(const Json::Value ¶ms, Json::Value &result); ); diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index b9bb1d1c..2452487c 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -313,6 +313,19 @@ TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { return true; } +bool +TrexStatelessRxCaptureRemove::handle(CRxCoreStateless *rx_core) { + + TrexCaptureRCRemove remove_rc; + + TrexStatelessCaptureMngr::getInstance().remove(m_capture_id, remove_rc); + + /* mark as done */ + m_reply.set_reply(remove_rc); + + return true; +} + bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 4027d075..3535ad4f 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -552,6 +552,21 @@ private: }; + +class TrexStatelessRxCaptureRemove : public TrexStatelessRxCapture { +public: + TrexStatelessRxCaptureRemove(capture_id_t capture_id, MsgReply &reply) : m_reply(reply) { + m_capture_id = capture_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + capture_id_t m_capture_id; + MsgReply &m_reply; +}; + + class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase { public: TrexStatelessRxStartQueue(uint8_t port_id, diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 85be7aef..5d43cede 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -62,7 +62,7 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { return; } - m_pkt_buffer->push(m); + m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX); } @@ -87,7 +87,6 @@ TrexStatelessCapture::to_json() const { default: assert(0); - } return output; @@ -178,10 +177,6 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); return; } - if (capture->is_active()) { - rc.set_err(TrexCaptureRC::RC_CAPTURE_FETCH_UNDER_ACTIVE); - return; - } uint32_t pending = 0; TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); @@ -214,6 +209,8 @@ TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &r /* update global filter */ update_global_filter(); + + rc.set_ok(); } void diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 6cd25a94..4a9efea7 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -27,7 +27,7 @@ limitations under the License. #include "trex_stateless_pkt.h" #include "trex_stateless_capture_msg.h" -typedef int64_t capture_id_t; +typedef int32_t capture_id_t; class TrexCaptureRC { public: -- cgit From 641fed03d8e407b6dca94f5280b9a1b4c768f601 Mon Sep 17 00:00:00 2001 From: imarom Date: Thu, 19 Jan 2017 13:30:48 +0200 Subject: fine tune Signed-off-by: imarom --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 8f7431e4..be261fbb 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -920,6 +920,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul } result["result"]["capture_id"] = rc.get_new_id(); + result["result"]["ts"] = now_sec(); } /** -- cgit From f5f92b068561dcdf8414494e5daf6d285ea24135 Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 22 Jan 2017 15:36:20 +0200 Subject: few tweaks Signed-off-by: imarom --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 5 +++-- src/stateless/rx/trex_stateless_capture.cpp | 19 ++++++++++++++----- src/stateless/rx/trex_stateless_capture.h | 24 ++++++++++++++++++------ 3 files changed, 35 insertions(+), 13 deletions(-) (limited to 'src') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index be261fbb..55249fc8 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -994,8 +994,9 @@ TrexRpcCmdCapture::parse_cmd_fetch(const Json::Value ¶ms, Json::Value &resul const TrexPktBuffer *pkt_buffer = rc.get_pkt_buffer(); - result["result"]["pending"] = rc.get_pending(); - result["result"]["pkts"] = pkt_buffer->to_json(); + result["result"]["pending"] = rc.get_pending(); + result["result"]["start_ts"] = rc.get_start_ts(); + result["result"]["pkts"] = pkt_buffer->to_json(); /* delete the buffer */ delete pkt_buffer; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 5d43cede..f0d4e806 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -26,6 +26,8 @@ TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, cons m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); m_filter = filter; m_state = STATE_ACTIVE; + m_start_ts = now_sec(); + m_pkt_index = 0; } TrexStatelessCapture::~TrexStatelessCapture() { @@ -35,7 +37,7 @@ TrexStatelessCapture::~TrexStatelessCapture() { } void -TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { +TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) { if (m_state != STATE_ACTIVE) { delete pkt; @@ -48,6 +50,12 @@ TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { return; } + if (pkt->get_ts() < m_start_ts) { + delete pkt; + return; + } + + pkt->set_index(++m_pkt_index); m_pkt_buffer->push(pkt); } @@ -62,7 +70,7 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { return; } - m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX); + m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX, ++m_pkt_index); } @@ -110,7 +118,8 @@ TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { partial->push(pkt); } - pending = m_pkt_buffer->get_element_count(); + pending = m_pkt_buffer->get_element_count(); + return partial; } @@ -181,7 +190,7 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre uint32_t pending = 0; TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); - rc.set_pkt_buffer(pkt_buffer, pending); + rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts()); } void @@ -223,7 +232,7 @@ TrexStatelessCaptureMngr::reset() { } void -TrexStatelessCaptureMngr::handle_pkt_tx(const TrexPkt *pkt) { +TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) { for (TrexStatelessCapture *capture : m_captures) { capture->handle_pkt_tx(pkt); } diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 4a9efea7..bc1b88c5 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -121,10 +121,11 @@ public: m_pending = 0; } - void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending) { - m_pkt_buffer = pkt_buffer; - m_pending = pending; - m_rc = RC_OK; + void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_start_ts = start_ts; + m_rc = RC_OK; } const TrexPktBuffer *get_pkt_buffer() const { @@ -135,9 +136,14 @@ public: return m_pending; } + dsec_t get_start_ts() const { + return m_start_ts; + } + private: const TrexPktBuffer *m_pkt_buffer; uint32_t m_pending; + dsec_t m_start_ts; }; class TrexCaptureRCRemove : public TrexCaptureRC { @@ -245,7 +251,7 @@ public: TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); - void handle_pkt_tx(const TrexPkt *pkt); + void handle_pkt_tx(TrexPkt *pkt); void handle_pkt_rx(const rte_mbuf_t *m, int port); ~TrexStatelessCapture(); @@ -274,11 +280,17 @@ public: return m_pkt_buffer->get_element_count(); } + dsec_t get_start_ts() const { + return m_start_ts; + } + private: state_e m_state; TrexPktBuffer *m_pkt_buffer; + dsec_t m_start_ts; CaptureFilter m_filter; uint64_t m_id; + uint64_t m_pkt_index; }; class TrexStatelessCaptureMngr { @@ -341,7 +353,7 @@ public: /** * handle packet from TX */ - void handle_pkt_tx(const TrexPkt *pkt); + void handle_pkt_tx(TrexPkt *pkt); /** * handle packet from RX -- cgit From d2f1c8451e2e8ffc47b208f68f9b16697d706d60 Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 22 Jan 2017 16:09:46 +0200 Subject: Trex packet capture ds Signed-off-by: imarom --- src/stateless/common/trex_stateless_pkt.cpp | 182 +++++++++++++++++++++++++ src/stateless/common/trex_stateless_pkt.h | 201 ++++++++++++++++++++++++++++ 2 files changed, 383 insertions(+) create mode 100644 src/stateless/common/trex_stateless_pkt.cpp create mode 100644 src/stateless/common/trex_stateless_pkt.h (limited to 'src') diff --git a/src/stateless/common/trex_stateless_pkt.cpp b/src/stateless/common/trex_stateless_pkt.cpp new file mode 100644 index 00000000..f7d47ec0 --- /dev/null +++ b/src/stateless/common/trex_stateless_pkt.cpp @@ -0,0 +1,182 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* + Copyright (c) 2016-2016 Cisco Systems, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "trex_stateless_pkt.h" +#include + + +/** + * copy MBUF to a flat buffer + * + * @author imarom (12/20/2016) + * + * @param dest - buffer with at least rte_pktmbuf_pkt_len(m) + * bytes + * @param m - MBUF to copy + * + * @return uint8_t* + */ +void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { + + int index = 0; + for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { + const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *); + memcpy(dest + index, src, it->data_len); + index += it->data_len; + } +} + +/************************************** + * TRex packet + * + *************************************/ +TrexPkt::TrexPkt(const rte_mbuf_t *m, int port, origin_e origin, uint64_t index) { + + /* allocate buffer */ + m_size = m->pkt_len; + m_raw = new uint8_t[m_size]; + + /* copy data */ + copy_mbuf(m_raw, m); + + /* generate a packet timestamp */ + m_timestamp = now_sec(); + + m_port = port; + m_origin = origin; + m_index = index; +} + +TrexPkt::TrexPkt(const TrexPkt &other) { + m_size = other.m_size; + memcpy(m_raw, other.m_raw, m_size); + + m_timestamp = other.m_timestamp; + + m_port = other.m_port; + m_origin = other.m_origin; + m_index = other.m_index; +} + +TrexPktBuffer::TrexPktBuffer(uint64_t size, mode_e mode) { + m_mode = mode; + m_buffer = nullptr; + m_head = 0; + m_tail = 0; + m_bytes = 0; + m_size = (size + 1); // for the empty/full difference 1 slot reserved + + /* generate queue */ + m_buffer = new const TrexPkt*[m_size](); // zeroed +} + +TrexPktBuffer::~TrexPktBuffer() { + assert(m_buffer); + + while (!is_empty()) { + const TrexPkt *pkt = pop(); + delete pkt; + } + delete [] m_buffer; +} + +/** + * packet will be copied to an internal object + */ +void +TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uint64_t pkt_index) { + + /* if full - decide by the policy */ + if (is_full()) { + if (m_mode == MODE_DROP_HEAD) { + delete pop(); + } else { + /* drop the tail (current packet) */ + return; + } + } + + /* push packet */ + m_buffer[m_head] = new TrexPkt(m, port, origin, pkt_index); + m_bytes += m_buffer[m_head]->get_size(); + + m_head = next(m_head); + +} + +/** + * packet will be handled internally + */ +void +TrexPktBuffer::push(const TrexPkt *pkt) { + /* if full - decide by the policy */ + if (is_full()) { + if (m_mode == MODE_DROP_HEAD) { + delete pop(); + } else { + /* drop the tail (current packet) */ + delete pkt; + return; + } + } + + /* push packet */ + m_buffer[m_head] = pkt; + m_head = next(m_head); +} + +const TrexPkt * +TrexPktBuffer::pop() { + assert(!is_empty()); + + const TrexPkt *pkt = m_buffer[m_tail]; + m_tail = next(m_tail); + + m_bytes -= pkt->get_size(); + + return pkt; +} + +uint32_t +TrexPktBuffer::get_element_count() const { + if (m_head >= m_tail) { + return (m_head - m_tail); + } else { + return ( get_capacity() - (m_tail - m_head - 1) ); + } +} + +Json::Value +TrexPktBuffer::to_json() const { + + Json::Value output = Json::arrayValue; + + int tmp = m_tail; + while (tmp != m_head) { + const TrexPkt *pkt = m_buffer[tmp]; + output.append(pkt->to_json()); + tmp = next(tmp); + } + + return output; +} + + diff --git a/src/stateless/common/trex_stateless_pkt.h b/src/stateless/common/trex_stateless_pkt.h new file mode 100644 index 00000000..1b6bd2f8 --- /dev/null +++ b/src/stateless/common/trex_stateless_pkt.h @@ -0,0 +1,201 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* + Copyright (c) 2016-2016 Cisco Systems, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef __TREX_STATELESS_PKT_H__ +#define __TREX_STATELESS_PKT_H__ + +#include +#include +#include "mbuf.h" +#include "common/base64.h" +#include "os_time.h" + + +/** + * copies MBUF to a flat buffer + * + * @author imarom (1/1/2017) + * + * @param dest + * @param m + */ +void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m); + +/** + * describes a single saved packet + * + */ +class TrexPkt { +public: + + enum origin_e { + ORIGIN_NONE = 1, + ORIGIN_TX, + ORIGIN_RX + }; + + TrexPkt(const rte_mbuf_t *m, int port = -1, origin_e origin = ORIGIN_NONE, uint64_t index = 0); + TrexPkt(const TrexPkt &other); + + void set_index(uint64_t index) { + m_index = index; + } + + /* slow path and also RVO - pass by value is ok */ + Json::Value to_json() const { + Json::Value output; + output["ts"] = m_timestamp; + output["binary"] = base64_encode(m_raw, m_size); + output["port"] = m_port; + output["index"] = Json::UInt64(m_index); + + switch (m_origin) { + case ORIGIN_TX: + output["origin"] = "TX"; + break; + case ORIGIN_RX: + output["origin"] = "RX"; + break; + default: + output["origin"] = "NONE"; + break; + } + + return output; + } + + ~TrexPkt() { + if (m_raw) { + delete [] m_raw; + } + } + + origin_e get_origin() const { + return m_origin; + } + + int get_port() const { + return m_port; + } + + uint16_t get_size() const { + return m_size; + } + + dsec_t get_ts() const { + return m_timestamp; + } + +private: + + uint8_t *m_raw; + uint16_t m_size; + dsec_t m_timestamp; + origin_e m_origin; + int m_port; + uint64_t m_index; +}; + + +class TrexPktBuffer { +public: + + /** + * two modes for operations: + * + * MODE_DROP_HEAD - when the buffer is full, packets will be + * dropped from the head (the oldest packet) + * + * MODE_DROP_TAIL - when the buffer is full, packets will be + * dropped from the tail (the current packet) + */ + enum mode_e { + MODE_DROP_HEAD = 1, + MODE_DROP_TAIL = 2, + }; + + TrexPktBuffer(uint64_t size, mode_e mode = MODE_DROP_TAIL); + ~TrexPktBuffer(); + + /** + * push a packet to the buffer + * + */ + void push(const rte_mbuf_t *m, int port = -1, TrexPkt::origin_e origin = TrexPkt::ORIGIN_NONE, uint64_t pkt_index = 0); + void push(const TrexPkt *pkt); + + /** + * pops a packet from the buffer + * usually for internal usage + */ + const TrexPkt * pop(); + + /** + * generate a JSON output of the queue + * + */ + Json::Value to_json() const; + + + bool is_empty() const { + return (m_head == m_tail); + } + + bool is_full() const { + return ( next(m_head) == m_tail); + } + + /** + * return the total amount of space possible + */ + uint32_t get_capacity() const { + /* one slot is used for diff between full/empty */ + return (m_size - 1); + } + + mode_e get_mode() const { + return m_mode; + } + + /** + * returns how many elements are in the queue + */ + uint32_t get_element_count() const; + + uint32_t get_bytes() const { + return m_bytes; + } + +private: + int next(int v) const { + return ( (v + 1) % m_size ); + } + + mode_e m_mode; + int m_head; + int m_tail; + int m_size; + uint32_t m_bytes; + const TrexPkt **m_buffer; +}; + + +#endif /* __TREX_STATELESS_PKT_H__*/ -- cgit From 2fd0f893a70649cfb71708c367724c5ba1ca0125 Mon Sep 17 00:00:00 2001 From: imarom Date: Sun, 22 Jan 2017 16:21:18 +0200 Subject: removed a non needed header include Signed-off-by: imarom --- src/stateless/rx/trex_stateless_capture.h | 1 - 1 file changed, 1 deletion(-) (limited to 'src') diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index bc1b88c5..0f98fd95 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -25,7 +25,6 @@ limitations under the License. #include #include "trex_stateless_pkt.h" -#include "trex_stateless_capture_msg.h" typedef int32_t capture_id_t; -- cgit From 418fd3d0a7169f2d8934e8be82d11e1a388d681c Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 23 Jan 2017 16:45:01 +0200 Subject: service mode is now by message to RX core Signed-off-by: imarom --- src/stateless/cp/trex_stateless_port.cpp | 53 ++++++++++++--- src/stateless/cp/trex_stateless_port.h | 77 ++++------------------ .../messaging/trex_stateless_messaging.cpp | 37 +++++++++++ src/stateless/messaging/trex_stateless_messaging.h | 38 +++++++++++ src/stateless/rx/trex_stateless_rx_port_mngr.h | 11 ++-- 5 files changed, 137 insertions(+), 79 deletions(-) (limited to 'src') diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 9cf048b0..bfc7dce4 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -162,13 +162,14 @@ private: * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this), m_service_mode(port_id, api) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) { std::vector> core_pair_list; - m_port_id = port_id; - m_port_state = PORT_STATE_IDLE; - m_platform_api = api; - + m_port_id = port_id; + m_port_state = PORT_STATE_IDLE; + m_platform_api = api; + m_is_service_mode_on = false; + /* get the platform specific data */ api->get_interface_info(port_id, m_api_info); @@ -948,6 +949,44 @@ TrexStatelessPort::remove_and_delete_all_streams() { } } +/** + * enable/disable service mode + * sends a query to the RX core + * + */ +void +TrexStatelessPort::set_service_mode(bool enabled) { + static MsgReply reply; + reply.reset(); + + TrexStatelessRxQuery::query_type_e query_type = (enabled ? TrexStatelessRxQuery::SERVICE_MODE_ON : TrexStatelessRxQuery::SERVICE_MODE_OFF); + + TrexStatelessRxQuery *msg = new TrexStatelessRxQuery(m_port_id, query_type, reply); + send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg ); + + TrexStatelessRxQuery::query_rc_e rc = reply.wait_for_reply(); + + switch (rc) { + case TrexStatelessRxQuery::RC_OK: + if (enabled) { + getPortAttrObj()->set_rx_filter_mode(RX_FILTER_MODE_ALL); + } else { + getPortAttrObj()->set_rx_filter_mode(RX_FILTER_MODE_HW); + } + m_is_service_mode_on = enabled; + return; + + case TrexStatelessRxQuery::RC_FAIL_RX_QUEUE_ACTIVE: + throw TrexException("unable to disable service mode - please remove RX queue"); + + case TrexStatelessRxQuery::RC_FAIL_CAPTURE_ACTIVE: + throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists"); + + default: + assert(0); + } +} + void TrexStatelessPort::start_rx_queue(uint64_t size) { @@ -962,16 +1001,12 @@ TrexStatelessPort::start_rx_queue(uint64_t size) { this might cause the user to lose some packets from the queue */ reply.wait_for_reply(); - - m_service_mode.set_rx_queue(); } void TrexStatelessPort::stop_rx_queue() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); send_message_to_rx(msg); - - m_service_mode.unset_rx_queue(); } diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 0ef8ae60..4b8ea3d9 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -115,57 +115,6 @@ private: static const std::string g_unowned_handler; }; -/** - * enforces in/out from service mode - * - * @author imarom (1/4/2017) - */ -class TrexServiceMode { -public: - TrexServiceMode(uint8_t port_id, const TrexPlatformApi *api) { - m_is_enabled = false; - m_has_rx_queue = false; - m_port_id = port_id; - m_port_attr = api->getPortAttrObj(port_id); - } - - void enable() { - m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_ALL); - m_is_enabled = true; - } - - void disable() { - if (m_has_rx_queue) { - throw TrexException("unable to disable service mode - please remove RX queue"); - } - - if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { - throw TrexException("unable to disable service mode - an active capture on port " + std::to_string(m_port_id) + " exists"); - } - - m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW); - m_is_enabled = false; - } - - bool is_enabled() const { - return m_is_enabled; - } - - void set_rx_queue() { - m_has_rx_queue = true; - } - - void unset_rx_queue() { - m_has_rx_queue = false; - } - -private: - bool m_is_enabled; - bool m_has_rx_queue; - TRexPortAttr *m_port_attr; - uint8_t m_port_id; -}; - class AsyncStopEvent; /** @@ -287,20 +236,20 @@ public: double duration, bool is_dual); - /** - * moves port to / out service mode + + /** + * sets service mode + * + * @author imarom (1/22/2017) + * + * @param enabled */ - void set_service_mode(bool enabled) { - if (enabled) { - m_service_mode.enable(); - } else { - m_service_mode.disable(); - } - } + void set_service_mode(bool enabled); + bool is_service_mode_on() const { - return m_service_mode.is_enabled(); + return m_is_service_mode_on; } - + /** * get the port state * @@ -578,8 +527,8 @@ private: int m_pending_async_stop_event; - TrexServiceMode m_service_mode; - + bool m_is_service_mode_on; + static const uint32_t MAX_STREAMS = 20000; }; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 2452487c..21fe7a13 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -381,3 +381,40 @@ TrexStatelessRxSetL3Mode::handle(CRxCoreStateless *rx_core) { return true; } +bool +TrexStatelessRxQuery::handle(CRxCoreStateless *rx_core) { + + query_rc_e rc = RC_OK; + + switch (m_query_type) { + + case SERVICE_MODE_ON: + /* for service mode on - always allow this */ + rc = RC_OK; + break; + + case SERVICE_MODE_OFF: + /* cannot leave service mode when RX queue is active */ + if (rx_core->get_rx_port_mngr(m_port_id).is_feature_set(RXPortManager::QUEUE)) { + rc = RC_FAIL_RX_QUEUE_ACTIVE; + break; + } + + /* cannot leave service mode if PCAP capturing is active */ + if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { + rc = RC_FAIL_CAPTURE_ACTIVE; + break; + } + + break; + + default: + assert(0); + break; + + } + + m_reply.set_reply(rc); + + return true; +} diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 3535ad4f..ed14b100 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -677,4 +677,42 @@ private: MsgReply &m_reply; }; + +class TrexStatelessRxQuery : public TrexStatelessCpToRxMsgBase { +public: + + /** + * query type to request + */ + enum query_type_e { + SERVICE_MODE_ON, + SERVICE_MODE_OFF, + }; + + /** + * RC types for queries + */ + enum query_rc_e { + RC_OK, + RC_FAIL_RX_QUEUE_ACTIVE, + RC_FAIL_CAPTURE_ACTIVE, + }; + + TrexStatelessRxQuery(uint8_t port_id, query_type_e query_type, MsgReply &reply) : m_reply(reply) { + m_port_id = port_id; + m_query_type = query_type; + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; + query_type_e m_query_type; + MsgReply &m_reply; +}; + #endif /* __TREX_STATELESS_MESSAGING_H__ */ diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 0cc60716..b318d973 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -305,11 +305,14 @@ public: return (m_features != NO_FEATURES); } - bool no_features_set() { return (!has_features_set()); } + bool is_feature_set(feature_t feature) const { + return ( (m_features & feature) == feature ); + } + /** * returns ignored set of stats * (grat ARP, PING response and etc.) @@ -334,11 +337,7 @@ private: void unset_feature(feature_t feature) { m_features &= (~feature); } - - bool is_feature_set(feature_t feature) const { - return ( (m_features & feature) == feature ); - } - + uint32_t m_features; uint8_t m_port_id; RXLatency m_latency; -- cgit From 19df06349d311377ca1ef10f91ef1f786b41418b Mon Sep 17 00:00:00 2001 From: imarom Date: Tue, 24 Jan 2017 14:11:32 +0200 Subject: code review cleanups - C++ Signed-off-by: imarom --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 7 ++- src/stateless/common/trex_stateless_pkt.cpp | 18 ++++++-- src/stateless/common/trex_stateless_pkt.h | 58 +++++++++++++++++++----- src/stateless/rx/trex_stateless_capture.cpp | 8 ++-- src/stateless/rx/trex_stateless_capture.h | 15 ++++-- src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 2 +- 6 files changed, 82 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 6f0ab09a..c20c77d4 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -901,12 +901,16 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul /* populate the filter */ for (int i = 0; i < tx_json.size(); i++) { uint8_t tx_port = parse_byte(tx_json, i, result); + validate_port_id(tx_port, result); + filter.add_tx(tx_port); ports.insert(tx_port); } for (int i = 0; i < rx_json.size(); i++) { uint8_t rx_port = parse_byte(rx_json, i, result); + validate_port_id(rx_port, result); + filter.add_rx(rx_port); ports.insert(rx_port); } @@ -922,6 +926,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul static MsgReply reply; reply.reset(); + /* send a start message to RX core */ TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); get_stateless_obj()->send_msg_to_rx(start_msg); @@ -931,7 +936,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul } result["result"]["capture_id"] = rc.get_new_id(); - result["result"]["ts"] = now_sec(); + result["result"]["start_ts"] = rc.get_start_ts(); } /** diff --git a/src/stateless/common/trex_stateless_pkt.cpp b/src/stateless/common/trex_stateless_pkt.cpp index f7d47ec0..14c14462 100644 --- a/src/stateless/common/trex_stateless_pkt.cpp +++ b/src/stateless/common/trex_stateless_pkt.cpp @@ -34,7 +34,7 @@ * * @return uint8_t* */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { +void mbuf_to_buffer(uint8_t *dest, const rte_mbuf_t *m) { int index = 0; for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { @@ -55,7 +55,7 @@ TrexPkt::TrexPkt(const rte_mbuf_t *m, int port, origin_e origin, uint64_t index) m_raw = new uint8_t[m_size]; /* copy data */ - copy_mbuf(m_raw, m); + mbuf_to_buffer(m_raw, m); /* generate a packet timestamp */ m_timestamp = now_sec(); @@ -76,6 +76,12 @@ TrexPkt::TrexPkt(const TrexPkt &other) { m_index = other.m_index; } + +/************************************** + * TRex packet buffer + * + *************************************/ + TrexPktBuffer::TrexPktBuffer(uint64_t size, mode_e mode) { m_mode = mode; m_buffer = nullptr; @@ -117,13 +123,14 @@ TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uin /* push packet */ m_buffer[m_head] = new TrexPkt(m, port, origin, pkt_index); m_bytes += m_buffer[m_head]->get_size(); - - m_head = next(m_head); + /* advance */ + m_head = next(m_head); } /** * packet will be handled internally + * packet pointer is invalid after this call */ void TrexPktBuffer::push(const TrexPkt *pkt) { @@ -140,6 +147,8 @@ TrexPktBuffer::push(const TrexPkt *pkt) { /* push packet */ m_buffer[m_head] = pkt; + m_bytes += pkt->get_size(); + m_head = next(m_head); } @@ -179,4 +188,3 @@ TrexPktBuffer::to_json() const { return output; } - diff --git a/src/stateless/common/trex_stateless_pkt.h b/src/stateless/common/trex_stateless_pkt.h index 1b6bd2f8..573f4950 100644 --- a/src/stateless/common/trex_stateless_pkt.h +++ b/src/stateless/common/trex_stateless_pkt.h @@ -32,33 +32,45 @@ /** * copies MBUF to a flat buffer * - * @author imarom (1/1/2017) - * - * @param dest - * @param m */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m); +void mbuf_to_buffer(uint8_t *dest, const rte_mbuf_t *m); -/** - * describes a single saved packet +/************************************** + * TRex packet * - */ + *************************************/ class TrexPkt { public: + /** + * origin of the created packet + */ enum origin_e { ORIGIN_NONE = 1, ORIGIN_TX, ORIGIN_RX }; + /** + * generate a packet from MBUF + */ TrexPkt(const rte_mbuf_t *m, int port = -1, origin_e origin = ORIGIN_NONE, uint64_t index = 0); + + /** + * duplicate an existing packet + */ TrexPkt(const TrexPkt &other); + + /** + * sets a packet index + * used by a buffer of packets + */ void set_index(uint64_t index) { m_index = index; } + /* slow path and also RVO - pass by value is ok */ Json::Value to_json() const { Json::Value output; @@ -115,6 +127,10 @@ private: }; +/************************************** + * TRex packet buffer + * + *************************************/ class TrexPktBuffer { public: @@ -136,10 +152,23 @@ public: ~TrexPktBuffer(); /** - * push a packet to the buffer - * + * push a packet to the buffer + * packet will be generated from a MBUF + * + */ + void push(const rte_mbuf_t *m, + int port = -1, + TrexPkt::origin_e origin = TrexPkt::ORIGIN_NONE, + uint64_t pkt_index = 0); + + /** + * push an existing packet structure + * packet will *not* be duplicated + * + * after calling this function + * the packet is no longer usable + * from caller prespective */ - void push(const rte_mbuf_t *m, int port = -1, TrexPkt::origin_e origin = TrexPkt::ORIGIN_NONE, uint64_t pkt_index = 0); void push(const TrexPkt *pkt); /** @@ -171,6 +200,10 @@ public: return (m_size - 1); } + /** + * see mode_e + * + */ mode_e get_mode() const { return m_mode; } @@ -180,6 +213,9 @@ public: */ uint32_t get_element_count() const; + /** + * current bytes holded by the buffer + */ uint32_t get_bytes() const { return m_bytes; } diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index f0d4e806..7b020444 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -157,14 +157,14 @@ TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, Tre int new_id = m_id_counter++; - TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter); - m_captures.push_back(new_buffer); + TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter); + m_captures.push_back(new_capture); /* update global filter */ update_global_filter(); /* result */ - rc.set_new_id(new_id); + rc.set_rc(new_id, new_capture->get_start_ts()); } void @@ -176,7 +176,7 @@ TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { } capture->stop(); - rc.set_count(capture->get_pkt_count()); + rc.set_rc(capture->get_pkt_count()); } void diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 0f98fd95..852aee2a 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -83,23 +83,30 @@ public: class TrexCaptureRCStart : public TrexCaptureRC { public: - void set_new_id(capture_id_t new_id) { - m_capture_id = new_id; - m_rc = RC_OK; + void set_rc(capture_id_t new_id, dsec_t start_ts) { + m_capture_id = new_id; + m_start_ts = start_ts; + m_rc = RC_OK; + } capture_id_t get_new_id() const { return m_capture_id; } + dsec_t get_start_ts() const { + return m_start_ts; + } + private: capture_id_t m_capture_id; + dsec_t m_start_ts; }; class TrexCaptureRCStop : public TrexCaptureRC { public: - void set_count(uint32_t pkt_count) { + void set_rc(uint32_t pkt_count) { m_pkt_count = pkt_count; m_rc = RC_OK; } diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index ede86062..b01665ec 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -492,7 +492,7 @@ RXServer::duplicate_mbuf(const rte_mbuf_t *m) { } /* copy data */ - copy_mbuf(dest, m); + mbuf_to_buffer(dest, m); return clone_mbuf; } -- cgit From 3689edf311778c8cb921db61f293db6cd43a9b14 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 25 Jan 2017 13:54:51 +0200 Subject: capture - personal code review Signed-off-by: imarom --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 8 +- src/stateless/common/trex_stateless_pkt.cpp | 43 +++- src/stateless/common/trex_stateless_pkt.h | 23 ++- .../messaging/trex_stateless_messaging.cpp | 4 +- src/stateless/messaging/trex_stateless_messaging.h | 3 + src/stateless/rx/trex_stateless_capture.cpp | 98 ++++++--- src/stateless/rx/trex_stateless_capture.h | 229 +++++++-------------- src/stateless/rx/trex_stateless_capture_rc.h | 195 ++++++++++++++++++ 8 files changed, 393 insertions(+), 210 deletions(-) create mode 100644 src/stateless/rx/trex_stateless_capture_rc.h (limited to 'src') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index c20c77d4..54798abb 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -892,6 +892,12 @@ void TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &result) { uint32_t limit = parse_uint32(params, "limit", result); + + /* parse mode type */ + const std::string mode_str = parse_choice(params, "mode", {"fixed", "cyclic"}, result); + TrexPktBuffer::mode_e mode = ( (mode_str == "fixed") ? TrexPktBuffer::MODE_DROP_TAIL : TrexPktBuffer::MODE_DROP_HEAD); + + /* parse filters */ const Json::Value &tx_json = parse_array(params, "tx", result); const Json::Value &rx_json = parse_array(params, "rx", result); CaptureFilter filter; @@ -927,7 +933,7 @@ TrexRpcCmdCapture::parse_cmd_start(const Json::Value ¶ms, Json::Value &resul reply.reset(); /* send a start message to RX core */ - TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, reply); + TrexStatelessRxCaptureStart *start_msg = new TrexStatelessRxCaptureStart(filter, limit, mode, reply); get_stateless_obj()->send_msg_to_rx(start_msg); TrexCaptureRCStart rc = reply.wait_for_reply(); diff --git a/src/stateless/common/trex_stateless_pkt.cpp b/src/stateless/common/trex_stateless_pkt.cpp index 14c14462..43cbbe1c 100644 --- a/src/stateless/common/trex_stateless_pkt.cpp +++ b/src/stateless/common/trex_stateless_pkt.cpp @@ -120,12 +120,7 @@ TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uin } } - /* push packet */ - m_buffer[m_head] = new TrexPkt(m, port, origin, pkt_index); - m_bytes += m_buffer[m_head]->get_size(); - - /* advance */ - m_head = next(m_head); + push_internal(new TrexPkt(m, port, origin, pkt_index)); } /** @@ -133,19 +128,32 @@ TrexPktBuffer::push(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin, uin * packet pointer is invalid after this call */ void -TrexPktBuffer::push(const TrexPkt *pkt) { +TrexPktBuffer::push(const TrexPkt *pkt, uint64_t pkt_index) { /* if full - decide by the policy */ if (is_full()) { if (m_mode == MODE_DROP_HEAD) { delete pop(); } else { /* drop the tail (current packet) */ - delete pkt; return; } } - /* push packet */ + /* duplicate packet */ + TrexPkt *dup = new TrexPkt(*pkt); + + /* update packet index if given */ + if (pkt_index != 0) { + dup->set_index(pkt_index); + } + + push_internal(dup); +} + + +void +TrexPktBuffer::push_internal(const TrexPkt *pkt) { + /* push the packet */ m_buffer[m_head] = pkt; m_bytes += pkt->get_size(); @@ -188,3 +196,20 @@ TrexPktBuffer::to_json() const { return output; } +TrexPktBuffer * +TrexPktBuffer::pop_n(uint32_t count) { + /* can't pop more than total */ + assert(count <= get_element_count()); + + // TODO: consider returning NULL if no packets exists + // to avoid mallocing + + TrexPktBuffer *partial = new TrexPktBuffer(count); + + for (int i = 0; i < count; i++) { + const TrexPkt *pkt = pop(); + partial->push_internal(pkt); + } + + return partial; +} diff --git a/src/stateless/common/trex_stateless_pkt.h b/src/stateless/common/trex_stateless_pkt.h index 573f4950..f44355dc 100644 --- a/src/stateless/common/trex_stateless_pkt.h +++ b/src/stateless/common/trex_stateless_pkt.h @@ -70,6 +70,9 @@ public: m_index = index; } + uint64_t get_index() const { + return m_index; + } /* slow path and also RVO - pass by value is ok */ Json::Value to_json() const { @@ -163,13 +166,10 @@ public: /** * push an existing packet structure - * packet will *not* be duplicated - * - * after calling this function - * the packet is no longer usable - * from caller prespective + * packet will be duplicated + * if pkt_index is non zero - it will be updated */ - void push(const TrexPkt *pkt); + void push(const TrexPkt *pkt, uint64_t pkt_index = 0); /** * pops a packet from the buffer @@ -177,6 +177,15 @@ public: */ const TrexPkt * pop(); + /** + * pops N packets from the buffer + * N must be <= get_element_count() + * + * returns a new buffer + */ + TrexPktBuffer * pop_n(uint32_t count); + + /** * generate a JSON output of the queue * @@ -225,6 +234,8 @@ private: return ( (v + 1) % m_size ); } + void push_internal(const TrexPkt *pkt); + mode_e m_mode; int m_head; int m_tail; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 21fe7a13..f89ca343 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -266,7 +266,7 @@ TrexStatelessRxCaptureStart::handle(CRxCoreStateless *rx_core) { TrexCaptureRCStart start_rc; - TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, start_rc); + TrexStatelessCaptureMngr::getInstance().start(m_filter, m_limit, m_mode, start_rc); /* mark as done */ m_reply.set_reply(start_rc); @@ -305,7 +305,7 @@ TrexStatelessRxCaptureStatus::handle(CRxCoreStateless *rx_core) { TrexCaptureRCStatus status_rc; - status_rc.set_status(TrexStatelessCaptureMngr::getInstance().to_json()); + status_rc.set_rc(TrexStatelessCaptureMngr::getInstance().to_json()); /* mark as done */ m_reply.set_reply(status_rc); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index ed14b100..cd79d6e7 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -494,10 +494,12 @@ class TrexStatelessRxCaptureStart : public TrexStatelessRxCapture { public: TrexStatelessRxCaptureStart(const CaptureFilter& filter, uint64_t limit, + TrexPktBuffer::mode_e mode, MsgReply &reply) : m_reply(reply) { m_limit = limit; m_filter = filter; + m_mode = mode; } virtual bool handle(CRxCoreStateless *rx_core); @@ -506,6 +508,7 @@ private: uint8_t m_port_id; uint64_t m_limit; CaptureFilter m_filter; + TrexPktBuffer::mode_e m_mode; MsgReply &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 7b020444..bf7623d5 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -21,9 +21,17 @@ limitations under the License. #include "trex_stateless_capture.h" #include "trex_exception.h" -TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter) { +/************************************** + * Capture + * + * A single instance of a capture + *************************************/ +TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, + uint64_t limit, + const CaptureFilter &filter, + TrexPktBuffer::mode_e mode) { m_id = id; - m_pkt_buffer = new TrexPktBuffer(limit, TrexPktBuffer::MODE_DROP_TAIL); + m_pkt_buffer = new TrexPktBuffer(limit, mode); m_filter = filter; m_state = STATE_ACTIVE; m_start_ts = now_sec(); @@ -37,26 +45,22 @@ TrexStatelessCapture::~TrexStatelessCapture() { } void -TrexStatelessCapture::handle_pkt_tx(TrexPkt *pkt) { +TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { if (m_state != STATE_ACTIVE) { - delete pkt; return; } /* if not in filter - back off */ if (!m_filter.in_filter(pkt)) { - delete pkt; return; } if (pkt->get_ts() < m_start_ts) { - delete pkt; return; } - pkt->set_index(++m_pkt_index); - m_pkt_buffer->push(pkt); + m_pkt_buffer->push(pkt, ++m_pkt_index); } void @@ -100,9 +104,13 @@ TrexStatelessCapture::to_json() const { return output; } +/** + * fetch up to 'pkt_limit' from the capture + * + */ TrexPktBuffer * TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { - + /* if the total sum of packets is within the limit range - take it */ if (m_pkt_buffer->get_element_count() <= pkt_limit) { TrexPktBuffer *current = m_pkt_buffer; @@ -111,22 +119,29 @@ TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { return current; } - /* harder part - partial fetch */ - TrexPktBuffer *partial = new TrexPktBuffer(pkt_limit); - for (int i = 0; i < pkt_limit; i++) { - const TrexPkt *pkt = m_pkt_buffer->pop(); - partial->push(pkt); - } - + /* partial fetch - take a partial list */ + TrexPktBuffer *partial = m_pkt_buffer->pop_n(pkt_limit); pending = m_pkt_buffer->get_element_count(); return partial; } + +/************************************** + * Capture Manager + * handles all the captures + * in the system + *************************************/ + +/** + * holds the global filter in the capture manager + * which ports in the entire system are monitored + */ void TrexStatelessCaptureMngr::update_global_filter() { CaptureFilter new_filter; + /* recalculates the global filter */ for (TrexStatelessCapture *capture : m_captures) { new_filter += capture->get_filter(); } @@ -134,6 +149,10 @@ TrexStatelessCaptureMngr::update_global_filter() { m_global_filter = new_filter; } + +/** + * lookup a specific capture by ID + */ TrexStatelessCapture * TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { @@ -147,17 +166,37 @@ TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { return nullptr; } + +int +TrexStatelessCaptureMngr::lookup_index(capture_id_t capture_id) { + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return i; + } + } + return -1; +} + + +/** + * starts a new capture + * + */ void -TrexStatelessCaptureMngr::start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc) { +TrexStatelessCaptureMngr::start(const CaptureFilter &filter, + uint64_t limit, + TrexPktBuffer::mode_e mode, + TrexCaptureRCStart &rc) { - if (m_captures.size() > MAX_CAPTURE_SIZE) { + /* check for maximum active captures */ + if (m_captures.size() >= MAX_CAPTURE_SIZE) { rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED); return; } - + /* create a new capture*/ int new_id = m_id_counter++; - TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter); + TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter, mode); m_captures.push_back(new_capture); /* update global filter */ @@ -179,6 +218,7 @@ TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { rc.set_rc(capture->get_pkt_count()); } + void TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) { TrexStatelessCapture *capture = lookup(capture_id); @@ -190,21 +230,14 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre uint32_t pending = 0; TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); - rc.set_pkt_buffer(pkt_buffer, pending, capture->get_start_ts()); + rc.set_rc(pkt_buffer, pending, capture->get_start_ts()); } void TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) { - - int index = -1; - for (int i = 0; i < m_captures.size(); i++) { - if (m_captures[i]->get_id() == capture_id) { - index = i; - break; - } - } - /* does not exist */ + /* lookup index */ + int index = lookup_index(capture_id); if (index == -1) { rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); return; @@ -219,7 +252,7 @@ TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &r /* update global filter */ update_global_filter(); - rc.set_ok(); + rc.set_rc(); } void @@ -228,11 +261,12 @@ TrexStatelessCaptureMngr::reset() { while (m_captures.size() > 0) { remove(m_captures[0]->get_id(), dummy); + assert(!!dummy); } } void -TrexStatelessCaptureMngr::handle_pkt_tx(TrexPkt *pkt) { +TrexStatelessCaptureMngr::handle_pkt_tx_slow_path(const TrexPkt *pkt) { for (TrexStatelessCapture *capture : m_captures) { capture->handle_pkt_tx(pkt); } diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 852aee2a..e4a2e632 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -25,160 +25,14 @@ limitations under the License. #include #include "trex_stateless_pkt.h" +#include "trex_stateless_capture_rc.h" -typedef int32_t capture_id_t; -class TrexCaptureRC { -public: - - TrexCaptureRC() { - m_rc = RC_INVALID; - m_pkt_buffer = NULL; - } - - enum rc_e { - RC_INVALID = 0, - RC_OK = 1, - RC_CAPTURE_NOT_FOUND, - RC_CAPTURE_LIMIT_REACHED, - RC_CAPTURE_FETCH_UNDER_ACTIVE - }; - - bool operator !() const { - return (m_rc != RC_OK); - } - - std::string get_err() const { - assert(m_rc != RC_INVALID); - - switch (m_rc) { - case RC_OK: - return ""; - case RC_CAPTURE_LIMIT_REACHED: - return "capture limit has reached"; - case RC_CAPTURE_NOT_FOUND: - return "capture ID not found"; - case RC_CAPTURE_FETCH_UNDER_ACTIVE: - return "fetch command cannot be executed on an active capture"; - default: - assert(0); - } - } - - void set_err(rc_e rc) { - m_rc = rc; - } - - Json::Value get_json() const { - return m_json_rc; - } - -public: - rc_e m_rc; - capture_id_t m_capture_id; - TrexPktBuffer *m_pkt_buffer; - Json::Value m_json_rc; -}; - -class TrexCaptureRCStart : public TrexCaptureRC { -public: - - void set_rc(capture_id_t new_id, dsec_t start_ts) { - m_capture_id = new_id; - m_start_ts = start_ts; - m_rc = RC_OK; - - } - - capture_id_t get_new_id() const { - return m_capture_id; - } - - dsec_t get_start_ts() const { - return m_start_ts; - } - -private: - capture_id_t m_capture_id; - dsec_t m_start_ts; -}; - - -class TrexCaptureRCStop : public TrexCaptureRC { -public: - void set_rc(uint32_t pkt_count) { - m_pkt_count = pkt_count; - m_rc = RC_OK; - } - - uint32_t get_pkt_count() const { - return m_pkt_count; - } - -private: - uint32_t m_pkt_count; -}; - -class TrexCaptureRCFetch : public TrexCaptureRC { -public: - - TrexCaptureRCFetch() { - m_pkt_buffer = nullptr; - m_pending = 0; - } - - void set_pkt_buffer(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { - m_pkt_buffer = pkt_buffer; - m_pending = pending; - m_start_ts = start_ts; - m_rc = RC_OK; - } - - const TrexPktBuffer *get_pkt_buffer() const { - return m_pkt_buffer; - } - - uint32_t get_pending() const { - return m_pending; - } - - dsec_t get_start_ts() const { - return m_start_ts; - } - -private: - const TrexPktBuffer *m_pkt_buffer; - uint32_t m_pending; - dsec_t m_start_ts; -}; - -class TrexCaptureRCRemove : public TrexCaptureRC { -public: - void set_ok() { - m_rc = RC_OK; - } -}; - -class TrexCaptureRCStatus : public TrexCaptureRC { -public: - - void set_status(const Json::Value &json) { - m_json = json; - m_rc = RC_OK; - } - - const Json::Value & get_status() const { - return m_json; - } - -private: - Json::Value m_json; -}; - -/** - * capture filter +/************************************** + * Capture Filter + * * specify which ports to capture and if TX/RX or both - */ + *************************************/ class CaptureFilter { public: CaptureFilter() { @@ -186,10 +40,16 @@ public: m_rx_active = 0; } + /** + * add a port to the active TX port list + */ void add_tx(uint8_t port_id) { m_tx_active |= (1LL << port_id); } + /** + * add a port to the active RX port list + */ void add_rx(uint8_t port_id) { m_rx_active |= (1LL << port_id); } @@ -226,6 +86,10 @@ public: return ( in_tx(port_id) || in_rx(port_id) ); } + /** + * updates the current filter with another filter + * the result is the aggregation of TX /RX active lists + */ CaptureFilter& operator +=(const CaptureFilter &other) { m_tx_active |= other.m_tx_active; m_rx_active |= other.m_rx_active; @@ -248,19 +112,36 @@ private: }; +/************************************** + * Capture + * + * A single instance of a capture + *************************************/ class TrexStatelessCapture { public: + enum state_e { STATE_ACTIVE, STATE_STOPPED, }; - TrexStatelessCapture(capture_id_t id, uint64_t limit, const CaptureFilter &filter); + TrexStatelessCapture(capture_id_t id, + uint64_t limit, + const CaptureFilter &filter, + TrexPktBuffer::mode_e mode); - void handle_pkt_tx(TrexPkt *pkt); + ~TrexStatelessCapture(); + + /** + * handles a packet from the TX side + */ + void handle_pkt_tx(const TrexPkt *pkt); + + /** + * handles a packet from the RX side + */ void handle_pkt_rx(const rte_mbuf_t *m, int port); - ~TrexStatelessCapture(); uint64_t get_id() const { return m_id; @@ -270,8 +151,12 @@ public: return m_filter; } - Json::Value to_json() const; + /** + * stop the capture - from now on all packets will be ignored + * + * @author imarom (1/24/2017) + */ void stop() { m_state = STATE_STOPPED; } @@ -290,6 +175,9 @@ public: return m_start_ts; } + + Json::Value to_json() const; + private: state_e m_state; TrexPktBuffer *m_pkt_buffer; @@ -299,6 +187,14 @@ private: uint64_t m_pkt_index; }; + +/************************************** + * Capture Manager + * Handles all the captures in + * the system + * + * the design is a singleton + *************************************/ class TrexStatelessCaptureMngr { public: @@ -317,7 +213,10 @@ public: /** * starts a new capture */ - void start(const CaptureFilter &filter, uint64_t limit, TrexCaptureRCStart &rc); + void start(const CaptureFilter &filter, + uint64_t limit, + TrexPktBuffer::mode_e mode, + TrexCaptureRCStart &rc); /** * stops an existing capture @@ -346,7 +245,8 @@ public: /** - * return true if any filter is active + * return true if any filter is active + * on a specific port * * @author imarom (1/3/2017) * @@ -359,14 +259,20 @@ public: /** * handle packet from TX */ - void handle_pkt_tx(TrexPkt *pkt); + void handle_pkt_tx(const TrexPkt *pkt) { + if (!m_global_filter.in_filter(pkt)) { + return; + } + + handle_pkt_tx_slow_path(pkt); + } /** * handle packet from RX */ void handle_pkt_rx(const rte_mbuf_t *m, int port) { - /* fast path */ - if (!is_active(port)) { + /* fast path - check the global filter */ + if (!m_global_filter.in_rx(port)) { return; } @@ -385,8 +291,11 @@ private: TrexStatelessCapture * lookup(capture_id_t capture_id); + int lookup_index(capture_id_t capture_id); void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); + void handle_pkt_tx_slow_path(const TrexPkt *pkt); + void update_global_filter(); std::vector m_captures; diff --git a/src/stateless/rx/trex_stateless_capture_rc.h b/src/stateless/rx/trex_stateless_capture_rc.h new file mode 100644 index 00000000..12b37c1d --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture_rc.h @@ -0,0 +1,195 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_CAPTURE_RC_H__ +#define __TREX_STATELESS_CAPTURE_RC_H__ + +typedef int32_t capture_id_t; + +/** + * a base class for a capture command RC + * not to be used directly + */ +class TrexCaptureRC { + +protected: + /* cannot instantiate this object from outside */ + TrexCaptureRC() { + m_rc = RC_INVALID; + } + +public: + + /** + * error types for commands + */ + enum rc_e { + RC_INVALID = 0, + RC_OK = 1, + RC_CAPTURE_NOT_FOUND, + RC_CAPTURE_LIMIT_REACHED, + RC_CAPTURE_FETCH_UNDER_ACTIVE + }; + + bool operator !() const { + return (m_rc != RC_OK); + } + + std::string get_err() const { + assert(m_rc != RC_INVALID); + + switch (m_rc) { + case RC_OK: + return ""; + case RC_CAPTURE_LIMIT_REACHED: + return "capture limit has reached"; + case RC_CAPTURE_NOT_FOUND: + return "capture ID not found"; + case RC_CAPTURE_FETCH_UNDER_ACTIVE: + return "fetch command cannot be executed on an active capture"; + case RC_INVALID: + /* should never be called under invalid */ + assert(0); + + default: + assert(0); + } + } + + void set_err(rc_e rc) { + m_rc = rc; + } + + +protected: + rc_e m_rc; +}; + +/** + * return code for executing capture start + */ +class TrexCaptureRCStart : public TrexCaptureRC { +public: + + void set_rc(capture_id_t new_id, dsec_t start_ts) { + m_capture_id = new_id; + m_start_ts = start_ts; + m_rc = RC_OK; + } + + capture_id_t get_new_id() const { + assert(m_rc == RC_OK); + return m_capture_id; + } + + dsec_t get_start_ts() const { + assert(m_rc == RC_OK); + return m_start_ts; + } + +private: + capture_id_t m_capture_id; + dsec_t m_start_ts; +}; + +/** + * return code for exectuing capture stop + */ +class TrexCaptureRCStop : public TrexCaptureRC { +public: + + void set_rc(uint32_t pkt_count) { + m_pkt_count = pkt_count; + m_rc = RC_OK; + } + + uint32_t get_pkt_count() const { + assert(m_rc == RC_OK); + return m_pkt_count; + } + +private: + uint32_t m_pkt_count; +}; + +/** + * return code for executing capture fetch + */ +class TrexCaptureRCFetch : public TrexCaptureRC { +public: + + void set_rc(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_start_ts = start_ts; + m_rc = RC_OK; + } + + const TrexPktBuffer *get_pkt_buffer() const { + assert(m_rc == RC_OK); + return m_pkt_buffer; + } + + uint32_t get_pending() const { + assert(m_rc == RC_OK); + return m_pending; + } + + dsec_t get_start_ts() const { + assert(m_rc == RC_OK); + return m_start_ts; + } + +private: + const TrexPktBuffer *m_pkt_buffer; + uint32_t m_pending; + dsec_t m_start_ts; +}; + + + +class TrexCaptureRCRemove : public TrexCaptureRC { +public: + void set_rc() { + m_rc = RC_OK; + } +}; + + +class TrexCaptureRCStatus : public TrexCaptureRC { +public: + + void set_rc(const Json::Value &json) { + m_json = json; + m_rc = RC_OK; + } + + const Json::Value & get_status() const { + assert(m_rc == RC_OK); + return m_json; + } + +private: + Json::Value m_json; +}; + + +#endif /* __TREX_STATELESS_CAPTURE_RC_H__ */ + -- cgit From 1369c6a44b622df3577223ce68ff16a1ea7cc8aa Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 30 Jan 2017 13:56:01 +0200 Subject: move service mode check to RX queue enable Signed-off-by: imarom --- src/rpc-server/commands/trex_rpc_cmd_general.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/rpc-server/commands/trex_rpc_cmd_general.cpp b/src/rpc-server/commands/trex_rpc_cmd_general.cpp index 54798abb..60180659 100644 --- a/src/rpc-server/commands/trex_rpc_cmd_general.cpp +++ b/src/rpc-server/commands/trex_rpc_cmd_general.cpp @@ -682,10 +682,6 @@ TrexRpcCmdSetRxFeature::_run(const Json::Value ¶ms, Json::Value &result) { uint8_t port_id = parse_port(params, result); TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); - if (!port->is_service_mode_on()) { - generate_execute_err(result, "rx_feature - available only under service mode"); - } - /* decide which feature is being set */ const std::string type = parse_choice(params, "type", {"queue", "server"}, result); @@ -707,6 +703,10 @@ TrexRpcCmdSetRxFeature::parse_queue_msg(const Json::Value &msg, TrexStatelessPor bool enabled = parse_bool(msg, "enabled", result); if (enabled) { + + if (!port->is_service_mode_on()) { + generate_execute_err(result, "setting RX queue is only available under service mode"); + } uint64_t size = parse_uint32(msg, "size", result); @@ -745,7 +745,7 @@ TrexRpcCmdGetRxQueuePkts::_run(const Json::Value ¶ms, Json::Value &result) { TrexStatelessPort *port = get_stateless_obj()->get_port_by_id(port_id); if (!port->is_service_mode_on()) { - generate_execute_err(result, "get_rx_queue_pkts - available only under service mode"); + generate_execute_err(result, "fetching RX queue packets is only available under service mode"); } -- cgit