diff options
author | imarom <imarom@cisco.com> | 2017-01-04 18:46:45 +0200 |
---|---|---|
committer | imarom <imarom@cisco.com> | 2017-01-04 18:46:45 +0200 |
commit | 5257dbb8253fe5b70b75f9c064c4593ca7aee99f (patch) | |
tree | f7fa69be359ce165ae992ba0021fd15ca471b818 /src/stateless | |
parent | ea10422c22479c8e498d8efb5cb19882e70db9ff (diff) |
draft - unreviewed
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'src/stateless')
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 83 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.h | 30 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 28 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 95 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 12 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 32 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_capture.cpp | 28 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_capture.h | 53 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 65 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 17 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_defs.h | 2 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 227 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.h | 149 |
13 files changed, 287 insertions, 534 deletions
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index c31ba0a5..32babbf7 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -18,13 +18,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -#include <trex_stateless.h> -#include <trex_stateless_port.h> -#include <sched.h> +//#include <sched.h> #include <iostream> #include <unistd.h> +#include "trex_stateless.h" +#include "trex_stateless_port.h" +#include "trex_stateless_messaging.h" + using namespace std; /*********************************************************** @@ -140,54 +142,35 @@ TrexStateless::get_dp_core_count() { return m_platform_api->get_dp_core_count(); } -void -TrexStateless::encode_stats(Json::Value &global) { - - TrexPlatformGlobalStats stats; - m_platform_api->get_global_stats(stats); - - global["cpu_util"] = stats.m_stats.m_cpu_util; - global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util; - - global["tx_bps"] = stats.m_stats.m_tx_bps; - global["rx_bps"] = stats.m_stats.m_rx_bps; - - global["tx_pps"] = stats.m_stats.m_tx_pps; - global["rx_pps"] = stats.m_stats.m_rx_pps; - - global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts); - global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts); - - global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes); - global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes); - - global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors); - - for (uint8_t i = 0; i < m_port_count; i++) { - std::stringstream ss; - - ss << "port " << i; - Json::Value &port_section = global[ss.str()]; - - m_ports[i]->encode_stats(port_section); - } +capture_id_t +TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) { + static MsgReply<capture_id_t> reply; + + reply.reset(); + + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply); + + ring->Enqueue((CGenNode *)msg); + + capture_id_t new_id = reply.wait_for_reply(); + + return (new_id); } -/** - * generate a snapshot for publish (async publish) - * - */ -void -TrexStateless::generate_publish_snapshot(std::string &snapshot) { - Json::FastWriter writer; - Json::Value root; - - root["name"] = "trex-stateless-info"; - root["type"] = 0; - - /* stateless specific info goes here */ - root["data"] = Json::nullValue; - - snapshot = writer.write(root); +capture_id_t +TrexStateless::stop_capture(capture_id_t capture_id) { + static MsgReply<capture_id_t> reply; + + reply.reset(); + + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply); + + ring->Enqueue((CGenNode *)msg); + + capture_id_t rc = reply.wait_for_reply(); + + return (rc); } diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h index 3a1a2c24..33f16ce9 100644 --- a/src/stateless/cp/trex_stateless.h +++ b/src/stateless/cp/trex_stateless.h @@ -102,6 +102,7 @@ public: * defines the TRex stateless operation mode * */ +class CaptureFilter; class TrexStateless { public: @@ -132,32 +133,21 @@ public: /** - * shutdown the server - */ - void shutdown(); - - /** - * fetch xstats names (keys of dict) - * - */ - void encode_xstats_names(Json::Value &global); - - /** - * fetch xstats values - * + * starts a capture on a 'filter' of ports + * with a limit of packets */ - void encode_xstats_values(Json::Value &global); - + capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit); + /** - * fetch all the stats + * stops an active capture * */ - void encode_stats(Json::Value &global); - + capture_id_t stop_capture(capture_id_t capture_id); + /** - * generate a snapshot for publish + * shutdown the server */ - void generate_publish_snapshot(std::string &snapshot); + void shutdown(); const TrexPlatformApi * get_platform_api() { return (m_platform_api); diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 7d331c6e..9cf048b0 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -162,7 +162,7 @@ private: * trex stateless port * **************************/ -TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) { +TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this), m_service_mode(port_id, api) { std::vector<std::pair<uint8_t, uint8_t>> core_pair_list; m_port_id = port_id; @@ -948,24 +948,6 @@ TrexStatelessPort::remove_and_delete_all_streams() { } } -void -TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) { - static MsgReply<bool> reply; - - reply.reset(); - - TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply); - send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg); - - /* as below, must wait for ACK from RX core before returning ACK */ - reply.wait_for_reply(); -} - -void -TrexStatelessPort::stop_rx_capture() { - TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id); - send_message_to_rx(msg); -} void TrexStatelessPort::start_rx_queue(uint64_t size) { @@ -980,18 +962,22 @@ TrexStatelessPort::start_rx_queue(uint64_t size) { this might cause the user to lose some packets from the queue */ reply.wait_for_reply(); + + m_service_mode.set_rx_queue(); } void TrexStatelessPort::stop_rx_queue() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); send_message_to_rx(msg); + + m_service_mode.unset_rx_queue(); } -const RXPacketBuffer * +const TrexPktBuffer * TrexStatelessPort::get_rx_queue_pkts() { - static MsgReply<const RXPacketBuffer *> reply; + static MsgReply<const TrexPktBuffer *> reply; reply.reset(); diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index d4ac4018..2cc1b9ca 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -26,12 +26,14 @@ limitations under the License. #include "trex_dp_port_events.h" #include "trex_stateless_rx_defs.h" #include "trex_stream.h" +#include "trex_exception.h" +#include "trex_stateless_capture.h" class TrexStatelessCpToDpMsgBase; class TrexStatelessCpToRxMsgBase; class TrexStreamsGraphObj; class TrexPortMultiplier; -class RXPacketBuffer; +class TrexPktBuffer; /** @@ -113,6 +115,56 @@ private: static const std::string g_unowned_handler; }; +/** + * enforces in/out from service mode + * + * @author imarom (1/4/2017) + */ +class TrexServiceMode { +public: + TrexServiceMode(uint8_t port_id, const TrexPlatformApi *api) { + m_is_enabled = false; + m_has_rx_queue = false; + m_port_id = port_id; + m_port_attr = api->getPortAttrObj(port_id); + } + + void enable() { + m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_ALL); + m_is_enabled = true; + } + + void disable() { + if (m_has_rx_queue) { + throw TrexException("unable to disable service mode - please remove RX queue"); + } + + if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) { + throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists"); + } + + m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW); + m_is_enabled = false; + } + + bool is_enabled() const { + return m_is_enabled; + } + + void set_rx_queue() { + m_has_rx_queue = true; + } + + void unset_rx_queue() { + m_has_rx_queue = false; + } + +private: + bool m_is_enabled; + bool m_has_rx_queue; + TRexPortAttr *m_port_attr; + uint8_t m_port_id; +}; class AsyncStopEvent; @@ -150,7 +202,15 @@ public: RC_ERR_FAILED_TO_COMPILE_STREAMS }; - + /** + * port capture mode + */ + enum capture_mode_e { + PORT_CAPTURE_NONE = 0, + PORT_CAPTURE_RX, + PORT_CAPTURE_ALL + }; + TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api); ~TrexStatelessPort(); @@ -227,6 +287,20 @@ public: double duration, bool is_dual); + /** + * moves port to / out service mode + */ + void set_service_mode(bool enabled) { + if (enabled) { + m_service_mode.enable(); + } else { + m_service_mode.disable(); + } + } + bool is_service_mode_on() const { + return m_service_mode.is_enabled(); + } + /** * get the port state * @@ -367,16 +441,16 @@ public: /** - * enable RX capture on port + * starts capturing packets * */ - void start_rx_capture(const std::string &pcap_filename, uint64_t limit); + void start_capture(capture_mode_e mode, uint64_t limit); /** - * disable RX capture if on + * stops capturing packets * */ - void stop_rx_capture(); + void stop_capture(); /** * start RX queueing of packets @@ -398,7 +472,7 @@ public: * fetch the RX queue packets from the queue * */ - const RXPacketBuffer *get_rx_queue_pkts(); + const TrexPktBuffer *get_rx_queue_pkts(); /** * configures port for L2 mode @@ -429,7 +503,9 @@ public: } private: - + void set_service_mode_on(); + void set_service_mode_off(); + bool is_core_active(int core_id); const std::vector<uint8_t> get_core_id_list () { @@ -514,6 +590,9 @@ private: TrexPortOwner m_owner; int m_pending_async_stop_event; + + TrexServiceMode m_service_mode; + static const uint32_t MAX_STREAMS = 20000; }; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index aeb1e677..f441c692 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -263,18 +263,20 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit); + capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter); /* mark as done */ - m_reply.set_reply(true); + m_reply.set_reply(capture_id); return true; } bool TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { - rx_core->stop_recorder(m_port_id); - + capture_id_t rc = rx_core->stop_capture(m_capture_id); + + m_reply.set_reply(rc); + return true; } @@ -299,7 +301,7 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) { - const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); + const TrexPktBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); /* set the reply */ m_reply.set_reply(pkt_buffer); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index 72b92d11..5f4978f5 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -28,12 +28,13 @@ limitations under the License. #include "trex_stateless_rx_defs.h" #include "os_time.h" #include "utl_ip.h" +#include "trex_stateless_capture.h" class TrexStatelessDpCore; class CRxCoreStateless; class TrexStreamsCompiledObj; class CFlowGenListPerThread; -class RXPacketBuffer; +class TrexPktBuffer; /** * Generic message reply object @@ -487,36 +488,35 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxStartCapture(uint8_t port_id, - const std::string &pcap_filename, + TrexStatelessRxStartCapture(const CaptureFilter& filter, uint64_t limit, - MsgReply<bool> &reply) : m_reply(reply) { + MsgReply<capture_id_t> &reply) : m_reply(reply) { - m_port_id = port_id; - m_limit = limit; - m_pcap_filename = pcap_filename; + m_limit = limit; + m_filter = filter; } virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - std::string m_pcap_filename; - uint64_t m_limit; - MsgReply<bool> &m_reply; + uint8_t m_port_id; + uint64_t m_limit; + CaptureFilter m_filter; + MsgReply<capture_id_t> &m_reply; }; class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxStopCapture(uint8_t port_id) { - m_port_id = port_id; + TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply<capture_id_t> &reply) : m_reply(reply) { + m_capture_id = capture_id; } virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; + capture_id_t m_capture_id; + MsgReply<capture_id_t> &m_reply; }; @@ -556,7 +556,7 @@ private: class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) { + TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const TrexPktBuffer *> &reply) : m_reply(reply) { m_port_id = port_id; } @@ -568,7 +568,7 @@ public: private: uint8_t m_port_id; - MsgReply<const RXPacketBuffer *> &m_reply; + MsgReply<const TrexPktBuffer *> &m_reply; }; diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 83bb2d38..4ed126cc 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -53,22 +53,36 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { m_pkt_buffer->push(m); } +void +TrexStatelessCaptureMngr::update_global_filter() { + CaptureFilter new_filter; + + for (TrexStatelessCapture *capture : m_captures) { + new_filter += capture->get_filter(); + } + + m_global_filter = new_filter; +} + capture_id_t TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { if (m_captures.size() > MAX_CAPTURE_SIZE) { - throw TrexException(TrexException::T_CAPTURE_MAX_INSTANCES); + return CAPTURE_TOO_MANY_CAPTURES; } int new_id = m_id_counter++; TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter); m_captures.push_back(new_buffer); + + /* update global filter */ + update_global_filter(); return new_id; } -void +capture_id_t TrexStatelessCaptureMngr::remove(capture_id_t id) { int index = -1; @@ -81,12 +95,18 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) { /* does not exist */ if (index == -1) { - throw TrexException(TrexException::T_CAPTURE_INVALID_ID); + return CAPTURE_ID_NOT_FOUND; } TrexStatelessCapture *capture = m_captures[index]; m_captures.erase(m_captures.begin() + index); - delete capture; + + delete capture; + + /* update global filter */ + update_global_filter(); + + return id; } void diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index f7cd451f..4d0b6a78 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -31,16 +31,16 @@ limitations under the License. class CaptureFilter { public: CaptureFilter() { - tx_active = 0; - rx_active = 0; + m_tx_active = 0; + m_rx_active = 0; } void add_tx(uint8_t port_id) { - tx_active |= (1LL << port_id); + m_tx_active |= (1LL << port_id); } void add_rx(uint8_t port_id) { - rx_active |= (1LL << port_id); + m_rx_active |= (1LL << port_id); } void add(uint8_t port_id) { @@ -63,21 +63,36 @@ public: bool in_rx(uint8_t port_id) const { uint64_t bit = (1LL << port_id); - return ((rx_active & bit) == bit); + return ((m_rx_active & bit) == bit); } bool in_tx(uint8_t port_id) const { uint64_t bit = (1LL << port_id); - return ((tx_active & bit) == bit); + return ((m_tx_active & bit) == bit); + } + + bool in_any(uint8_t port_id) const { + return ( in_tx(port_id) || in_rx(port_id) ); + } + + CaptureFilter& operator +=(const CaptureFilter &other) { + m_tx_active |= other.m_tx_active; + m_rx_active |= other.m_rx_active; + + return *this; } private: - uint64_t tx_active; - uint64_t rx_active; + uint64_t m_tx_active; + uint64_t m_rx_active; }; -typedef uint64_t capture_id_t; +typedef int64_t capture_id_t; +enum { + CAPTURE_ID_NOT_FOUND = -1, + CAPTURE_TOO_MANY_CAPTURES = -2, +}; class TrexStatelessCapture { public: @@ -93,6 +108,10 @@ public: return m_id; } + const CaptureFilter & get_filter() const { + return m_filter; + } + private: TrexPktBuffer *m_pkt_buffer; CaptureFilter m_filter; @@ -122,9 +141,11 @@ public: /** - * stops capture mode + * stops capture mode + * on success, will return the ID of the removed one + * o.w it will be an error */ - void remove(capture_id_t id); + capture_id_t remove(capture_id_t id); /** * removes all captures @@ -139,8 +160,8 @@ public: * * @return bool */ - bool is_active() const { - return (m_captures.size() != 0); + bool is_active(uint8_t port) const { + return m_global_filter.in_any(port); } /** @@ -153,7 +174,7 @@ public: */ void handle_pkt_rx(const rte_mbuf_t *m, int port) { /* fast path */ - if (!is_active()) { + if (!is_active(port)) { return; } @@ -169,11 +190,15 @@ private: } void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); + void update_global_filter(); std::vector<TrexStatelessCapture *> m_captures; capture_id_t m_id_counter; + /* a union of all the filters curently active */ + CaptureFilter m_global_filter; + static const int MAX_CAPTURE_SIZE = 10; }; diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index d27485de..f1ba303a 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -69,11 +69,11 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) { void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_capture = false; m_max_ports = cfg.m_max_ports; - + m_tx_cores = cfg.m_tx_cores; + CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx(); m_ring_from_cp = cp_rx->getRingCpToDp(0); - m_ring_to_cp = cp_rx->getRingDpToCp(0); m_state = STATE_IDLE; for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { @@ -130,6 +130,36 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() { } +void +CRxCoreStateless::periodic_check_for_dp_messages() { + + for (int i = 0; i < m_tx_cores; i++) { + periodic_check_for_dp_messages_core(i); + } + +} + +void +CRxCoreStateless::periodic_check_for_dp_messages_core(uint32_t core_id) { + + CNodeRing *ring = CMsgIns::Ins()->getRxDp()->getRingDpToCp(core_id); + + /* fast path */ + if ( likely ( ring->isEmpty() ) ) { + return; + } + + while (true) { + CGenNode *node = NULL; + + if (ring->Dequeue(node) != 0) { + break; + } + + //assert(node); + } +} + void CRxCoreStateless::recalculate_next_state() { if (m_state == STATE_QUIT) { return; @@ -176,16 +206,6 @@ void CRxCoreStateless::idle_state_loop() { } /** - * for each port give a tick (for flushing if needed) - * - */ -void CRxCoreStateless::port_manager_tick() { - for (int i = 0; i < m_max_ports; i++) { - m_rx_port_mngr[i].tick(); - } -} - -/** * for each port handle the grat ARP mechansim * */ @@ -199,7 +219,6 @@ void CRxCoreStateless::handle_work_stage() { /* set the next sync time to */ dsec_t sync_time_sec = now_sec() + (1.0 / 1000); - dsec_t tick_time_sec = now_sec() + 1.0; dsec_t grat_arp_sec = now_sec() + (double)CGlobalInfo::m_options.m_arp_ref_per; while (m_state == STATE_WORKING) { @@ -211,14 +230,10 @@ void CRxCoreStateless::handle_work_stage() { if ( (now - sync_time_sec) > 0 ) { periodic_check_for_cp_messages(); + //periodic_check_for_dp_messages(); sync_time_sec = now + (1.0 / 1000); } - if ( (now - tick_time_sec) > 0) { - port_manager_tick(); - tick_time_sec = now + 1.0; - } - if ( (now - grat_arp_sec) > 0) { handle_grat_arp(); grat_arp_sec = now + (double)CGlobalInfo::m_options.m_arp_ref_per; @@ -317,16 +332,14 @@ double CRxCoreStateless::get_cpu_util() { } -void -CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) { - m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit); - recalculate_next_state(); +capture_id_t +CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) { + return TrexStatelessCaptureMngr::getInstance().add(limit, filter); } -void -CRxCoreStateless::stop_recorder(uint8_t port_id) { - m_rx_port_mngr[port_id].stop_recorder(); - recalculate_next_state(); +capture_id_t +CRxCoreStateless::stop_capture(capture_id_t capture_id) { + return TrexStatelessCaptureMngr::getInstance().remove(capture_id); } void diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 4eed59a1..21ed51ba 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -27,6 +27,7 @@ #include "pal/linux/sanb_atomic.h" #include "utl_cpuu.h" #include "trex_stateless_rx_port_mngr.h" +#include "trex_stateless_capture.h" class TrexStatelessCpToRxMsgBase; @@ -127,16 +128,16 @@ class CRxCoreStateless { double get_cpu_util(); void update_cpu_util(); - const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { + const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } /** - * start capturing of RX packets on a specific port + * start capturing packets * */ - void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit); - void stop_recorder(uint8_t port_id); + capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter); + capture_id_t stop_capture(capture_id_t capture_id); /** * start RX queueing of packets @@ -162,7 +163,12 @@ class CRxCoreStateless { private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); + bool periodic_check_for_cp_messages(); + + void periodic_check_for_dp_messages(); + void periodic_check_for_dp_messages_core(uint32_t core_id); + void tickle(); void idle_state_loop(); @@ -172,7 +178,6 @@ class CRxCoreStateless { void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); - void port_manager_tick(); void handle_grat_arp(); int process_all_pending_pkts(bool flush_rx = false); @@ -186,10 +191,10 @@ class CRxCoreStateless { private: TrexMonitor m_monitor; uint32_t m_max_ports; + uint32_t m_tx_cores; bool m_capture; state_e m_state; CNodeRing *m_ring_from_cp; - CNodeRing *m_ring_to_cp; CCpuUtlDp m_cpu_dp_u; CCpuUtlCp m_cpu_cp_u; diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h index aefcc133..367cf4e3 100644 --- a/src/stateless/rx/trex_stateless_rx_defs.h +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -38,10 +38,12 @@ class CRxSlCfg { m_max_ports = 0; m_cps = 0.0; m_num_crc_fix_bytes = 0; + m_tx_cores = 0; } public: uint32_t m_max_ports; + uint32_t m_tx_cores; double m_cps; CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; uint8_t m_num_crc_fix_bytes; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index e16b3d0c..d2e0b4e8 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -20,48 +20,10 @@ */ #include "bp_sim.h" #include "trex_stateless_rx_port_mngr.h" -#include "common/captureFile.h" #include "trex_stateless_rx_core.h" #include "common/Network/Packet/Arp.h" #include "pkt_gen.h" - -/** - * copy MBUF to a flat buffer - * - * @author imarom (12/20/2016) - * - * @param dest - buffer with at least rte_pktmbuf_pkt_len(m) - * bytes - * @param m - MBUF to copy - * - * @return uint8_t* - */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { - - int index = 0; - for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { - const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *); - memcpy(dest + index, src, it->data_len); - index += it->data_len; - } -} - -/************************************** - * RX packet - * - *************************************/ -RXPacket::RXPacket(const rte_mbuf_t *m) { - - /* allocate buffer */ - m_size = m->pkt_len; - m_raw = new uint8_t[m_size]; - - /* copy data */ - copy_mbuf(m_raw, m); - - /* generate a packet timestamp */ - m_timestamp = now_sec(); -} +#include "trex_stateless_capture.h" /************************************** * latency RX feature @@ -231,79 +193,12 @@ RXLatency::to_json() const { * *************************************/ -RXPacketBuffer::RXPacketBuffer(uint64_t size) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_size = (size + 1); // for the empty/full difference 1 slot reserved - - /* generate queue */ - m_buffer = new RXPacket*[m_size](); // zeroed -} - -RXPacketBuffer::~RXPacketBuffer() { - assert(m_buffer); - - while (!is_empty()) { - RXPacket *pkt = pop(); - delete pkt; - } - delete [] m_buffer; -} - -void -RXPacketBuffer::push(const rte_mbuf_t *m) { - /* if full - pop the oldest */ - if (is_full()) { - delete pop(); - } - - /* push packet */ - m_buffer[m_head] = new RXPacket(m); - m_head = next(m_head); -} - -RXPacket * -RXPacketBuffer::pop() { - assert(!is_empty()); - - RXPacket *pkt = m_buffer[m_tail]; - m_tail = next(m_tail); - - return pkt; -} - -uint64_t -RXPacketBuffer::get_element_count() const { - if (m_head >= m_tail) { - return (m_head - m_tail); - } else { - return ( get_capacity() - (m_tail - m_head - 1) ); - } -} - -Json::Value -RXPacketBuffer::to_json() const { - - Json::Value output = Json::arrayValue; - - int tmp = m_tail; - while (tmp != m_head) { - RXPacket *pkt = m_buffer[tmp]; - output.append(pkt->to_json()); - tmp = next(tmp); - } - - return output; -} - - void RXQueue::start(uint64_t size) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RXPacketBuffer(size); + m_pkt_buffer = new TrexPktBuffer(size, TrexPktBuffer::MODE_DROP_HEAD); } void @@ -314,7 +209,7 @@ RXQueue::stop() { } } -const RXPacketBuffer * +const TrexPktBuffer * RXQueue::fetch() { /* if no buffer or the buffer is empty - give a NULL one */ @@ -323,10 +218,10 @@ RXQueue::fetch() { } /* hold a pointer to the old one */ - RXPacketBuffer *old_buffer = m_pkt_buffer; + TrexPktBuffer *old_buffer = m_pkt_buffer; /* replace the old one with a new one and freeze the old */ - m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity()); + m_pkt_buffer = new TrexPktBuffer(old_buffer->get_capacity(), old_buffer->get_mode()); return old_buffer; } @@ -348,97 +243,6 @@ RXQueue::to_json() const { return output; } -/************************************** - * RX feature recorder - * - *************************************/ - -RXPacketRecorder::RXPacketRecorder() { - m_writer = NULL; - m_count = 0; - m_limit = 0; - m_epoch = -1; - - m_pending_flush = false; -} - -void -RXPacketRecorder::start(const std::string &pcap, uint64_t limit) { - m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); - if (m_writer == NULL) { - std::stringstream ss; - ss << "unable to create PCAP file: " << pcap; - throw TrexException(ss.str()); - } - - assert(limit > 0); - - m_limit = limit; - m_count = 0; - m_pending_flush = false; - m_pcap_filename = pcap; -} - -void -RXPacketRecorder::stop() { - if (!m_writer) { - return; - } - - delete m_writer; - m_writer = NULL; -} - -void -RXPacketRecorder::flush_to_disk() { - - if (m_writer && m_pending_flush) { - m_writer->flush_to_disk(); - m_pending_flush = false; - } -} - -void -RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { - if (!m_writer) { - return; - } - - dsec_t now = now_sec(); - if (m_epoch < 0) { - m_epoch = now; - } - - dsec_t dt = now - m_epoch; - - CPktNsecTimeStamp t_c(dt); - m_pkt.time_nsec = t_c.m_time_nsec; - m_pkt.time_sec = t_c.m_time_sec; - - copy_mbuf((uint8_t *)m_pkt.raw, m); - m_pkt.pkt_len = m->pkt_len; - - m_writer->write_packet(&m_pkt); - m_count++; - m_pending_flush = true; - - if (m_count == m_limit) { - stop(); - } - -} - -Json::Value -RXPacketRecorder::to_json() const { - Json::Value output = Json::objectValue; - - output["pcap_filename"] = m_pcap_filename; - output["limit"] = Json::UInt64(m_limit); - output["count"] = Json::UInt64(m_count); - - return output; -} - /************************************** * RX feature server (ARP, ICMP) and etc. @@ -786,10 +590,6 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { m_latency.handle_pkt(m); } - if (is_feature_set(RECORDER)) { - m_recorder.handle_pkt(m); - } - if (is_feature_set(QUEUE)) { m_queue.handle_pkt(m); } @@ -797,6 +597,9 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { if (is_feature_set(SERVER)) { m_server.handle_pkt(m); } + + /* capture */ + TrexStatelessCaptureMngr::getInstance().handle_pkt_rx(m, m_port_id); } int RXPortManager::process_all_pending_pkts(bool flush_rx) { @@ -835,13 +638,6 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) { } void -RXPortManager::tick() { - if (is_feature_set(RECORDER)) { - m_recorder.flush_to_disk(); - } -} - -void RXPortManager::send_next_grat_arp() { if (is_feature_set(GRAT_ARP)) { m_grat_arp.send_next_grat_arp(); @@ -887,13 +683,6 @@ RXPortManager::to_json() const { output["latency"]["is_active"] = false; } - if (is_feature_set(RECORDER)) { - output["sniffer"] = m_recorder.to_json(); - output["sniffer"]["is_active"] = true; - } else { - output["sniffer"]["is_active"] = false; - } - if (is_feature_set(QUEUE)) { output["queue"] = m_queue.to_json(); output["queue"]["is_active"] = true; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 6efdae64..0cc60716 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -25,8 +25,7 @@ #include <stdint.h> #include "common/base64.h" -#include "common/captureFile.h" - +#include "trex_stateless_pkt.h" class CPortLatencyHWBase; class CRFC2544Info; @@ -80,97 +79,12 @@ public: CRxCoreErrCntrs *m_err_cntrs; }; -/** - * describes a single saved RX packet - * - */ -class RXPacket { -public: - - RXPacket(const rte_mbuf_t *m); - - /* slow path and also RVO - pass by value is ok */ - Json::Value to_json() { - Json::Value output; - output["ts"] = m_timestamp; - output["binary"] = base64_encode(m_raw, m_size); - return output; - } - - ~RXPacket() { - if (m_raw) { - delete [] m_raw; - } - } - -private: - - uint8_t *m_raw; - uint16_t m_size; - dsec_t m_timestamp; -}; - /************************************** * RX feature queue * *************************************/ -class RXPacketBuffer { -public: - - RXPacketBuffer(uint64_t size); - ~RXPacketBuffer(); - - /** - * push a packet to the buffer - * - */ - void push(const rte_mbuf_t *m); - - /** - * generate a JSON output of the queue - * - */ - Json::Value to_json() const; - - - bool is_empty() const { - return (m_head == m_tail); - } - - bool is_full() const { - return ( next(m_head) == m_tail); - } - - /** - * return the total amount of space possible - */ - uint64_t get_capacity() const { - /* one slot is used for diff between full/empty */ - return (m_size - 1); - } - - /** - * returns how many elements are in the queue - */ - uint64_t get_element_count() const; - -private: - int next(int v) const { - return ( (v + 1) % m_size ); - } - - /* pop in case of full queue - internal usage */ - RXPacket * pop(); - - int m_head; - int m_tail; - int m_size; - RXPacket **m_buffer; -}; - - class RXQueue { public: RXQueue() { @@ -191,7 +105,7 @@ public: * fetch the current buffer * return NULL if no packets */ - const RXPacketBuffer * fetch(); + const TrexPktBuffer * fetch(); /** * stop RX queue @@ -204,42 +118,7 @@ public: Json::Value to_json() const; private: - RXPacketBuffer *m_pkt_buffer; -}; - -/************************************** - * RX feature PCAP recorder - * - *************************************/ - -class RXPacketRecorder { -public: - RXPacketRecorder(); - - ~RXPacketRecorder() { - stop(); - } - - void start(const std::string &pcap, uint64_t limit); - void stop(); - void handle_pkt(const rte_mbuf_t *m); - - /** - * flush any cached packets to disk - * - */ - void flush_to_disk(); - - Json::Value to_json() const; - -private: - CFileWriterBase *m_writer; - std::string m_pcap_filename; - CCapPktRaw m_pkt; - dsec_t m_epoch; - uint64_t m_limit; - uint64_t m_count; - bool m_pending_flush; + TrexPktBuffer *m_pkt_buffer; }; @@ -311,7 +190,6 @@ public: enum feature_t { NO_FEATURES = 0x0, LATENCY = 0x1, - RECORDER = 0x2, QUEUE = 0x4, SERVER = 0x8, GRAT_ARP = 0x10, @@ -354,17 +232,6 @@ public: unset_feature(LATENCY); } - /* recorder */ - void start_recorder(const std::string &pcap, uint64_t limit_pkts) { - m_recorder.start(pcap, limit_pkts); - set_feature(RECORDER); - } - - void stop_recorder() { - m_recorder.stop(); - unset_feature(RECORDER); - } - /* queue */ void start_queue(uint32_t size) { m_queue.start(size); @@ -376,7 +243,7 @@ public: unset_feature(QUEUE); } - const RXPacketBuffer *get_pkt_buffer() { + const TrexPktBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { return nullptr; } @@ -415,13 +282,6 @@ public: void handle_pkt(const rte_mbuf_t *m); /** - * maintenance - * - * @author imarom (11/24/2016) - */ - void tick(); - - /** * send next grat arp (if on) * * @author imarom (12/13/2016) @@ -482,7 +342,6 @@ private: uint32_t m_features; uint8_t m_port_id; RXLatency m_latency; - RXPacketRecorder m_recorder; RXQueue m_queue; RXServer m_server; RXGratARP m_grat_arp; |