From 17d58dba43eeae9f1519248c1fd62e9e4d2dc302 Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 15 Feb 2017 18:26:41 +0200 Subject: TX packet capture - zero impact on fast path (using wrapper when service mode is active) Signed-off-by: imarom --- src/bp_sim.h | 7 ++ src/main_dpdk.cpp | 106 +++++++++++++-------- src/stateless/cp/trex_stateless_port.cpp | 6 +- src/stateless/dp/trex_stateless_dp_core.cpp | 86 +++++++++++++++++ src/stateless/dp/trex_stateless_dp_core.h | 19 ++-- .../messaging/trex_stateless_messaging.cpp | 18 ++++ src/stateless/messaging/trex_stateless_messaging.h | 25 ++++- src/stateless/rx/trex_stateless_capture.cpp | 61 +++++++++--- src/stateless/rx/trex_stateless_capture.h | 52 ++++++++-- 9 files changed, 313 insertions(+), 67 deletions(-) diff --git a/src/bp_sim.h b/src/bp_sim.h index 27fb8d07..22968b2c 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -319,8 +319,15 @@ public: virtual ~CVirtualIF() {} virtual int open_file(std::string file_name)=0; virtual int close_file(void)=0; + /* send one packet */ virtual int send_node(CGenNode * node)=0; + + /* by default does the same */ + virtual int send_node_service_mode(CGenNode *node) { + return send_node(node); + } + /* send one packet to a specific dir. flush all packets */ virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m) {} /* flush all pending packets into the stream */ diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index f227b2d8..7371b363 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -2166,10 +2166,27 @@ class CCoreEthIFStateless : public CCoreEthIF { public: virtual int send_node_flow_stat(rte_mbuf *m, CGenNodeStateless * node_sl, CCorePerPort * lp_port , CVirtualIFPerSideStats * lp_stats, bool is_const); - virtual int send_node(CGenNode * node); + + /** + * fast path version + */ + virtual int send_node(CGenNode *node); + + /** + * slow path version + */ + virtual int send_node_service_mode(CGenNode *node); + protected: - int handle_slow_path_node(CGenNode *node); - int send_pcap_node(CGenNodePCAP *pcap_node); + template inline int send_node_common(CGenNode *no); + + inline rte_mbuf_t * generate_node_pkt(CGenNodeStateless *node_sl) __attribute__ ((always_inline)); + inline int send_node_packet(CGenNodeStateless *node_sl, + rte_mbuf_t *m, + CCorePerPort *lp_port, + CVirtualIFPerSideStats *lp_stats) __attribute__ ((always_inline)); + + rte_mbuf_t * generate_slow_path_node_pkt(CGenNodeStateless *node_sl); }; bool CCoreEthIF::Create(uint8_t core_id, @@ -2311,8 +2328,6 @@ int CCoreEthIF::send_pkt(CCorePerPort * lp_port, lp_port->m_table[len]=m; len++; - TrexStatelessCaptureMngr::getInstance().handle_pkt_tx(m, lp_port->m_port->get_port_id()); - /* enough pkts to be sent */ if (unlikely(len == MAX_PKT_BURST)) { send_burst(lp_port, MAX_PKT_BURST,lp_stats); @@ -2414,25 +2429,20 @@ int CCoreEthIFStateless::send_node_flow_stat(rte_mbuf *m, CGenNodeStateless * no return 0; } -int CCoreEthIFStateless::send_node(CGenNode * no) { - /* if a node is marked as slow path - single IF to redirect it to slow path */ - if (no->get_is_slow_path()) { - return handle_slow_path_node(no); +inline rte_mbuf_t * +CCoreEthIFStateless::generate_node_pkt(CGenNodeStateless *node_sl) { + if (unlikely(node_sl->get_is_slow_path())) { + return generate_slow_path_node_pkt(node_sl); } - - CGenNodeStateless * node_sl=(CGenNodeStateless *) no; - + /* check that we have mbuf */ - rte_mbuf_t * m; - - pkt_dir_t dir=(pkt_dir_t)node_sl->get_mbuf_cache_dir(); - CCorePerPort * lp_port=&m_ports[dir]; - CVirtualIFPerSideStats * lp_stats = &m_stats[dir]; + rte_mbuf_t *m; + if ( likely(node_sl->is_cache_mbuf_array()) ) { - m=node_sl->cache_mbuf_array_get_cur(); + m = node_sl->cache_mbuf_array_get_cur(); rte_pktmbuf_refcnt_update(m,1); }else{ - m=node_sl->get_cache_mbuf(); + m = node_sl->get_cache_mbuf(); if (m) { /* cache case */ @@ -2443,6 +2453,15 @@ int CCoreEthIFStateless::send_node(CGenNode * no) { } } + return m; +} + +inline int +CCoreEthIFStateless::send_node_packet(CGenNodeStateless *node_sl, + rte_mbuf_t *m, + CCorePerPort *lp_port, + CVirtualIFPerSideStats *lp_stats) { + if (unlikely(node_sl->is_stat_needed())) { if ( unlikely(node_sl->is_cache_mbuf_array()) ) { // No support for latency + cache. If user asks for cache on latency stream, we change cache to 0. @@ -2451,38 +2470,49 @@ int CCoreEthIFStateless::send_node(CGenNode * no) { } return send_node_flow_stat(m, node_sl, lp_port, lp_stats, (node_sl->get_cache_mbuf()) ? true : false); } else { - send_pkt(lp_port,m,lp_stats); - } - - return (0); -}; - -int CCoreEthIFStateless::send_pcap_node(CGenNodePCAP *pcap_node) { - rte_mbuf_t *m = pcap_node->get_pkt(); - if (!m) { - return (-1); + return send_pkt(lp_port, m, lp_stats); } +} - pkt_dir_t dir = (pkt_dir_t)pcap_node->get_mbuf_dir(); - CCorePerPort *lp_port=&m_ports[dir]; - CVirtualIFPerSideStats *lp_stats = &m_stats[dir]; +int CCoreEthIFStateless::send_node(CGenNode *no) { + return send_node_common(no); +} - send_pkt(lp_port, m, lp_stats); +int CCoreEthIFStateless::send_node_service_mode(CGenNode *no) { + return send_node_common(no); +} - return (0); +template +int CCoreEthIFStateless::send_node_common(CGenNode *no) { + CGenNodeStateless * node_sl = (CGenNodeStateless *) no; + + pkt_dir_t dir = (pkt_dir_t)node_sl->get_mbuf_cache_dir(); + CCorePerPort *lp_port = &m_ports[dir]; + CVirtualIFPerSideStats *lp_stats = &m_stats[dir]; + + rte_mbuf_t *m = generate_node_pkt(node_sl); + + /* template boolean - this will be removed at compile time */ + if (SERVICE_MODE) { + TrexStatelessCaptureMngr::getInstance().handle_pkt_tx(m, lp_port->m_port->get_port_id()); + } + + return send_node_packet(node_sl, m, lp_port, lp_stats); } /** * slow path code goes here * */ -int CCoreEthIFStateless::handle_slow_path_node(CGenNode * no) { +rte_mbuf_t * +CCoreEthIFStateless::generate_slow_path_node_pkt(CGenNodeStateless *node_sl) { - if (no->m_type == CGenNode::PCAP_PKT) { - return send_pcap_node((CGenNodePCAP *)no); + if (node_sl->m_type == CGenNode::PCAP_PKT) { + CGenNodePCAP *pcap_node = (CGenNodePCAP *)node_sl; + return pcap_node->get_pkt(); } - return (-1); + return (NULL); } void CCoreEthIF::apply_client_cfg(const ClientCfgBase *cfg, rte_mbuf_t *m, pkt_dir_t dir, uint8_t *p) { diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index b0366fb5..598577cc 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -989,7 +989,7 @@ TrexStatelessPort::set_service_mode(bool enabled) { getPortAttrObj()->set_rx_filter_mode(RX_FILTER_MODE_HW); } m_is_service_mode_on = enabled; - return; + break; case TrexStatelessRxQuery::RC_FAIL_RX_QUEUE_ACTIVE: throw TrexException("unable to disable service mode - please remove RX queue"); @@ -1000,6 +1000,10 @@ TrexStatelessPort::set_service_mode(bool enabled) { default: assert(0); } + + /* update the dp cores */ + TrexStatelessDpServiceMode *dp_msg = new TrexStatelessDpServiceMode(m_port_id, enabled); + send_message_to_all_dp(dp_msg); } diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp index d8563e95..56184aeb 100644 --- a/src/stateless/dp/trex_stateless_dp_core.cpp +++ b/src/stateless/dp/trex_stateless_dp_core.cpp @@ -28,6 +28,53 @@ limitations under the License. #include "mbuf.h" +class DPCoreWrapper : public CVirtualIF { +public: + + DPCoreWrapper() { + m_wrapped = nullptr; + } + + void set_wrapped_object(CVirtualIF *wrapped) { + m_wrapped = wrapped; + } + + CVirtualIF *get_wrapped_object() const { + return m_wrapped; + } + + virtual int close_file(void) { + return m_wrapped->close_file(); + } + + virtual int flush_tx_queue(void) { + return m_wrapped->flush_tx_queue(); + } + + virtual int open_file(std::string file_name) { + return m_wrapped->open_file(file_name); + } + + /* move to service mode */ + virtual int send_node(CGenNode *node) { + return m_wrapped->send_node_service_mode(node); + } + + virtual int update_mac_addr_from_global_cfg(pkt_dir_t dir, uint8_t *p) { + return m_wrapped->update_mac_addr_from_global_cfg(dir, p); + } + + virtual pkt_dir_t port_id_to_dir(uint8_t port_id) { + return m_wrapped->port_id_to_dir(port_id); + } + + virtual void send_one_pkt(pkt_dir_t dir, rte_mbuf_t *m) { + m_wrapped->send_one_pkt(dir, m); + } + +private: + CVirtualIF *m_wrapped; +}; void CGenNodeStateless::cache_mbuf_array_init(){ @@ -592,6 +639,18 @@ void TrexStatelessDpPerPort::create(CFlowGenListPerThread * core){ } +TrexStatelessDpCore::TrexStatelessDpCore() { + m_thread_id = 0; + m_core = NULL; + m_duration = -1; + m_is_service_mode = NULL; + m_wrapper = new DPCoreWrapper(); +} + +TrexStatelessDpCore::~TrexStatelessDpCore() { + delete m_wrapper; +} + void TrexStatelessDpCore::create(uint8_t thread_id, CFlowGenListPerThread *core) { @@ -717,6 +776,7 @@ void TrexStatelessDpCore::quit_main_loop(){ */ void TrexStatelessDpCore::start_scheduler() { + /* creates a maintenace job using the scheduler */ CGenNode * node_sync = m_core->create_node() ; node_sync->m_type = CGenNode::FLOW_SYNC; @@ -1255,6 +1315,32 @@ TrexStatelessDpCore::barrier(uint8_t port_id, int event_id) { ring->Enqueue((CGenNode *)event_msg); } +void +TrexStatelessDpCore::set_service_mode(uint8_t port_id, bool enabled) { + /* ignore the same message */ + if (enabled == m_is_service_mode) { + return; + } + + if (enabled) { + /* sanity */ + assert(m_core->m_node_gen.m_v_if != m_wrapper); + + /* set the wrapper object and make the VIF point to it */ + m_wrapper->set_wrapped_object(m_core->m_node_gen.m_v_if); + m_core->m_node_gen.m_v_if = m_wrapper; + m_is_service_mode = true; + + } else { + /* sanity */ + assert(m_core->m_node_gen.m_v_if == m_wrapper); + + /* restore the wrapped object and make the VIF point to it */ + m_core->m_node_gen.m_v_if = m_wrapper->get_wrapped_object(); + m_is_service_mode = false; + } +} + /** * PCAP node diff --git a/src/stateless/dp/trex_stateless_dp_core.h b/src/stateless/dp/trex_stateless_dp_core.h index e880a6eb..93128906 100644 --- a/src/stateless/dp/trex_stateless_dp_core.h +++ b/src/stateless/dp/trex_stateless_dp_core.h @@ -34,6 +34,7 @@ class CGenNodeStateless; class TrexStreamsCompiledObj; class TrexStream; class CGenNodePCAP; +class DPCoreWrapper; class CDpOneStream { public: @@ -116,6 +117,7 @@ public: /* for now */ #define NUM_PORTS_PER_CORE 2 + class TrexStatelessDpCore { public: @@ -131,12 +133,10 @@ public: }; - TrexStatelessDpCore() { - m_thread_id = 0; - m_core = NULL; - m_duration = -1; - } - + TrexStatelessDpCore(); + ~TrexStatelessDpCore(); + + /** * "static constructor" * @@ -273,6 +273,10 @@ public: return get_port_db(port_id)->is_active(); } + /** + * enabled/disable service mode + */ + void set_service_mode(uint8_t port_id, bool enabled); private: @@ -335,6 +339,9 @@ private: CFlowGenListPerThread * m_core; double m_duration; + + DPCoreWrapper *m_wrapper; + bool m_is_service_mode; }; #endif /* __TREX_STATELESS_DP_CORE_H__ */ diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index f89ca343..0a3fbfde 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -233,6 +233,24 @@ TrexStatelessDpBarrier::clone() { return new_msg; } +/************************* + service mode message + ************************/ + +bool +TrexStatelessDpServiceMode::handle(TrexStatelessDpCore *dp_core) { + dp_core->set_service_mode(m_port_id, m_enabled); + return true; +} + +TrexStatelessCpToDpMsgBase * +TrexStatelessDpServiceMode::clone() { + + TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpServiceMode(m_port_id, m_enabled); + + return new_msg; +} + /************************* messages from DP to CP **********************/ bool TrexDpPortEventMsg::handle() { diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index cd79d6e7..7871a754 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -140,7 +140,7 @@ protected: class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase { public: - TrexStatelessDpStart(uint8_t m_port_id, int m_event_id, TrexStreamsCompiledObj *obj, double duration); + TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration); ~TrexStatelessDpStart(); @@ -369,6 +369,29 @@ private: }; +/** + * move a DP core in/out of service mode (slower as it might do + * capturing and etc.) + * + */ +class TrexStatelessDpServiceMode : public TrexStatelessCpToDpMsgBase { +public: + + TrexStatelessDpServiceMode(uint8_t port_id, bool enabled) { + m_port_id = port_id; + m_enabled = enabled; + } + + virtual TrexStatelessCpToDpMsgBase * clone(); + + virtual bool handle(TrexStatelessDpCore *dp_core); + +private: + + uint8_t m_port_id; + bool m_enabled; +}; + /************************* messages from DP to CP **********************/ /** diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index d9813cac..3b0273a4 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -46,7 +46,7 @@ TrexStatelessCapture::~TrexStatelessCapture() { void TrexStatelessCapture::handle_pkt(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin) { - + if (m_state != STATE_ACTIVE) { return; } @@ -127,7 +127,7 @@ TrexStatelessCaptureMngr::update_global_filter() { for (TrexStatelessCapture *capture : m_captures) { new_filter += capture->get_filter(); } - + m_global_filter = new_filter; } @@ -136,7 +136,7 @@ TrexStatelessCaptureMngr::update_global_filter() { * lookup a specific capture by ID */ TrexStatelessCapture * -TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { +TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) const { for (int i = 0; i < m_captures.size(); i++) { if (m_captures[i]->get_id() == capture_id) { @@ -150,7 +150,7 @@ TrexStatelessCaptureMngr::lookup(capture_id_t capture_id) { int -TrexStatelessCaptureMngr::lookup_index(capture_id_t capture_id) { +TrexStatelessCaptureMngr::lookup_index(capture_id_t capture_id) const { for (int i = 0; i < m_captures.size(); i++) { if (m_captures[i]->get_id() == capture_id) { return i; @@ -179,11 +179,20 @@ TrexStatelessCaptureMngr::start(const CaptureFilter &filter, /* create a new capture*/ int new_id = m_id_counter++; TrexStatelessCapture *new_capture = new TrexStatelessCapture(new_id, limit, filter, mode); + + /** + * add the new capture in a safe mode + * (TX might be active) + */ + std::unique_lock ulock(m_lock); m_captures.push_back(new_capture); - + /* update global filter */ update_global_filter(); + /* done with critical section */ + ulock.unlock(); + /* result */ rc.set_rc(new_id, new_capture->get_start_ts()); } @@ -196,7 +205,9 @@ TrexStatelessCaptureMngr::stop(capture_id_t capture_id, TrexCaptureRCStop &rc) { return; } + std::unique_lock ulock(m_lock); capture->stop(); + rc.set_rc(capture->get_pkt_count()); } @@ -210,7 +221,11 @@ TrexStatelessCaptureMngr::fetch(capture_id_t capture_id, uint32_t pkt_limit, Tre } uint32_t pending = 0; + + /* take a lock before fetching all the packets */ + std::unique_lock ulock(m_lock); TrexPktBuffer *pkt_buffer = capture->fetch(pkt_limit, pending); + ulock.unlock(); rc.set_rc(pkt_buffer, pending, capture->get_start_ts()); } @@ -226,14 +241,21 @@ TrexStatelessCaptureMngr::remove(capture_id_t capture_id, TrexCaptureRCRemove &r } TrexStatelessCapture *capture = m_captures[index]; - m_captures.erase(m_captures.begin() + index); - /* free memory */ - delete capture; + /* remove from list under lock */ + std::unique_lock ulock(m_lock); - /* update global filter */ + m_captures.erase(m_captures.begin() + index); + + /* update global filter under lock (for barrier) */ update_global_filter(); + /* done with critical section */ + ulock.unlock(); + + /* free memory */ + delete capture; + rc.set_rc(); } @@ -247,24 +269,39 @@ TrexStatelessCaptureMngr::reset() { } } +//#define STRESS_TEST void TrexStatelessCaptureMngr::handle_pkt_slow_path(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin) { - std::unique_lock lock(m_lock); + + #ifdef STRESS_TEST + static int sanity = 0; + assert(__sync_fetch_and_add(&sanity, 1) == 0); + #endif for (TrexStatelessCapture *capture : m_captures) { capture->handle_pkt(m, port, origin); } + + #ifdef STRESS_TEST + assert(__sync_fetch_and_sub(&sanity, 1) == 1); + #endif } + Json::Value -TrexStatelessCaptureMngr::to_json() const { +TrexStatelessCaptureMngr::to_json() { Json::Value lst = Json::arrayValue; - + + std::unique_lock ulock(m_lock); + for (TrexStatelessCapture *capture : m_captures) { lst.append(capture->to_json()); } + ulock.unlock(); + return lst; } TrexStatelessCaptureMngr TrexStatelessCaptureMngr::g_instance; + diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index 6288ac56..8af2510e 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -101,6 +101,14 @@ public: return output; } + uint64_t get_tx_active_map() const { + return m_tx_active; + } + + uint64_t get_rx_active_map() const { + return m_rx_active; + } + private: uint64_t m_tx_active; @@ -247,30 +255,56 @@ public: } /** - * handle packet on TX side + * handle packet on TX side + * always with a lock */ inline void handle_pkt_tx(const rte_mbuf_t *m, int port) { + + /* fast path */ + if (likely(!m_global_filter.in_tx(port))) { + return; + } - /* fast bail out IF */ - if (unlikely(m_global_filter.in_tx(port))) { + /* TX core always locks */ + std::unique_lock ulock(m_lock); + + /* check again the global filter (because of RX fast path might not lock) */ + if (m_global_filter.in_tx(port)) { handle_pkt_slow_path(m, port, TrexPkt::ORIGIN_TX); } + ulock.unlock(); + } /** - * handle packet on RX side + * handle packet on RX side + * RX side might or might not use a lock - depends if there are + * other TX cores being captured */ inline void handle_pkt_rx(const rte_mbuf_t *m, int port) { + /* fast path */ + if (likely(!m_global_filter.in_rx(port))) { + return; + } + + /* create a RAII object lock but do not lock yet */ + std::unique_lock ulock(m_lock, std::defer_lock); + + /* if we are not alone - lock */ + if (m_global_filter.get_tx_active_map() != 0) { + ulock.lock(); + } + /* fast bail out IF */ - if (unlikely(m_global_filter.in_rx(port))) { + if (m_global_filter.in_rx(port)) { handle_pkt_slow_path(m, port, TrexPkt::ORIGIN_RX); } } - Json::Value to_json() const; + Json::Value to_json(); private: @@ -280,10 +314,10 @@ private: } - TrexStatelessCapture * lookup(capture_id_t capture_id); - int lookup_index(capture_id_t capture_id); + TrexStatelessCapture * lookup(capture_id_t capture_id) const; + int lookup_index(capture_id_t capture_id) const; - void handle_pkt_slow_path(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin) __attribute__ ((noinline)); + void handle_pkt_slow_path(const rte_mbuf_t *m, int port, TrexPkt::origin_e origin); void update_global_filter(); -- cgit 1.2.3-korg