From 537f5831c4400dea7fa15032c4cd6bd2fae86bb1 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 28 Nov 2016 11:25:32 +0200 Subject: RX features - self code review Signed-off-by: imarom --- src/stateless/cp/trex_stateless_port.cpp | 29 ++-- .../messaging/trex_stateless_messaging.cpp | 15 ++- src/stateless/messaging/trex_stateless_messaging.h | 100 +++++++------- src/stateless/rx/trex_stateless_rx_core.cpp | 32 ++++- src/stateless/rx/trex_stateless_rx_core.h | 7 +- src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 90 +++++++++++-- src/stateless/rx/trex_stateless_rx_port_mngr.h | 147 ++++++++++++--------- 7 files changed, 282 insertions(+), 138 deletions(-) (limited to 'src/stateless') diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 75530ea3..ca185754 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -939,14 +939,19 @@ TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t l m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit); - TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info); - send_message_to_rx(msg); + TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info); + send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg); + + /* as below, must wait for ACK from RX core before returning ACK */ + msg->wait_for_reply(); } void TrexStatelessPort::stop_rx_capture() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id); send_message_to_rx(msg); + + /* update our cached data */ m_rx_features_info.m_rx_capture_info.disable(); } @@ -955,14 +960,21 @@ TrexStatelessPort::start_rx_queue(uint64_t size) { m_rx_features_info.m_rx_queue_info.enable(size); - TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info); - send_message_to_rx(msg); + TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info); + send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg ); + + /* we cannot return ACK to the user until the RX core has approved + this might cause the user to lose some packets from the queue + */ + msg->wait_for_reply(); } void TrexStatelessPort::stop_rx_queue() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); send_message_to_rx(msg); + + /* update our cached data */ m_rx_features_info.m_rx_queue_info.disable(); } @@ -974,13 +986,10 @@ TrexStatelessPort::get_rx_queue_pkts() { return NULL; } - /* ask RX core for the pkt queue */ - TrexStatelessMsgReply msg_reply; - - TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxQueueGetPkts(m_port_id, msg_reply); - send_message_to_rx(msg); + TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id); + send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg ); - RXPacketBuffer *pkt_buffer = msg_reply.wait_for_reply(); + RXPacketBuffer *pkt_buffer = msg->wait_for_reply(); return pkt_buffer; } diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index a8fb7ba9..cad4fe7a 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -259,14 +259,16 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - rx_core->start_capture(m_port_id, m_pcap_filename, m_limit, m_shared_counter); + rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit, m_shared_counter); + set_reply(true); + return true; } bool TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { - rx_core->stop_capture(m_port_id); + rx_core->stop_recorder(m_port_id); return true; } @@ -274,7 +276,10 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { rx_core->start_queue(m_port_id, m_size, m_shared_counter); - + + /* mark as done */ + set_reply(true); + return true; } @@ -290,7 +295,9 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) { RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); assert(pkt_buffer); - m_reply.set(pkt_buffer); + + /* set the reply */ + set_reply(pkt_buffer); return true; } diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index ed2ec90e..b1d9117f 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -408,6 +408,53 @@ public: }; +/** + * defines the base class for CP to RX with reply + * + * @author imarom (11/27/2016) + */ +template class TrexStatelessCpToRxMsgReply : public TrexStatelessCpToRxMsgBase { + +public: + + TrexStatelessCpToRxMsgReply() { + m_pending = true; + } + + bool is_pending() const { + return m_pending; + } + + void set_reply(const T &reply) { + m_reply = reply; + + /* before marking as done - memory fence */ + asm volatile("mfence" ::: "memory"); + m_pending = false; + } + + T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) { + int guard = timeout_ms; + + while (is_pending()) { + guard -= backoff_ms; + if (guard < 0) { + throw TrexException("timeout: RX core has failed to reply"); + } + + delay(backoff_ms); + } + + return m_reply; + + } + +protected: + volatile bool m_pending; + T m_reply; +}; + + class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless *rx_core); }; @@ -421,7 +468,8 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { }; -class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { + +class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgReply { public: TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) { m_port_id = port_id; @@ -452,7 +500,8 @@ private: uint8_t m_port_id; }; -class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase { + +class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgReply { public: TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) { m_port_id = port_id; @@ -468,6 +517,7 @@ private: uint64_t *m_shared_counter; }; + class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase { public: TrexStatelessRxStopQueue(uint8_t port_id) { @@ -481,50 +531,11 @@ private: }; -template class TrexStatelessMsgReply { -public: - TrexStatelessMsgReply() { - m_pending = true; - } - - bool is_pending() const { - return m_pending; - } - - void set(T reply) { - m_reply = reply; - - /* before marking as done - memory fence */ - asm volatile("mfence" ::: "memory"); - m_pending = false; - } - - T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) { - int guard = timeout_ms; - - while (is_pending()) { - guard -= backoff_ms; - if (guard < 0) { - throw TrexException("timeout: RX core has failed to reply"); - } - - delay(backoff_ms); - - } - return m_reply; - - } -private: - bool m_pending; - T m_reply; -}; - - -class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgReply { public: - TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply &reply) : m_reply(reply) { + TrexStatelessRxQueueGetPkts(uint8_t port_id) { m_port_id = port_id; } @@ -535,8 +546,7 @@ public: virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - TrexStatelessMsgReply &m_reply; + uint8_t m_port_id; }; diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index 2a678365..a1ff9c6a 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -82,7 +82,7 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_cpu_cp_u.Create(&m_cpu_dp_u); - /* init per port manager */ + /* create per port manager */ for (int i = 0; i < m_max_ports; i++) { m_rx_port_mngr[i].create(cfg.m_ports[i], m_rfc2544, @@ -171,9 +171,20 @@ void CRxCoreStateless::idle_state_loop() { } } +/** + * for each port give a tick (for flushing if needed) + * + */ +void CRxCoreStateless::port_manager_tick() { + for (int i = 0; i < m_max_ports; i++) { + m_rx_port_mngr[i].tick(); + } +} + void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) { int i = 0; - + int j = 0; + while (m_state == STATE_WORKING) { if (do_try_rx_queue) { @@ -182,12 +193,19 @@ void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) { process_all_pending_pkts(); + /* TODO: with scheduler, this should be solved better */ i++; if (i == 100000) { // approx 10msec i = 0; periodic_check_for_cp_messages(); // m_state might change in here + + j++; + if (j == 100) { // approx 1 sec + j = 0; + port_manager_tick(); + } } - + rte_pause(); } } @@ -356,13 +374,13 @@ double CRxCoreStateless::get_cpu_util() { void -CRxCoreStateless::start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) { - m_rx_port_mngr[port_id].start_capture(pcap_filename, limit, shared_counter); +CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) { + m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit, shared_counter); } void -CRxCoreStateless::stop_capture(uint8_t port_id) { - m_rx_port_mngr[port_id].stop_capture(); +CRxCoreStateless::stop_recorder(uint8_t port_id) { + m_rx_port_mngr[port_id].stop_recorder(); } void diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 9df36310..8e50a46e 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -124,8 +124,8 @@ class CRxCoreStateless { * @param pcap_filename * @param limit */ - void start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter); - void stop_capture(uint8_t port_id); + void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter); + void stop_recorder(uint8_t port_id); /** * start RX queueing of packets @@ -153,7 +153,8 @@ class CRxCoreStateless { void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(bool do_try_rx_queue); - + void port_manager_tick(); + int process_all_pending_pkts(bool flush_rx = false); void flush_all_pending_pkts() { diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index 46fec432..78f4ac5c 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -200,7 +200,7 @@ RXPacketBuffer::freeze_and_clone() { } void -RXPacketBuffer::handle_pkt(const rte_mbuf_t *m) { +RXPacketBuffer::push(const rte_mbuf_t *m) { assert(m_is_enabled); /* if full - pop the oldest */ @@ -242,7 +242,48 @@ RXPacketBuffer::to_json() const { return output; } -/****************************** packet recorder ****************************/ + +void +RXQueue::start(uint64_t size, uint64_t *shared_counter) { + if (m_pkt_buffer) { + delete m_pkt_buffer; + } + m_pkt_buffer = new RXPacketBuffer(size, shared_counter); +} + +void +RXQueue::stop() { + if (m_pkt_buffer) { + delete m_pkt_buffer; + m_pkt_buffer = NULL; + } +} + +RXPacketBuffer * +RXQueue::fetch() { + + if (!m_pkt_buffer) { + return nullptr; + } + + /* hold a pointer to the old one */ + RXPacketBuffer *old_buffer = m_pkt_buffer; + + /* replace the old one with a new one and freeze the old */ + m_pkt_buffer = old_buffer->freeze_and_clone(); + + return old_buffer; +} + +void +RXQueue::handle_pkt(const rte_mbuf_t *m) { + m_pkt_buffer->push(m); +} + +/************************************** + * RX feature recorder + * + *************************************/ RXPacketRecorder::RXPacketRecorder() { m_writer = NULL; @@ -251,10 +292,6 @@ RXPacketRecorder::RXPacketRecorder() { m_epoch = -1; } -RXPacketRecorder::~RXPacketRecorder() { - stop(); -} - void RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) { m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); @@ -278,6 +315,13 @@ RXPacketRecorder::stop() { } } +void +RXPacketRecorder::flush_to_disk() { + if (m_writer) { + m_writer->flush_to_disk(); + } +} + void RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { if (!m_writer) { @@ -310,6 +354,30 @@ RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { } +/************************************** + * Port manager + * + *************************************/ + +RXPortManager::RXPortManager() { + clear_all_features(); + m_io = NULL; + m_cpu_dp_u = NULL; +} + + +void +RXPortManager::create(CPortLatencyHWBase *io, + CRFC2544Info *rfc2544, + CRxCoreErrCntrs *err_cntrs, + CCpuUtlDp *cpu_util) { + m_io = io; + m_cpu_dp_u = cpu_util; + + /* init features */ + m_latency.create(rfc2544, err_cntrs); +} + void RXPortManager::handle_pkt(const rte_mbuf_t *m) { /* handle features */ @@ -318,12 +386,12 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { m_latency.handle_pkt(m); } - if (is_feature_set(CAPTURE)) { + if (is_feature_set(RECORDER)) { m_recorder.handle_pkt(m); } if (is_feature_set(QUEUE)) { - m_pkt_buffer->handle_pkt(m); + m_queue.handle_pkt(m); } } @@ -358,3 +426,9 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) { return cnt_p; } +void +RXPortManager::tick() { + if (is_feature_set(RECORDER)) { + m_recorder.flush_to_disk(); + } +} diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 9df42039..564b15d4 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -127,10 +127,10 @@ public: ~RXPacketBuffer(); /** - * handle a new packet + * push a packet to the buffer * */ - void handle_pkt(const rte_mbuf_t *m); + void push(const rte_mbuf_t *m); /** * freezes the queue and clones a new one @@ -170,6 +170,40 @@ private: }; +class RXQueue { +public: + RXQueue() { + m_pkt_buffer = nullptr; + } + + ~RXQueue() { + stop(); + } + + /** + * start RX queue + * + */ + void start(uint64_t size, uint64_t *shared_counter); + + /** + * fetch the current buffer + * + */ + RXPacketBuffer * fetch(); + + /** + * stop RX queue + * + */ + void stop(); + + void handle_pkt(const rte_mbuf_t *m); + +private: + RXPacketBuffer *m_pkt_buffer; +}; + /************************************** * RX feature PCAP recorder * @@ -178,12 +212,21 @@ private: class RXPacketRecorder { public: RXPacketRecorder(); - ~RXPacketRecorder(); + + ~RXPacketRecorder() { + stop(); + } void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter); void stop(); void handle_pkt(const rte_mbuf_t *m); + /** + * flush any cached packets to disk + * + */ + void flush_to_disk(); + private: CFileWriterBase *m_writer; CCapPktRaw m_pkt; @@ -202,27 +245,19 @@ private: */ class RXPortManager { public: - enum features_t { - LATENCY = 0x1, - CAPTURE = 0x2, - QUEUE = 0x4 + enum feature_t { + NO_FEATURES = 0x0, + LATENCY = 0x1, + RECORDER = 0x2, + QUEUE = 0x4 }; - RXPortManager() { - m_features = 0; - m_pkt_buffer = NULL; - m_io = NULL; - m_cpu_dp_u = NULL; - } + RXPortManager(); void create(CPortLatencyHWBase *io, CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs, - CCpuUtlDp *cpu_util) { - m_io = io; - m_cpu_dp_u = cpu_util; - m_latency.create(rfc2544, err_cntrs); - } + CCpuUtlDp *cpu_util); void clear_stats() { m_latency.reset_stats(); @@ -232,6 +267,7 @@ public: return m_latency; } + /* latency */ void enable_latency() { set_feature(LATENCY); } @@ -240,60 +276,37 @@ public: unset_feature(LATENCY); } - /** - * capturing of RX packets - * - * @author imarom (11/2/2016) - * - * @param pcap - * @param limit_pkts - */ - void start_capture(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) { + /* recorder */ + void start_recorder(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) { m_recorder.start(pcap, limit_pkts, shared_counter); - set_feature(CAPTURE); + set_feature(RECORDER); } - void stop_capture() { + void stop_recorder() { m_recorder.stop(); - unset_feature(CAPTURE); + unset_feature(RECORDER); } - /** - * queueing packets - * - */ + /* queue */ void start_queue(uint32_t size, uint64_t *shared_counter) { - if (m_pkt_buffer) { - delete m_pkt_buffer; - } - m_pkt_buffer = new RXPacketBuffer(size, shared_counter); + m_queue.start(size, shared_counter); set_feature(QUEUE); } void stop_queue() { - if (m_pkt_buffer) { - delete m_pkt_buffer; - m_pkt_buffer = NULL; - } - unset_feature(QUEUE); + m_queue.stop(); + unset_feature(QUEUE); } RXPacketBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { - return NULL; + return nullptr; } - - assert(m_pkt_buffer); - - /* hold a pointer to the old one */ - RXPacketBuffer *old_buffer = m_pkt_buffer; - - /* replace the old one with a new one and freeze the old */ - m_pkt_buffer = old_buffer->freeze_and_clone(); - - return old_buffer; + + return m_queue.fetch(); } + /** * fetch and process all packets @@ -317,9 +330,15 @@ public: */ void handle_pkt(const rte_mbuf_t *m); - + /** + * maintenance + * + * @author imarom (11/24/2016) + */ + void tick(); + bool has_features_set() { - return (m_features != 0); + return (m_features != NO_FEATURES); } @@ -329,23 +348,29 @@ public: private: + void clear_all_features() { + m_features = NO_FEATURES; + } - void set_feature(features_t feature) { + void set_feature(feature_t feature) { m_features |= feature; } - void unset_feature(features_t feature) { + void unset_feature(feature_t feature) { m_features &= (~feature); } - bool is_feature_set(features_t feature) { + bool is_feature_set(feature_t feature) const { return ( (m_features & feature) == feature ); } uint32_t m_features; - RXPacketRecorder m_recorder; + RXLatency m_latency; - RXPacketBuffer *m_pkt_buffer; + RXPacketRecorder m_recorder; + RXQueue m_queue; + + CCpuUtlDp *m_cpu_dp_u; CPortLatencyHWBase *m_io; }; -- cgit 1.2.3-korg