diff options
Diffstat (limited to 'src/stateless/rx')
-rw-r--r-- | src/stateless/rx/trex_stateless_capture.cpp | 292 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_capture.h | 312 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_capture_rc.h | 195 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 67 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 21 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_defs.h | 2 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 229 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.h | 160 |
8 files changed, 861 insertions, 417 deletions
diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp new file mode 100644 index 00000000..bf7623d5 --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -0,0 +1,292 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#include "trex_stateless_capture.h" +#include "trex_exception.h" + +/************************************** + * Capture + * + * A single instance of a capture + *************************************/ +TrexStatelessCapture::TrexStatelessCapture(capture_id_t id, + uint64_t limit, + const CaptureFilter &filter, + TrexPktBuffer::mode_e mode) { + m_id = id; + m_pkt_buffer = new TrexPktBuffer(limit, mode); + m_filter = filter; + m_state = STATE_ACTIVE; + m_start_ts = now_sec(); + m_pkt_index = 0; +} + +TrexStatelessCapture::~TrexStatelessCapture() { + if (m_pkt_buffer) { + delete m_pkt_buffer; + } +} + +void +TrexStatelessCapture::handle_pkt_tx(const TrexPkt *pkt) { + + if (m_state != STATE_ACTIVE) { + return; + } + + /* if not in filter - back off */ + if (!m_filter.in_filter(pkt)) { + return; + } + + if (pkt->get_ts() < m_start_ts) { + return; + } + + m_pkt_buffer->push(pkt, ++m_pkt_index); +} + +void +TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { + + if (m_state != STATE_ACTIVE) { + return; + } + + if (!m_filter.in_rx(port)) { + return; + } + + m_pkt_buffer->push(m, port, TrexPkt::ORIGIN_RX, ++m_pkt_index); +} + + +Json::Value +TrexStatelessCapture::to_json() const { + Json::Value output = Json::objectValue; + + output["id"] = Json::UInt64(m_id); + output["filter"] = m_filter.to_json(); + output["count"] = m_pkt_buffer->get_element_count(); + output["bytes"] = m_pkt_buffer->get_bytes(); + output["limit"] = m_pkt_buffer->get_capacity(); + + switch (m_state) { + case STATE_ACTIVE: + output["state"] = "ACTIVE"; + break; + + case STATE_STOPPED: + output["state"] = "STOPPED"; + break; + + default: + assert(0); + } + + return output; +} + +/** + * fetch up to 'pkt_limit' from the capture + * + */ +TrexPktBuffer * +TrexStatelessCapture::fetch(uint32_t pkt_limit, uint32_t &pending) { + + /* if the total sum of packets is within the limit range - take it */ + if (m_pkt_buffer->get_element_count() <= pkt_limit) { + TrexPktBuffer *current = m_pkt_buffer; + m_pkt_buffer = new TrexPktBuffer(m_pkt_buffer->get_capacity(), m_pkt_buffer->get_mode()); + pending = 0; + return current; + } + + /* partial fetch - take a partial list */ + TrexPktBuffer *partial = m_pkt_buffer->pop_n(pkt_limit); + pending = m_pkt_buffer->get_element_count(); + + return partial; +} + + +/************************************** + * Capture Manager + * handles all the captures + * in the system + *************************************/ + +/** + * holds the global filter in the capture manager + * which ports in the entire system are monitored + */ +void +TrexStatelessCaptureMngr::update_global_filter() { + CaptureFilter new_filter; + + /* recalculates the global filter */ + for (TrexStatelessCapture *capture : m_captures) { + new_filter += capture->get_filter(); + } + + m_global_filter = new_filter; +} + + +/** + * lookup a specific capture by ID + */ +TrexStatelessCapture * +TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { + + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return m_captures[i]; + } + } + + /* does not exist */ + return nullptr; +} + + +int +TrexStatelessCaptureMngr::lookup_index(capture_id_t capture_id) { + for (int i = 0; i < m_captures.size(); i++) { + if (m_captures[i]->get_id() == capture_id) { + return i; + } + } + return -1; +} + + +/** + * starts a new capture + * + */ +void +TrexStatelessCaptureMngr::start(const CaptureFilter &filter, + uint64_t limit, + TrexPktBuffer::mode_e mode, + TrexCaptureRCStart &rc) { + + /* check for maximum active captures */ + if (m_captures.size() >= MAX_CAPTURE_SIZE) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_LIMIT_REACHED); + return; + } + + /* create a new capture*/ + int new_id = m_id_counter++; + TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter, mode); + m_captures.push_back(new_capture); + + /* update global filter */ + update_global_filter(); + + /* result */ + rc.set_rc(new_id, new_capture->get_start_ts()); +} + +void +TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + capture->stop(); + rc.set_rc(capture->get_pkt_count()); +} + + +void +TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc) { + TrexStatelessCapture *capture = lookup(capture_id); + if (!capture) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + uint32_t pending = 0; + TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); + + rc.set_rc(pkt_buffer, pending, capture->get_start_ts()); +} + +void +TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &rc) { + + /* lookup index */ + int index = lookup_index(capture_id); + if (index == -1) { + rc.set_err(TrexCaptureRC::RC_CAPTURE_NOT_FOUND); + return; + } + + TrexStatelessCapture *capture = m_captures[index]; + m_captures.erase(m_captures.begin() + index); + + /* free memory */ + delete capture; + + /* update global filter */ + update_global_filter(); + + rc.set_rc(); +} + +void +TrexStatelessCaptureMngr::reset() { + TrexCaptureRCRemove dummy; + + while (m_captures.size() > 0) { + remove(m_captures[0]->get_id(), dummy); + assert(!!dummy); + } +} + +void +TrexStatelessCaptureMngr::handle_pkt_tx_slow_path(const TrexPkt *pkt) { + for (TrexStatelessCapture *capture : m_captures) { + capture->handle_pkt_tx(pkt); + } +} + +void +TrexStatelessCaptureMngr::handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port) { + for (TrexStatelessCapture *capture : m_captures) { + capture->handle_pkt_rx(m, port); + } +} + +Json::Value +TrexStatelessCaptureMngr::to_json() const { + Json::Value lst = Json::arrayValue; + + for (TrexStatelessCapture *capture : m_captures) { + lst.append(capture->to_json()); + } + + return lst; +} + diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h new file mode 100644 index 00000000..e4a2e632 --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture.h @@ -0,0 +1,312 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_CAPTURE_H__ +#define __TREX_STATELESS_CAPTURE_H__ + +#include <stdint.h> +#include <assert.h> + +#include "trex_stateless_pkt.h" +#include "trex_stateless_capture_rc.h" + + +/************************************** + * Capture Filter + * + * specify which ports to capture and if TX/RX or both + *************************************/ +class CaptureFilter { +public: + CaptureFilter() { + m_tx_active = 0; + m_rx_active = 0; + } + + /** + * add a port to the active TX port list + */ + void add_tx(uint8_t port_id) { + m_tx_active |= (1LL << port_id); + } + + /** + * add a port to the active RX port list + */ + void add_rx(uint8_t port_id) { + m_rx_active |= (1LL << port_id); + } + + void add(uint8_t port_id) { + add_tx(port_id); + add_rx(port_id); + } + + bool in_filter(const TrexPkt *pkt) { + switch (pkt->get_origin()) { + case TrexPkt::ORIGIN_TX: + return in_tx(pkt->get_port()); + + case TrexPkt::ORIGIN_RX: + return in_rx(pkt->get_port()); + + default: + return false; + } + } + + bool in_rx(uint8_t port_id) const { + uint64_t bit = (1LL << port_id); + return ((m_rx_active & bit) == bit); + } + + bool in_tx(uint8_t port_id) const { + uint64_t bit = (1LL << port_id); + return ((m_tx_active & bit) == bit); + } + + bool in_any(uint8_t port_id) const { + return ( in_tx(port_id) || in_rx(port_id) ); + } + + /** + * updates the current filter with another filter + * the result is the aggregation of TX /RX active lists + */ + CaptureFilter& operator +=(const CaptureFilter &other) { + m_tx_active |= other.m_tx_active; + m_rx_active |= other.m_rx_active; + + return *this; + } + + Json::Value to_json() const { + Json::Value output = Json::objectValue; + output["tx"] = Json::UInt64(m_tx_active); + output["rx"] = Json::UInt64(m_rx_active); + + return output; + } + +private: + + uint64_t m_tx_active; + uint64_t m_rx_active; +}; + + +/************************************** + * Capture + * + * A single instance of a capture + *************************************/ +class TrexStatelessCapture { +public: + + enum state_e { + STATE_ACTIVE, + STATE_STOPPED, + }; + + TrexStatelessCapture(capture_id_t id, + uint64_t limit, + const CaptureFilter &filter, + TrexPktBuffer::mode_e mode); + + ~TrexStatelessCapture(); + + /** + * handles a packet from the TX side + */ + void handle_pkt_tx(const TrexPkt *pkt); + + /** + * handles a packet from the RX side + */ + void handle_pkt_rx(const rte_mbuf_t *m, int port); + + + uint64_t get_id() const { + return m_id; + } + + const CaptureFilter & get_filter() const { + return m_filter; + } + + + /** + * stop the capture - from now on all packets will be ignored + * + * @author imarom (1/24/2017) + */ + void stop() { + m_state = STATE_STOPPED; + } + + TrexPktBuffer * fetch(uint32_t pkt_limit, uint32_t &pending); + + bool is_active() const { + return m_state == STATE_ACTIVE; + } + + uint32_t get_pkt_count() const { + return m_pkt_buffer->get_element_count(); + } + + dsec_t get_start_ts() const { + return m_start_ts; + } + + + Json::Value to_json() const; + +private: + state_e m_state; + TrexPktBuffer *m_pkt_buffer; + dsec_t m_start_ts; + CaptureFilter m_filter; + uint64_t m_id; + uint64_t m_pkt_index; +}; + + +/************************************** + * Capture Manager + * Handles all the captures in + * the system + * + * the design is a singleton + *************************************/ +class TrexStatelessCaptureMngr { + +public: + + static TrexStatelessCaptureMngr& getInstance() { + static TrexStatelessCaptureMngr instance; + + return instance; + } + + + ~TrexStatelessCaptureMngr() { + reset(); + } + + /** + * starts a new capture + */ + void start(const CaptureFilter &filter, + uint64_t limit, + TrexPktBuffer::mode_e mode, + TrexCaptureRCStart &rc); + + /** + * stops an existing capture + * + */ + void stop(capture_id_t capture_id, TrexCaptureRCStop &rc); + + /** + * fetch packets from an existing capture + * + */ + void fetch(capture_id_t capture_id, uint32_t pkt_limit, TrexCaptureRCFetch &rc); + + /** + * removes an existing capture + * all packets captured will be detroyed + */ + void remove(capture_id_t capture_id, TrexCaptureRCRemove &rc); + + + /** + * removes all captures + * + */ + void reset(); + + + /** + * return true if any filter is active + * on a specific port + * + * @author imarom (1/3/2017) + * + * @return bool + */ + bool is_active(uint8_t port) const { + return m_global_filter.in_any(port); + } + + /** + * handle packet from TX + */ + void handle_pkt_tx(const TrexPkt *pkt) { + if (!m_global_filter.in_filter(pkt)) { + return; + } + + handle_pkt_tx_slow_path(pkt); + } + + /** + * handle packet from RX + */ + void handle_pkt_rx(const rte_mbuf_t *m, int port) { + /* fast path - check the global filter */ + if (!m_global_filter.in_rx(port)) { + return; + } + + /* slow path */ + handle_pkt_rx_slow_path(m, port); + } + + Json::Value to_json() const; + +private: + + TrexStatelessCaptureMngr() { + /* init this to 1 */ + m_id_counter = 1; + } + + + TrexStatelessCapture * lookup(capture_id_t capture_id); + int lookup_index(capture_id_t capture_id); + + void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); + void handle_pkt_tx_slow_path(const TrexPkt *pkt); + + void update_global_filter(); + + std::vector<TrexStatelessCapture *> m_captures; + + capture_id_t m_id_counter; + + /* a union of all the filters curently active */ + CaptureFilter m_global_filter; + + static const int MAX_CAPTURE_SIZE = 10; +}; + +#endif /* __TREX_STATELESS_CAPTURE_H__ */ + diff --git a/src/stateless/rx/trex_stateless_capture_rc.h b/src/stateless/rx/trex_stateless_capture_rc.h new file mode 100644 index 00000000..12b37c1d --- /dev/null +++ b/src/stateless/rx/trex_stateless_capture_rc.h @@ -0,0 +1,195 @@ +/* + Itay Marom + Cisco Systems, Inc. +*/ + +/* +Copyright (c) 2015-2016 Cisco Systems, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +#ifndef __TREX_STATELESS_CAPTURE_RC_H__ +#define __TREX_STATELESS_CAPTURE_RC_H__ + +typedef int32_t capture_id_t; + +/** + * a base class for a capture command RC + * not to be used directly + */ +class TrexCaptureRC { + +protected: + /* cannot instantiate this object from outside */ + TrexCaptureRC() { + m_rc = RC_INVALID; + } + +public: + + /** + * error types for commands + */ + enum rc_e { + RC_INVALID = 0, + RC_OK = 1, + RC_CAPTURE_NOT_FOUND, + RC_CAPTURE_LIMIT_REACHED, + RC_CAPTURE_FETCH_UNDER_ACTIVE + }; + + bool operator !() const { + return (m_rc != RC_OK); + } + + std::string get_err() const { + assert(m_rc != RC_INVALID); + + switch (m_rc) { + case RC_OK: + return ""; + case RC_CAPTURE_LIMIT_REACHED: + return "capture limit has reached"; + case RC_CAPTURE_NOT_FOUND: + return "capture ID not found"; + case RC_CAPTURE_FETCH_UNDER_ACTIVE: + return "fetch command cannot be executed on an active capture"; + case RC_INVALID: + /* should never be called under invalid */ + assert(0); + + default: + assert(0); + } + } + + void set_err(rc_e rc) { + m_rc = rc; + } + + +protected: + rc_e m_rc; +}; + +/** + * return code for executing capture start + */ +class TrexCaptureRCStart : public TrexCaptureRC { +public: + + void set_rc(capture_id_t new_id, dsec_t start_ts) { + m_capture_id = new_id; + m_start_ts = start_ts; + m_rc = RC_OK; + } + + capture_id_t get_new_id() const { + assert(m_rc == RC_OK); + return m_capture_id; + } + + dsec_t get_start_ts() const { + assert(m_rc == RC_OK); + return m_start_ts; + } + +private: + capture_id_t m_capture_id; + dsec_t m_start_ts; +}; + +/** + * return code for exectuing capture stop + */ +class TrexCaptureRCStop : public TrexCaptureRC { +public: + + void set_rc(uint32_t pkt_count) { + m_pkt_count = pkt_count; + m_rc = RC_OK; + } + + uint32_t get_pkt_count() const { + assert(m_rc == RC_OK); + return m_pkt_count; + } + +private: + uint32_t m_pkt_count; +}; + +/** + * return code for executing capture fetch + */ +class TrexCaptureRCFetch : public TrexCaptureRC { +public: + + void set_rc(const TrexPktBuffer *pkt_buffer, uint32_t pending, dsec_t start_ts) { + m_pkt_buffer = pkt_buffer; + m_pending = pending; + m_start_ts = start_ts; + m_rc = RC_OK; + } + + const TrexPktBuffer *get_pkt_buffer() const { + assert(m_rc == RC_OK); + return m_pkt_buffer; + } + + uint32_t get_pending() const { + assert(m_rc == RC_OK); + return m_pending; + } + + dsec_t get_start_ts() const { + assert(m_rc == RC_OK); + return m_start_ts; + } + +private: + const TrexPktBuffer *m_pkt_buffer; + uint32_t m_pending; + dsec_t m_start_ts; +}; + + + +class TrexCaptureRCRemove : public TrexCaptureRC { +public: + void set_rc() { + m_rc = RC_OK; + } +}; + + +class TrexCaptureRCStatus : public TrexCaptureRC { +public: + + void set_rc(const Json::Value &json) { + m_json = json; + m_rc = RC_OK; + } + + const Json::Value & get_status() const { + assert(m_rc == RC_OK); + return m_json; + } + +private: + Json::Value m_json; +}; + + +#endif /* __TREX_STATELESS_CAPTURE_RC_H__ */ + diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index d27485de..00c18082 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -69,11 +69,11 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) { void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_capture = false; m_max_ports = cfg.m_max_ports; - + m_tx_cores = cfg.m_tx_cores; + CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx(); m_ring_from_cp = cp_rx->getRingCpToDp(0); - m_ring_to_cp = cp_rx->getRingDpToCp(0); m_state = STATE_IDLE; for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { @@ -130,6 +130,36 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() { } +void +CRxCoreStateless::periodic_check_for_dp_messages() { + + for (int i = 0; i < m_tx_cores; i++) { + periodic_check_for_dp_messages_core(i); + } + +} + +void +CRxCoreStateless::periodic_check_for_dp_messages_core(uint32_t core_id) { + + CNodeRing *ring = CMsgIns::Ins()->getRxDp()->getRingDpToCp(core_id); + + /* fast path */ + if ( likely ( ring->isEmpty() ) ) { + return; + } + + while (true) { + CGenNode *node = NULL; + + if (ring->Dequeue(node) != 0) { + break; + } + + //assert(node); + } +} + void CRxCoreStateless::recalculate_next_state() { if (m_state == STATE_QUIT) { return; @@ -176,16 +206,6 @@ void CRxCoreStateless::idle_state_loop() { } /** - * for each port give a tick (for flushing if needed) - * - */ -void CRxCoreStateless::port_manager_tick() { - for (int i = 0; i < m_max_ports; i++) { - m_rx_port_mngr[i].tick(); - } -} - -/** * for each port handle the grat ARP mechansim * */ @@ -199,7 +219,6 @@ void CRxCoreStateless::handle_work_stage() { /* set the next sync time to */ dsec_t sync_time_sec = now_sec() + (1.0 / 1000); - dsec_t tick_time_sec = now_sec() + 1.0; dsec_t grat_arp_sec = now_sec() + (double)CGlobalInfo::m_options.m_arp_ref_per; while (m_state == STATE_WORKING) { @@ -211,14 +230,10 @@ void CRxCoreStateless::handle_work_stage() { if ( (now - sync_time_sec) > 0 ) { periodic_check_for_cp_messages(); + //periodic_check_for_dp_messages(); sync_time_sec = now + (1.0 / 1000); } - if ( (now - tick_time_sec) > 0) { - port_manager_tick(); - tick_time_sec = now + 1.0; - } - if ( (now - grat_arp_sec) > 0) { handle_grat_arp(); grat_arp_sec = now + (double)CGlobalInfo::m_options.m_arp_ref_per; @@ -255,10 +270,6 @@ void CRxCoreStateless::start() { m_monitor.disable(); } -void CRxCoreStateless::capture_pkt(rte_mbuf_t *m) { - -} - int CRxCoreStateless::process_all_pending_pkts(bool flush_rx) { int total_pkts = 0; @@ -318,18 +329,6 @@ double CRxCoreStateless::get_cpu_util() { void -CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) { - m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit); - recalculate_next_state(); -} - -void -CRxCoreStateless::stop_recorder(uint8_t port_id) { - m_rx_port_mngr[port_id].stop_recorder(); - recalculate_next_state(); -} - -void CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) { m_rx_port_mngr[port_id].start_queue(size); recalculate_next_state(); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 4eed59a1..954a5f04 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -27,6 +27,7 @@ #include "pal/linux/sanb_atomic.h" #include "utl_cpuu.h" #include "trex_stateless_rx_port_mngr.h" +#include "trex_stateless_capture.h" class TrexStatelessCpToRxMsgBase; @@ -127,17 +128,10 @@ class CRxCoreStateless { double get_cpu_util(); void update_cpu_util(); - const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { + const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } - - /** - * start capturing of RX packets on a specific port - * - */ - void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit); - void stop_recorder(uint8_t port_id); - + /** * start RX queueing of packets * @@ -162,17 +156,20 @@ class CRxCoreStateless { private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); + bool periodic_check_for_cp_messages(); + + void periodic_check_for_dp_messages(); + void periodic_check_for_dp_messages_core(uint32_t core_id); + void tickle(); void idle_state_loop(); void recalculate_next_state(); bool are_any_features_active(); - void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); - void port_manager_tick(); void handle_grat_arp(); int process_all_pending_pkts(bool flush_rx = false); @@ -186,10 +183,10 @@ class CRxCoreStateless { private: TrexMonitor m_monitor; uint32_t m_max_ports; + uint32_t m_tx_cores; bool m_capture; state_e m_state; CNodeRing *m_ring_from_cp; - CNodeRing *m_ring_to_cp; CCpuUtlDp m_cpu_dp_u; CCpuUtlCp m_cpu_cp_u; diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h index aefcc133..367cf4e3 100644 --- a/src/stateless/rx/trex_stateless_rx_defs.h +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -38,10 +38,12 @@ class CRxSlCfg { m_max_ports = 0; m_cps = 0.0; m_num_crc_fix_bytes = 0; + m_tx_cores = 0; } public: uint32_t m_max_ports; + uint32_t m_tx_cores; double m_cps; CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; uint8_t m_num_crc_fix_bytes; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index caed2bee..b01665ec 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -20,48 +20,10 @@ */ #include "bp_sim.h" #include "trex_stateless_rx_port_mngr.h" -#include "common/captureFile.h" #include "trex_stateless_rx_core.h" #include "common/Network/Packet/Arp.h" #include "pkt_gen.h" - -/** - * copy MBUF to a flat buffer - * - * @author imarom (12/20/2016) - * - * @param dest - buffer with at least rte_pktmbuf_pkt_len(m) - * bytes - * @param m - MBUF to copy - * - * @return uint8_t* - */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { - - int index = 0; - for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { - const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *); - memcpy(dest + index, src, it->data_len); - index += it->data_len; - } -} - -/************************************** - * RX packet - * - *************************************/ -RXPacket::RXPacket(const rte_mbuf_t *m) { - - /* allocate buffer */ - m_size = m->pkt_len; - m_raw = new uint8_t[m_size]; - - /* copy data */ - copy_mbuf(m_raw, m); - - /* generate a packet timestamp */ - m_timestamp = now_sec(); -} +#include "trex_stateless_capture.h" /************************************** * latency RX feature @@ -231,79 +193,12 @@ RXLatency::to_json() const { * *************************************/ -RXPacketBuffer::RXPacketBuffer(uint64_t size) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_size = (size + 1); // for the empty/full difference 1 slot reserved - - /* generate queue */ - m_buffer = new RXPacket*[m_size](); // zeroed -} - -RXPacketBuffer::~RXPacketBuffer() { - assert(m_buffer); - - while (!is_empty()) { - RXPacket *pkt = pop(); - delete pkt; - } - delete [] m_buffer; -} - -void -RXPacketBuffer::push(const rte_mbuf_t *m) { - /* if full - pop the oldest */ - if (is_full()) { - delete pop(); - } - - /* push packet */ - m_buffer[m_head] = new RXPacket(m); - m_head = next(m_head); -} - -RXPacket * -RXPacketBuffer::pop() { - assert(!is_empty()); - - RXPacket *pkt = m_buffer[m_tail]; - m_tail = next(m_tail); - - return pkt; -} - -uint64_t -RXPacketBuffer::get_element_count() const { - if (m_head >= m_tail) { - return (m_head - m_tail); - } else { - return ( get_capacity() - (m_tail - m_head - 1) ); - } -} - -Json::Value -RXPacketBuffer::to_json() const { - - Json::Value output = Json::arrayValue; - - int tmp = m_tail; - while (tmp != m_head) { - RXPacket *pkt = m_buffer[tmp]; - output.append(pkt->to_json()); - tmp = next(tmp); - } - - return output; -} - - void RXQueue::start(uint64_t size) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RXPacketBuffer(size); + m_pkt_buffer = new TrexPktBuffer(size, TrexPktBuffer::MODE_DROP_HEAD); } void @@ -314,7 +209,7 @@ RXQueue::stop() { } } -const RXPacketBuffer * +const TrexPktBuffer * RXQueue::fetch() { /* if no buffer or the buffer is empty - give a NULL one */ @@ -323,10 +218,10 @@ RXQueue::fetch() { } /* hold a pointer to the old one */ - RXPacketBuffer *old_buffer = m_pkt_buffer; + TrexPktBuffer *old_buffer = m_pkt_buffer; /* replace the old one with a new one and freeze the old */ - m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity()); + m_pkt_buffer = new TrexPktBuffer(old_buffer->get_capacity(), old_buffer->get_mode()); return old_buffer; } @@ -348,97 +243,6 @@ RXQueue::to_json() const { return output; } -/************************************** - * RX feature recorder - * - *************************************/ - -RXPacketRecorder::RXPacketRecorder() { - m_writer = NULL; - m_count = 0; - m_limit = 0; - m_epoch = -1; - - m_pending_flush = false; -} - -void -RXPacketRecorder::start(const std::string &pcap, uint64_t limit) { - m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); - if (m_writer == NULL) { - std::stringstream ss; - ss << "unable to create PCAP file: " << pcap; - throw TrexException(ss.str()); - } - - assert(limit > 0); - - m_limit = limit; - m_count = 0; - m_pending_flush = false; - m_pcap_filename = pcap; -} - -void -RXPacketRecorder::stop() { - if (!m_writer) { - return; - } - - delete m_writer; - m_writer = NULL; -} - -void -RXPacketRecorder::flush_to_disk() { - - if (m_writer && m_pending_flush) { - m_writer->flush_to_disk(); - m_pending_flush = false; - } -} - -void -RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { - if (!m_writer) { - return; - } - - dsec_t now = now_sec(); - if (m_epoch < 0) { - m_epoch = now; - } - - dsec_t dt = now - m_epoch; - - CPktNsecTimeStamp t_c(dt); - m_pkt.time_nsec = t_c.m_time_nsec; - m_pkt.time_sec = t_c.m_time_sec; - - copy_mbuf((uint8_t *)m_pkt.raw, m); - m_pkt.pkt_len = m->pkt_len; - - m_writer->write_packet(&m_pkt); - m_count++; - m_pending_flush = true; - - if (m_count == m_limit) { - stop(); - } - -} - -Json::Value -RXPacketRecorder::to_json() const { - Json::Value output = Json::objectValue; - - output["pcap_filename"] = m_pcap_filename; - output["limit"] = Json::UInt64(m_limit); - output["count"] = Json::UInt64(m_count); - - return output; -} - /************************************** * RX feature server (ARP, ICMP) and etc. @@ -688,7 +492,7 @@ RXServer::duplicate_mbuf(const rte_mbuf_t *m) { } /* copy data */ - copy_mbuf(dest, m); + mbuf_to_buffer(dest, m); return clone_mbuf; } @@ -789,10 +593,6 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { m_latency.handle_pkt(m); } - if (is_feature_set(RECORDER)) { - m_recorder.handle_pkt(m); - } - if (is_feature_set(QUEUE)) { m_queue.handle_pkt(m); } @@ -800,6 +600,9 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { if (is_feature_set(SERVER)) { m_server.handle_pkt(m); } + + /* capture */ + TrexStatelessCaptureMngr::getInstance().handle_pkt_rx(m, m_port_id); } int RXPortManager::process_all_pending_pkts(bool flush_rx) { @@ -838,13 +641,6 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) { } void -RXPortManager::tick() { - if (is_feature_set(RECORDER)) { - m_recorder.flush_to_disk(); - } -} - -void RXPortManager::send_next_grat_arp() { if (is_feature_set(GRAT_ARP)) { m_grat_arp.send_next_grat_arp(); @@ -890,13 +686,6 @@ RXPortManager::to_json() const { output["latency"]["is_active"] = false; } - if (is_feature_set(RECORDER)) { - output["sniffer"] = m_recorder.to_json(); - output["sniffer"]["is_active"] = true; - } else { - output["sniffer"]["is_active"] = false; - } - if (is_feature_set(QUEUE)) { output["queue"] = m_queue.to_json(); output["queue"]["is_active"] = true; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 6efdae64..b318d973 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -25,8 +25,7 @@ #include <stdint.h> #include "common/base64.h" -#include "common/captureFile.h" - +#include "trex_stateless_pkt.h" class CPortLatencyHWBase; class CRFC2544Info; @@ -80,97 +79,12 @@ public: CRxCoreErrCntrs *m_err_cntrs; }; -/** - * describes a single saved RX packet - * - */ -class RXPacket { -public: - - RXPacket(const rte_mbuf_t *m); - - /* slow path and also RVO - pass by value is ok */ - Json::Value to_json() { - Json::Value output; - output["ts"] = m_timestamp; - output["binary"] = base64_encode(m_raw, m_size); - return output; - } - - ~RXPacket() { - if (m_raw) { - delete [] m_raw; - } - } - -private: - - uint8_t *m_raw; - uint16_t m_size; - dsec_t m_timestamp; -}; - /************************************** * RX feature queue * *************************************/ -class RXPacketBuffer { -public: - - RXPacketBuffer(uint64_t size); - ~RXPacketBuffer(); - - /** - * push a packet to the buffer - * - */ - void push(const rte_mbuf_t *m); - - /** - * generate a JSON output of the queue - * - */ - Json::Value to_json() const; - - - bool is_empty() const { - return (m_head == m_tail); - } - - bool is_full() const { - return ( next(m_head) == m_tail); - } - - /** - * return the total amount of space possible - */ - uint64_t get_capacity() const { - /* one slot is used for diff between full/empty */ - return (m_size - 1); - } - - /** - * returns how many elements are in the queue - */ - uint64_t get_element_count() const; - -private: - int next(int v) const { - return ( (v + 1) % m_size ); - } - - /* pop in case of full queue - internal usage */ - RXPacket * pop(); - - int m_head; - int m_tail; - int m_size; - RXPacket **m_buffer; -}; - - class RXQueue { public: RXQueue() { @@ -191,7 +105,7 @@ public: * fetch the current buffer * return NULL if no packets */ - const RXPacketBuffer * fetch(); + const TrexPktBuffer * fetch(); /** * stop RX queue @@ -204,42 +118,7 @@ public: Json::Value to_json() const; private: - RXPacketBuffer *m_pkt_buffer; -}; - -/************************************** - * RX feature PCAP recorder - * - *************************************/ - -class RXPacketRecorder { -public: - RXPacketRecorder(); - - ~RXPacketRecorder() { - stop(); - } - - void start(const std::string &pcap, uint64_t limit); - void stop(); - void handle_pkt(const rte_mbuf_t *m); - - /** - * flush any cached packets to disk - * - */ - void flush_to_disk(); - - Json::Value to_json() const; - -private: - CFileWriterBase *m_writer; - std::string m_pcap_filename; - CCapPktRaw m_pkt; - dsec_t m_epoch; - uint64_t m_limit; - uint64_t m_count; - bool m_pending_flush; + TrexPktBuffer *m_pkt_buffer; }; @@ -311,7 +190,6 @@ public: enum feature_t { NO_FEATURES = 0x0, LATENCY = 0x1, - RECORDER = 0x2, QUEUE = 0x4, SERVER = 0x8, GRAT_ARP = 0x10, @@ -354,17 +232,6 @@ public: unset_feature(LATENCY); } - /* recorder */ - void start_recorder(const std::string &pcap, uint64_t limit_pkts) { - m_recorder.start(pcap, limit_pkts); - set_feature(RECORDER); - } - - void stop_recorder() { - m_recorder.stop(); - unset_feature(RECORDER); - } - /* queue */ void start_queue(uint32_t size) { m_queue.start(size); @@ -376,7 +243,7 @@ public: unset_feature(QUEUE); } - const RXPacketBuffer *get_pkt_buffer() { + const TrexPktBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { return nullptr; } @@ -415,13 +282,6 @@ public: void handle_pkt(const rte_mbuf_t *m); /** - * maintenance - * - * @author imarom (11/24/2016) - */ - void tick(); - - /** * send next grat arp (if on) * * @author imarom (12/13/2016) @@ -445,11 +305,14 @@ public: return (m_features != NO_FEATURES); } - bool no_features_set() { return (!has_features_set()); } + bool is_feature_set(feature_t feature) const { + return ( (m_features & feature) == feature ); + } + /** * returns ignored set of stats * (grat ARP, PING response and etc.) @@ -474,15 +337,10 @@ private: void unset_feature(feature_t feature) { m_features &= (~feature); } - - bool is_feature_set(feature_t feature) const { - return ( (m_features & feature) == feature ); - } - + uint32_t m_features; uint8_t m_port_id; RXLatency m_latency; - RXPacketRecorder m_recorder; RXQueue m_queue; RXServer m_server; RXGratARP m_grat_arp; |