diff options
Diffstat (limited to 'src/stateless')
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 52 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 18 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 29 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 158 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 14 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 19 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_defs.h | 98 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 137 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.h | 56 |
9 files changed, 300 insertions, 281 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index ca185754..7edf1a31 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -936,61 +936,67 @@ TrexStatelessPort::remove_and_delete_all_streams() { void TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) { - - m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit); - - TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info); + static MsgReply<bool> reply; + + reply.reset(); + + TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply); send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg); /* as below, must wait for ACK from RX core before returning ACK */ - msg->wait_for_reply(); + reply.wait_for_reply(); } void TrexStatelessPort::stop_rx_capture() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id); send_message_to_rx(msg); - - /* update our cached data */ - m_rx_features_info.m_rx_capture_info.disable(); } void TrexStatelessPort::start_rx_queue(uint64_t size) { - - m_rx_features_info.m_rx_queue_info.enable(size); - - TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info); + static MsgReply<bool> reply; + + reply.reset(); + + TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, size, reply); send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg ); /* we cannot return ACK to the user until the RX core has approved this might cause the user to lose some packets from the queue */ - msg->wait_for_reply(); + reply.wait_for_reply(); } void TrexStatelessPort::stop_rx_queue() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); send_message_to_rx(msg); - - /* update our cached data */ - m_rx_features_info.m_rx_queue_info.disable(); } -RXPacketBuffer * +const RXPacketBuffer * TrexStatelessPort::get_rx_queue_pkts() { + static MsgReply<const RXPacketBuffer *> reply; + + reply.reset(); - if (m_rx_features_info.m_rx_queue_info.is_empty()) { - return NULL; - } + TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id, reply); + send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg ); + + return reply.wait_for_reply(); +} + +Json::Value +TrexStatelessPort::rx_features_to_json() { + static MsgReply<Json::Value> reply; + + reply.reset(); - TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id); + TrexStatelessRxFeaturesToJson *msg = new TrexStatelessRxFeaturesToJson(m_port_id, reply); send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg ); - RXPacketBuffer *pkt_buffer = msg->wait_for_reply(); - return pkt_buffer; + return reply.wait_for_reply(); } /************* Trex Port Owner **************/ diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 4aa7ff36..f2829b8a 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -393,21 +393,19 @@ public: */ void stop_rx_queue(); - /** - * get the RX features info object + * fetch the RX queue packets from the queue * */ - const RXFeaturesInfo &get_rx_features() { - return m_rx_features_info; - } + const RXPacketBuffer *get_rx_queue_pkts(); /** - * fetch the RX queue packets from the queue + * generate a JSON describing the status + * of the RX features * */ - RXPacketBuffer *get_rx_queue_pkts(); - + Json::Value rx_features_to_json(); + /** * return the port attribute object * @@ -447,7 +445,7 @@ private: * */ void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg); - + /** * when a port stops, perform various actions * @@ -503,8 +501,6 @@ private: int m_pending_async_stop_event; - RXFeaturesInfo m_rx_features_info; - }; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index cad4fe7a..53d5a87e 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -259,9 +259,10 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { bool TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) { - rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit, m_shared_counter); + rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit); - set_reply(true); + /* mark as done */ + m_reply.set_reply(true); return true; } @@ -275,10 +276,10 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { bool TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { - rx_core->start_queue(m_port_id, m_size, m_shared_counter); + rx_core->start_queue(m_port_id, m_size); /* mark as done */ - set_reply(true); + m_reply.set_reply(true); return true; } @@ -292,12 +293,24 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { -bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) { - RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); - assert(pkt_buffer); +bool +TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) { + const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id); /* set the reply */ - set_reply(pkt_buffer); + m_reply.set_reply(pkt_buffer); return true; } + + +bool +TrexStatelessRxFeaturesToJson::handle(CRxCoreStateless *rx_core) { + Json::Value output = rx_core->get_rx_port_mngr(m_port_id).to_json(); + + /* set the reply */ + m_reply.set_reply(output); + + return true; +} + diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index b1d9117f..303548aa 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -35,6 +35,57 @@ class CFlowGenListPerThread; class RXPacketBuffer; /** + * Generic message reply object + * + * @author imarom (11/27/2016) + */ +template<typename T> class MsgReply { + +public: + + MsgReply() { + reset(); + } + + void reset() { + m_pending = true; + } + + bool is_pending() const { + return m_pending; + } + + void set_reply(const T &reply) { + m_reply = reply; + + /* before marking as done make sure all stores are committed */ + asm volatile("mfence" ::: "memory"); + m_pending = false; + } + + T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) { + int guard = timeout_ms; + + while (is_pending()) { + guard -= backoff_ms; + if (guard < 0) { + throw TrexException("timeout: failed to get reply from core"); + } + + delay(backoff_ms); + } + + return m_reply; + + } + +protected: + volatile bool m_pending; + T m_reply; +}; + + +/** * defines the base class for CP to DP messages * * @author imarom (27-Oct-15) @@ -408,52 +459,6 @@ public: }; -/** - * defines the base class for CP to RX with reply - * - * @author imarom (11/27/2016) - */ -template<typename T> class TrexStatelessCpToRxMsgReply : public TrexStatelessCpToRxMsgBase { - -public: - - TrexStatelessCpToRxMsgReply() { - m_pending = true; - } - - bool is_pending() const { - return m_pending; - } - - void set_reply(const T &reply) { - m_reply = reply; - - /* before marking as done - memory fence */ - asm volatile("mfence" ::: "memory"); - m_pending = false; - } - - T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) { - int guard = timeout_ms; - - while (is_pending()) { - guard -= backoff_ms; - if (guard < 0) { - throw TrexException("timeout: RX core has failed to reply"); - } - - delay(backoff_ms); - } - - return m_reply; - - } - -protected: - volatile bool m_pending; - T m_reply; -}; - class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless *rx_core); @@ -469,22 +474,25 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { -class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgReply<bool> { +class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) { + TrexStatelessRxStartCapture(uint8_t port_id, + const std::string &pcap_filename, + uint64_t limit, + MsgReply<bool> &reply) : m_reply(reply) { + m_port_id = port_id; - m_limit = rx_capture_info.m_limit; - m_pcap_filename = rx_capture_info.m_pcap_filename; - m_shared_counter = &rx_capture_info.m_shared_counter; + m_limit = limit; + m_pcap_filename = pcap_filename; } virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; - std::string m_pcap_filename; - uint64_t m_limit; - uint64_t *m_shared_counter; + uint8_t m_port_id; + std::string m_pcap_filename; + uint64_t m_limit; + MsgReply<bool> &m_reply; }; @@ -501,12 +509,14 @@ private: }; -class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgReply<bool> { +class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) { + TrexStatelessRxStartQueue(uint8_t port_id, + uint64_t size, + MsgReply<bool> &reply) : m_reply(reply) { + m_port_id = port_id; - m_size = rx_queue_info.m_size; - m_shared_counter = &rx_queue_info.m_shared_counter; + m_size = size; } virtual bool handle(CRxCoreStateless *rx_core); @@ -514,7 +524,7 @@ public: private: uint8_t m_port_id; uint64_t m_size; - uint64_t *m_shared_counter; + MsgReply<bool> &m_reply; }; @@ -532,10 +542,10 @@ private: -class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgReply<RXPacketBuffer *> { +class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxQueueGetPkts(uint8_t port_id) { + TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) { m_port_id = port_id; } @@ -546,8 +556,30 @@ public: virtual bool handle(CRxCoreStateless *rx_core); private: - uint8_t m_port_id; + uint8_t m_port_id; + MsgReply<const RXPacketBuffer *> &m_reply; + }; +/** + * a request from RX core to dump to Json the RX features + */ +class TrexStatelessRxFeaturesToJson : public TrexStatelessCpToRxMsgBase { +public: + + TrexStatelessRxFeaturesToJson(uint8_t port_id, MsgReply<Json::Value> &reply) : m_reply(reply) { + m_port_id = port_id; + } + + /** + * virtual function to handle a message + * + */ + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; + MsgReply<Json::Value> &m_reply; +}; #endif /* __TREX_STATELESS_MESSAGING_H__ */ diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index a1ff9c6a..b24fcb8f 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -374,8 +374,8 @@ double CRxCoreStateless::get_cpu_util() { void -CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) { - m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit, shared_counter); +CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) { + m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit); } void @@ -384,8 +384,8 @@ CRxCoreStateless::stop_recorder(uint8_t port_id) { } void -CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) { - m_rx_port_mngr[port_id].start_queue(size, shared_counter); +CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) { + m_rx_port_mngr[port_id].start_queue(size); } void @@ -407,3 +407,9 @@ CRxCoreStateless::disable_latency() { } } +const RXPortManager & +CRxCoreStateless::get_rx_port_mngr(uint8_t port_id) { + assert(port_id < m_max_ports); + return m_rx_port_mngr[port_id]; + +} diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 8e50a46e..b27a7ca5 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -111,27 +111,22 @@ class CRxCoreStateless { double get_cpu_util(); void update_cpu_util(); - RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { + const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } /** - * start capturing of RX packets on a specific port - * - * @author imarom (11/2/2016) - * - * @param port_id - * @param pcap_filename - * @param limit + * start capturing of RX packets on a specific port + * */ - void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter); + void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit); void stop_recorder(uint8_t port_id); /** * start RX queueing of packets * */ - void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter); + void start_queue(uint8_t port_id, uint64_t size); void stop_queue(uint8_t port_id); /** @@ -141,6 +136,8 @@ class CRxCoreStateless { void enable_latency(); void disable_latency(); + const RXPortManager &get_rx_port_mngr(uint8_t port_id); + private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); bool periodic_check_for_cp_messages(); @@ -162,7 +159,7 @@ class CRxCoreStateless { } void try_rx_queues(); - + private: TrexMonitor m_monitor; uint32_t m_max_ports; diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h index 7b1e0f32..3e5d2c38 100644 --- a/src/stateless/rx/trex_stateless_rx_defs.h +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -55,103 +55,5 @@ typedef enum rx_filter_mode_ { RX_FILTER_MODE_ALL } rx_filter_mode_e; -/** - * holds RX capture info - * - */ -class RXCaptureInfo { -public: - RXCaptureInfo() { - m_is_active = false; - m_limit = 0; - m_shared_counter = 0; - } - - void enable(const std::string &pcap_filename, uint64_t limit) { - m_pcap_filename = pcap_filename; - m_limit = limit; - m_is_active = true; - } - - void disable() { - m_is_active = false; - m_pcap_filename = ""; - m_limit = 0; - } - - bool is_empty() const { - return (m_shared_counter == 0); - } - - void to_json(Json::Value &output) const { - output["is_active"] = m_is_active; - if (m_is_active) { - output["pcap_filename"] = m_pcap_filename; - output["limit"] = Json::UInt64(m_limit); - output["count"] = Json::UInt64(m_shared_counter); - } - } - -public: - bool m_is_active; - std::string m_pcap_filename; - uint64_t m_limit; - uint64_t m_shared_counter; -}; - - -class RXQueueInfo { -public: - - RXQueueInfo() { - m_is_active = false; - m_shared_counter = 0; - } - - void enable(uint64_t size) { - m_size = size; - m_is_active = true; - } - - void disable() { - m_is_active = false; - m_size = 0; - } - - bool is_empty() const { - return (m_shared_counter == 0); - } - - void to_json(Json::Value &output) const { - output["is_active"] = m_is_active; - if (m_is_active) { - output["size"] = Json::UInt64(m_size); - output["count"] = Json::UInt64(m_shared_counter); - } - } - -public: - bool m_is_active; - uint64_t m_size; - uint64_t m_shared_counter; -}; - - -/** - * holds all the RX features info - * - * @author imarom (11/7/2016) - */ -class RXFeaturesInfo { -public: - RXCaptureInfo m_rx_capture_info; - RXQueueInfo m_rx_queue_info; - - void to_json(Json::Value &msg) const { - m_rx_capture_info.to_json(msg["sniffer"]); - m_rx_queue_info.to_json(msg["queue"]); - } -}; - #endif /* __TREX_STATELESS_RX_DEFS_H__ */ diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index 78f4ac5c..00032e8b 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -156,25 +156,24 @@ RXLatency::reset_stats() { } } +Json::Value +RXLatency::to_json() const { + return Json::objectValue; +} + /************************************** * RX feature queue * *************************************/ -RXPacketBuffer::RXPacketBuffer(uint64_t size, uint64_t *shared_counter) { +RXPacketBuffer::RXPacketBuffer(uint64_t size) { m_buffer = nullptr; m_head = 0; m_tail = 0; m_size = (size + 1); // for the empty/full difference 1 slot reserved - m_shared_counter = shared_counter; - - /* reset the counter */ - *m_shared_counter = 0; /* generate queue */ m_buffer = new RXPacket*[m_size](); // zeroed - - m_is_enabled = true; } RXPacketBuffer::~RXPacketBuffer() { @@ -187,22 +186,8 @@ RXPacketBuffer::~RXPacketBuffer() { delete [] m_buffer; } -RXPacketBuffer * -RXPacketBuffer::freeze_and_clone() { - /* create a new one - same size and shared counter 0 */ - RXPacketBuffer *new_buffer = new RXPacketBuffer(m_size, m_shared_counter); - - /* freeze the current */ - m_shared_counter = NULL; - m_is_enabled = false; - - return new_buffer; -} - void RXPacketBuffer::push(const rte_mbuf_t *m) { - assert(m_is_enabled); - /* if full - pop the oldest */ if (is_full()) { delete pop(); @@ -211,22 +196,27 @@ RXPacketBuffer::push(const rte_mbuf_t *m) { /* push packet */ m_buffer[m_head] = new RXPacket(m); m_head = next(m_head); - - /* update the shared counter - control plane memory */ - (*m_shared_counter)++; } RXPacket * RXPacketBuffer::pop() { - assert(m_is_enabled); assert(!is_empty()); RXPacket *pkt = m_buffer[m_tail]; m_tail = next(m_tail); - (*m_shared_counter)--; + return pkt; } +uint64_t +RXPacketBuffer::get_element_count() const { + if (m_head >= m_tail) { + return (m_head - m_tail); + } else { + return ( get_capacity() - (m_tail - m_head - 1) ); + } +} + Json::Value RXPacketBuffer::to_json() const { @@ -244,11 +234,11 @@ RXPacketBuffer::to_json() const { void -RXQueue::start(uint64_t size, uint64_t *shared_counter) { +RXQueue::start(uint64_t size) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RXPacketBuffer(size, shared_counter); + m_pkt_buffer = new RXPacketBuffer(size); } void @@ -259,10 +249,11 @@ RXQueue::stop() { } } -RXPacketBuffer * +const RXPacketBuffer * RXQueue::fetch() { - if (!m_pkt_buffer) { + /* if no buffer or the buffer is empty - give a NULL one */ + if ( (!m_pkt_buffer) || (m_pkt_buffer->get_element_count() == 0) ) { return nullptr; } @@ -270,7 +261,7 @@ RXQueue::fetch() { RXPacketBuffer *old_buffer = m_pkt_buffer; /* replace the old one with a new one and freeze the old */ - m_pkt_buffer = old_buffer->freeze_and_clone(); + m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity()); return old_buffer; } @@ -280,6 +271,18 @@ RXQueue::handle_pkt(const rte_mbuf_t *m) { m_pkt_buffer->push(m); } +Json::Value +RXQueue::to_json() const { + assert(m_pkt_buffer != NULL); + + Json::Value output = Json::objectValue; + + output["size"] = Json::UInt64(m_pkt_buffer->get_capacity()); + output["count"] = Json::UInt64(m_pkt_buffer->get_element_count()); + + return output; +} + /************************************** * RX feature recorder * @@ -287,13 +290,15 @@ RXQueue::handle_pkt(const rte_mbuf_t *m) { RXPacketRecorder::RXPacketRecorder() { m_writer = NULL; - m_shared_counter = NULL; + m_count = 0; m_limit = 0; m_epoch = -1; + + m_pending_flush = false; } void -RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) { +RXPacketRecorder::start(const std::string &pcap, uint64_t limit) { m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); if (m_writer == NULL) { std::stringstream ss; @@ -302,23 +307,29 @@ RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *share } assert(limit > 0); + m_limit = limit; - m_shared_counter = shared_counter; - (*m_shared_counter) = 0; + m_count = 0; + m_pending_flush = false; + m_pcap_filename = pcap; } void RXPacketRecorder::stop() { - if (m_writer) { - delete m_writer; - m_writer = NULL; + if (!m_writer) { + return; } + + delete m_writer; + m_writer = NULL; } void RXPacketRecorder::flush_to_disk() { - if (m_writer) { + + if (m_writer && m_pending_flush) { m_writer->flush_to_disk(); + m_pending_flush = false; } } @@ -344,15 +355,25 @@ RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { memcpy(m_pkt.raw, p, m->pkt_len); m_writer->write_packet(&m_pkt); - - m_limit--; - (*m_shared_counter)++; - - if (m_limit == 0) { + m_count++; + m_pending_flush = true; + + if (m_count == m_limit) { stop(); } + } +Json::Value +RXPacketRecorder::to_json() const { + Json::Value output = Json::objectValue; + + output["pcap_filename"] = m_pcap_filename; + output["limit"] = Json::UInt64(m_limit); + output["count"] = Json::UInt64(m_count); + + return output; +} /************************************** * Port manager @@ -432,3 +453,31 @@ RXPortManager::tick() { m_recorder.flush_to_disk(); } } + +Json::Value +RXPortManager::to_json() const { + Json::Value output = Json::objectValue; + + if (is_feature_set(LATENCY)) { + output["latency"] = m_latency.to_json(); + output["latency"]["is_active"] = true; + } else { + output["latency"]["is_active"] = false; + } + + if (is_feature_set(RECORDER)) { + output["sniffer"] = m_recorder.to_json(); + output["sniffer"]["is_active"] = true; + } else { + output["sniffer"]["is_active"] = false; + } + + if (is_feature_set(QUEUE)) { + output["queue"] = m_queue.to_json(); + output["queue"]["is_active"] = true; + } else { + output["queue"]["is_active"] = false; + } + + return output; +} diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 564b15d4..bc34b5aa 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -47,6 +47,8 @@ public: void handle_pkt(const rte_mbuf_t *m); + Json::Value to_json() const; + private: bool is_flow_stat_id(uint32_t id) { if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true; @@ -123,7 +125,7 @@ private: class RXPacketBuffer { public: - RXPacketBuffer(uint64_t size, uint64_t *shared_counter); + RXPacketBuffer(uint64_t size); ~RXPacketBuffer(); /** @@ -133,12 +135,6 @@ public: void push(const rte_mbuf_t *m); /** - * freezes the queue and clones a new one - * - */ - RXPacketBuffer * freeze_and_clone(); - - /** * generate a JSON output of the queue * */ @@ -153,6 +149,19 @@ public: return ( next(m_head) == m_tail); } + /** + * return the total amount of space possible + */ + uint64_t get_capacity() const { + /* one slot is used for diff between full/empty */ + return (m_size - 1); + } + + /** + * returns how many elements are in the queue + */ + uint64_t get_element_count() const; + private: int next(int v) const { return ( (v + 1) % m_size ); @@ -165,8 +174,6 @@ private: int m_tail; int m_size; RXPacket **m_buffer; - uint64_t *m_shared_counter; - bool m_is_enabled; }; @@ -184,13 +191,13 @@ public: * start RX queue * */ - void start(uint64_t size, uint64_t *shared_counter); + void start(uint64_t size); /** * fetch the current buffer - * + * return NULL if no packets */ - RXPacketBuffer * fetch(); + const RXPacketBuffer * fetch(); /** * stop RX queue @@ -200,6 +207,8 @@ public: void handle_pkt(const rte_mbuf_t *m); + Json::Value to_json() const; + private: RXPacketBuffer *m_pkt_buffer; }; @@ -217,7 +226,7 @@ public: stop(); } - void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter); + void start(const std::string &pcap, uint64_t limit); void stop(); void handle_pkt(const rte_mbuf_t *m); @@ -227,12 +236,16 @@ public: */ void flush_to_disk(); + Json::Value to_json() const; + private: CFileWriterBase *m_writer; + std::string m_pcap_filename; CCapPktRaw m_pkt; dsec_t m_epoch; uint64_t m_limit; - uint64_t *m_shared_counter; + uint64_t m_count; + bool m_pending_flush; }; @@ -277,8 +290,8 @@ public: } /* recorder */ - void start_recorder(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) { - m_recorder.start(pcap, limit_pkts, shared_counter); + void start_recorder(const std::string &pcap, uint64_t limit_pkts) { + m_recorder.start(pcap, limit_pkts); set_feature(RECORDER); } @@ -288,8 +301,8 @@ public: } /* queue */ - void start_queue(uint32_t size, uint64_t *shared_counter) { - m_queue.start(size, shared_counter); + void start_queue(uint32_t size) { + m_queue.start(size); set_feature(QUEUE); } @@ -298,7 +311,7 @@ public: unset_feature(QUEUE); } - RXPacketBuffer *get_pkt_buffer() { + const RXPacketBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { return nullptr; } @@ -346,6 +359,11 @@ public: return (!has_features_set()); } + /** + * write the status to a JSON format + */ + Json::Value to_json() const; + private: void clear_all_features() { |