From 537f5831c4400dea7fa15032c4cd6bd2fae86bb1 Mon Sep 17 00:00:00 2001 From: imarom Date: Mon, 28 Nov 2016 11:25:32 +0200 Subject: RX features - self code review Signed-off-by: imarom --- src/stateless/rx/trex_stateless_rx_core.cpp | 32 +++-- src/stateless/rx/trex_stateless_rx_core.h | 7 +- src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 90 ++++++++++++-- src/stateless/rx/trex_stateless_rx_port_mngr.h | 147 +++++++++++++---------- 4 files changed, 197 insertions(+), 79 deletions(-) (limited to 'src/stateless/rx') diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index 2a678365..a1ff9c6a 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -82,7 +82,7 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_cpu_cp_u.Create(&m_cpu_dp_u); - /* init per port manager */ + /* create per port manager */ for (int i = 0; i < m_max_ports; i++) { m_rx_port_mngr[i].create(cfg.m_ports[i], m_rfc2544, @@ -171,9 +171,20 @@ void CRxCoreStateless::idle_state_loop() { } } +/** + * for each port give a tick (for flushing if needed) + * + */ +void CRxCoreStateless::port_manager_tick() { + for (int i = 0; i < m_max_ports; i++) { + m_rx_port_mngr[i].tick(); + } +} + void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) { int i = 0; - + int j = 0; + while (m_state == STATE_WORKING) { if (do_try_rx_queue) { @@ -182,12 +193,19 @@ void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) { process_all_pending_pkts(); + /* TODO: with scheduler, this should be solved better */ i++; if (i == 100000) { // approx 10msec i = 0; periodic_check_for_cp_messages(); // m_state might change in here + + j++; + if (j == 100) { // approx 1 sec + j = 0; + port_manager_tick(); + } } - + rte_pause(); } } @@ -356,13 +374,13 @@ double CRxCoreStateless::get_cpu_util() { void -CRxCoreStateless::start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) { - m_rx_port_mngr[port_id].start_capture(pcap_filename, limit, shared_counter); +CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) { + m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit, shared_counter); } void -CRxCoreStateless::stop_capture(uint8_t port_id) { - m_rx_port_mngr[port_id].stop_capture(); +CRxCoreStateless::stop_recorder(uint8_t port_id) { + m_rx_port_mngr[port_id].stop_recorder(); } void diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 9df36310..8e50a46e 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -124,8 +124,8 @@ class CRxCoreStateless { * @param pcap_filename * @param limit */ - void start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter); - void stop_capture(uint8_t port_id); + void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter); + void stop_recorder(uint8_t port_id); /** * start RX queueing of packets @@ -153,7 +153,8 @@ class CRxCoreStateless { void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(bool do_try_rx_queue); - + void port_manager_tick(); + int process_all_pending_pkts(bool flush_rx = false); void flush_all_pending_pkts() { diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index 46fec432..78f4ac5c 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -200,7 +200,7 @@ RXPacketBuffer::freeze_and_clone() { } void -RXPacketBuffer::handle_pkt(const rte_mbuf_t *m) { +RXPacketBuffer::push(const rte_mbuf_t *m) { assert(m_is_enabled); /* if full - pop the oldest */ @@ -242,7 +242,48 @@ RXPacketBuffer::to_json() const { return output; } -/****************************** packet recorder ****************************/ + +void +RXQueue::start(uint64_t size, uint64_t *shared_counter) { + if (m_pkt_buffer) { + delete m_pkt_buffer; + } + m_pkt_buffer = new RXPacketBuffer(size, shared_counter); +} + +void +RXQueue::stop() { + if (m_pkt_buffer) { + delete m_pkt_buffer; + m_pkt_buffer = NULL; + } +} + +RXPacketBuffer * +RXQueue::fetch() { + + if (!m_pkt_buffer) { + return nullptr; + } + + /* hold a pointer to the old one */ + RXPacketBuffer *old_buffer = m_pkt_buffer; + + /* replace the old one with a new one and freeze the old */ + m_pkt_buffer = old_buffer->freeze_and_clone(); + + return old_buffer; +} + +void +RXQueue::handle_pkt(const rte_mbuf_t *m) { + m_pkt_buffer->push(m); +} + +/************************************** + * RX feature recorder + * + *************************************/ RXPacketRecorder::RXPacketRecorder() { m_writer = NULL; @@ -251,10 +292,6 @@ RXPacketRecorder::RXPacketRecorder() { m_epoch = -1; } -RXPacketRecorder::~RXPacketRecorder() { - stop(); -} - void RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) { m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); @@ -278,6 +315,13 @@ RXPacketRecorder::stop() { } } +void +RXPacketRecorder::flush_to_disk() { + if (m_writer) { + m_writer->flush_to_disk(); + } +} + void RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { if (!m_writer) { @@ -310,6 +354,30 @@ RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { } +/************************************** + * Port manager + * + *************************************/ + +RXPortManager::RXPortManager() { + clear_all_features(); + m_io = NULL; + m_cpu_dp_u = NULL; +} + + +void +RXPortManager::create(CPortLatencyHWBase *io, + CRFC2544Info *rfc2544, + CRxCoreErrCntrs *err_cntrs, + CCpuUtlDp *cpu_util) { + m_io = io; + m_cpu_dp_u = cpu_util; + + /* init features */ + m_latency.create(rfc2544, err_cntrs); +} + void RXPortManager::handle_pkt(const rte_mbuf_t *m) { /* handle features */ @@ -318,12 +386,12 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { m_latency.handle_pkt(m); } - if (is_feature_set(CAPTURE)) { + if (is_feature_set(RECORDER)) { m_recorder.handle_pkt(m); } if (is_feature_set(QUEUE)) { - m_pkt_buffer->handle_pkt(m); + m_queue.handle_pkt(m); } } @@ -358,3 +426,9 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) { return cnt_p; } +void +RXPortManager::tick() { + if (is_feature_set(RECORDER)) { + m_recorder.flush_to_disk(); + } +} diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 9df42039..564b15d4 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -127,10 +127,10 @@ public: ~RXPacketBuffer(); /** - * handle a new packet + * push a packet to the buffer * */ - void handle_pkt(const rte_mbuf_t *m); + void push(const rte_mbuf_t *m); /** * freezes the queue and clones a new one @@ -170,6 +170,40 @@ private: }; +class RXQueue { +public: + RXQueue() { + m_pkt_buffer = nullptr; + } + + ~RXQueue() { + stop(); + } + + /** + * start RX queue + * + */ + void start(uint64_t size, uint64_t *shared_counter); + + /** + * fetch the current buffer + * + */ + RXPacketBuffer * fetch(); + + /** + * stop RX queue + * + */ + void stop(); + + void handle_pkt(const rte_mbuf_t *m); + +private: + RXPacketBuffer *m_pkt_buffer; +}; + /************************************** * RX feature PCAP recorder * @@ -178,12 +212,21 @@ private: class RXPacketRecorder { public: RXPacketRecorder(); - ~RXPacketRecorder(); + + ~RXPacketRecorder() { + stop(); + } void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter); void stop(); void handle_pkt(const rte_mbuf_t *m); + /** + * flush any cached packets to disk + * + */ + void flush_to_disk(); + private: CFileWriterBase *m_writer; CCapPktRaw m_pkt; @@ -202,27 +245,19 @@ private: */ class RXPortManager { public: - enum features_t { - LATENCY = 0x1, - CAPTURE = 0x2, - QUEUE = 0x4 + enum feature_t { + NO_FEATURES = 0x0, + LATENCY = 0x1, + RECORDER = 0x2, + QUEUE = 0x4 }; - RXPortManager() { - m_features = 0; - m_pkt_buffer = NULL; - m_io = NULL; - m_cpu_dp_u = NULL; - } + RXPortManager(); void create(CPortLatencyHWBase *io, CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs, - CCpuUtlDp *cpu_util) { - m_io = io; - m_cpu_dp_u = cpu_util; - m_latency.create(rfc2544, err_cntrs); - } + CCpuUtlDp *cpu_util); void clear_stats() { m_latency.reset_stats(); @@ -232,6 +267,7 @@ public: return m_latency; } + /* latency */ void enable_latency() { set_feature(LATENCY); } @@ -240,60 +276,37 @@ public: unset_feature(LATENCY); } - /** - * capturing of RX packets - * - * @author imarom (11/2/2016) - * - * @param pcap - * @param limit_pkts - */ - void start_capture(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) { + /* recorder */ + void start_recorder(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) { m_recorder.start(pcap, limit_pkts, shared_counter); - set_feature(CAPTURE); + set_feature(RECORDER); } - void stop_capture() { + void stop_recorder() { m_recorder.stop(); - unset_feature(CAPTURE); + unset_feature(RECORDER); } - /** - * queueing packets - * - */ + /* queue */ void start_queue(uint32_t size, uint64_t *shared_counter) { - if (m_pkt_buffer) { - delete m_pkt_buffer; - } - m_pkt_buffer = new RXPacketBuffer(size, shared_counter); + m_queue.start(size, shared_counter); set_feature(QUEUE); } void stop_queue() { - if (m_pkt_buffer) { - delete m_pkt_buffer; - m_pkt_buffer = NULL; - } - unset_feature(QUEUE); + m_queue.stop(); + unset_feature(QUEUE); } RXPacketBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { - return NULL; + return nullptr; } - - assert(m_pkt_buffer); - - /* hold a pointer to the old one */ - RXPacketBuffer *old_buffer = m_pkt_buffer; - - /* replace the old one with a new one and freeze the old */ - m_pkt_buffer = old_buffer->freeze_and_clone(); - - return old_buffer; + + return m_queue.fetch(); } + /** * fetch and process all packets @@ -317,9 +330,15 @@ public: */ void handle_pkt(const rte_mbuf_t *m); - + /** + * maintenance + * + * @author imarom (11/24/2016) + */ + void tick(); + bool has_features_set() { - return (m_features != 0); + return (m_features != NO_FEATURES); } @@ -329,23 +348,29 @@ public: private: + void clear_all_features() { + m_features = NO_FEATURES; + } - void set_feature(features_t feature) { + void set_feature(feature_t feature) { m_features |= feature; } - void unset_feature(features_t feature) { + void unset_feature(feature_t feature) { m_features &= (~feature); } - bool is_feature_set(features_t feature) { + bool is_feature_set(feature_t feature) const { return ( (m_features & feature) == feature ); } uint32_t m_features; - RXPacketRecorder m_recorder; + RXLatency m_latency; - RXPacketBuffer *m_pkt_buffer; + RXPacketRecorder m_recorder; + RXQueue m_queue; + + CCpuUtlDp *m_cpu_dp_u; CPortLatencyHWBase *m_io; }; -- cgit 1.2.3-korg