diff options
author | imarom <imarom@cisco.com> | 2016-11-07 18:47:23 +0200 |
---|---|---|
committer | imarom <imarom@cisco.com> | 2016-11-07 18:47:23 +0200 |
commit | f9a0c5e2e1e1135cb0c0e6e192565e5b100c5a41 (patch) | |
tree | f09e19975f324d9c0eb717608473dcdbd334a608 /src/stateless | |
parent | e85ea75669ea39e4f99519138a3a84e4df6eed2d (diff) |
RX features - queueing
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'src/stateless')
-rw-r--r-- | src/stateless/cp/trex_stateless_port.cpp | 26 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless_port.h | 26 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 14 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 40 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 10 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 7 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_defs.h | 56 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 2 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.h | 69 |
9 files changed, 205 insertions, 45 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp index 2318061d..691185cd 100644 --- a/src/stateless/cp/trex_stateless_port.cpp +++ b/src/stateless/cp/trex_stateless_port.cpp @@ -947,12 +947,9 @@ TrexStatelessPort::remove_and_delete_all_streams() { void TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) { - m_rx_capture_info.enable(pcap_filename, limit); + m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit); - TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, - pcap_filename, - limit, - &m_rx_capture_info.m_shared_counter); + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info); send_message_to_rx(msg); } @@ -960,12 +957,23 @@ void TrexStatelessPort::stop_rx_capture() { TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id); send_message_to_rx(msg); - m_rx_capture_info.disable(); + m_rx_features_info.m_rx_capture_info.disable(); } -const RXCaptureInfo & -TrexStatelessPort::get_rx_capture_info() { - return m_rx_capture_info; +void +TrexStatelessPort::start_rx_queue(uint64_t size) { + + m_rx_features_info.m_rx_queue_info.enable(size); + + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info); + send_message_to_rx(msg); +} + +void +TrexStatelessPort::stop_rx_queue() { + TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id); + send_message_to_rx(msg); + m_rx_features_info.m_rx_queue_info.disable(); } diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h index 973a95c6..36f17659 100644 --- a/src/stateless/cp/trex_stateless_port.h +++ b/src/stateless/cp/trex_stateless_port.h @@ -382,11 +382,30 @@ public: */ void stop_rx_capture(); + /** + * start RX queueing of packets + * + * @author imarom (11/7/2016) + * + * @param limit + */ + void start_rx_queue(uint64_t limit); + + /** + * stop RX queueing + * + * @author imarom (11/7/2016) + */ + void stop_rx_queue(); + + /** - * status of the RX capture + * get the RX features info object * */ - const RXCaptureInfo &get_rx_capture_info(); + const RXFeaturesInfo &get_rx_features() { + return m_rx_features_info; + } /** * fetch the RX software packets from the queue @@ -481,7 +500,8 @@ private: int m_pending_async_stop_event; - RXCaptureInfo m_rx_capture_info; + RXFeaturesInfo m_rx_features_info; + }; diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index bd444dff..95168c4d 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -276,6 +276,20 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) { return true; } +bool +TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) { + rx_core->start_queue(m_port_id, m_size, m_shared_counter); + + return true; +} + +bool +TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) { + rx_core->stop_queue(m_port_id); + + return true; +} + bool TrexStatelessRxSwGetPkts::handle(CRxCoreStateless *rx_core) { RxPacketBuffer *pkt_buffer = rx_core->get_rx_sw_pkt_buffer(m_port_id); diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index f35d9da6..b598a6d6 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -423,13 +423,11 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase { public: - TrexStatelessRxStartCapture(uint8_t port_id, - const std::string &pcap_filename, - uint64_t limit, - uint64_t *shared_counter) : m_pcap_filename(pcap_filename) { - m_port_id = port_id; - m_limit = limit; - m_shared_counter = shared_counter; + TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) { + m_port_id = port_id; + m_limit = rx_capture_info.m_limit; + m_pcap_filename = rx_capture_info.m_pcap_filename; + m_shared_counter = &rx_capture_info.m_shared_counter; } virtual bool handle(CRxCoreStateless *rx_core); @@ -454,6 +452,34 @@ private: uint8_t m_port_id; }; +class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) { + m_port_id = port_id; + m_size = rx_queue_info.m_size; + m_shared_counter = &rx_queue_info.m_shared_counter; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; + uint64_t m_size; + uint64_t *m_shared_counter; +}; + +class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase { +public: + TrexStatelessRxStopQueue(uint8_t port_id) { + m_port_id = port_id; + } + + virtual bool handle(CRxCoreStateless *rx_core); + +private: + uint8_t m_port_id; +}; + template<typename T> class TrexStatelessMsgReply { public: diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index 3fe72f54..2a678365 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -366,6 +366,16 @@ CRxCoreStateless::stop_capture(uint8_t port_id) { } void +CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) { + m_rx_port_mngr[port_id].start_queue(size, shared_counter); +} + +void +CRxCoreStateless::stop_queue(uint8_t port_id) { + m_rx_port_mngr[port_id].stop_queue(); +} + +void CRxCoreStateless::enable_latency() { for (int i = 0; i < m_max_ports; i++) { m_rx_port_mngr[i].enable_latency(); diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 689b28ec..b5844583 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -128,6 +128,13 @@ class CRxCoreStateless { void stop_capture(uint8_t port_id); /** + * start RX queueing of packets + * + */ + void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter); + void stop_queue(uint8_t port_id); + + /** * enable latency feature for RX packets * will be apply to all ports */ diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h index ee124270..bdd86a72 100644 --- a/src/stateless/rx/trex_stateless_rx_defs.h +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -51,8 +51,8 @@ class CRxSlCfg { * */ typedef enum rx_filter_mode_ { - RX_FILTER_MODE_HW, - RX_FILTER_MODE_ALL + RX_FILTER_MODE_HW, + RX_FILTER_MODE_ALL } rx_filter_mode_e; /** @@ -95,4 +95,56 @@ public: uint64_t m_shared_counter; }; + +class RXQueueInfo { +public: + + RXQueueInfo() { + m_is_active = false; + m_shared_counter = 0; + } + + void enable(uint64_t size) { + m_size = size; + m_is_active = true; + } + + void disable() { + m_is_active = false; + m_size = 0; + } + + + void to_json(Json::Value &output) const { + output["is_active"] = m_is_active; + if (m_is_active) { + output["size"] = Json::UInt64(m_size); + output["count"] = Json::UInt64(m_shared_counter); + } + } + +public: + bool m_is_active; + uint64_t m_size; + uint64_t m_shared_counter; +}; + + +/** + * holds all the RX features info + * + * @author imarom (11/7/2016) + */ +class RXFeaturesInfo { +public: + RXCaptureInfo m_rx_capture_info; + RXQueueInfo m_rx_queue_info; + + void to_json(Json::Value &msg) const { + m_rx_capture_info.to_json(msg["sniffer"]); + m_rx_queue_info.to_json(msg["queue"]); + } +}; + #endif /* __TREX_STATELESS_RX_DEFS_H__ */ + diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index 7283f703..2683dbe1 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -216,7 +216,7 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { } if (is_feature_set(QUEUE)) { - m_pkt_buffer->push(new RxPacket(m)); + m_pkt_buffer->handle_pkt(m); } } diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 90527f0c..aa8ba8e9 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -33,7 +33,6 @@ class CPortLatencyHWBase; class CRFC2544Info; class CRxCoreErrCntrs; - class RXLatency { public: @@ -126,13 +125,18 @@ private: class RxPacketBuffer { public: - RxPacketBuffer(int limit) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_limit = limit; + RxPacketBuffer(uint64_t size, uint64_t *shared_counter) { + m_buffer = nullptr; + m_head = 0; + m_tail = 0; + m_size = (size + 1); // for the empty/full difference 1 slot reserved + m_shared_counter = shared_counter; + + *m_shared_counter = 0; + + m_buffer = new RxPacket*[m_size](); // zeroed - m_buffer = new RxPacket*[limit](); // zeroed + m_is_enabled = true; } ~RxPacketBuffer() { @@ -145,6 +149,18 @@ public: delete [] m_buffer; } + /* freeze the data structure - no more packets can be pushed / poped */ + RxPacketBuffer * freeze_and_clone() { + /* create a new one */ + RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter); + + /* freeze the current */ + m_shared_counter = NULL; + m_is_enabled = false; + + return new_buffer; + } + bool is_empty() const { return (m_head == m_tail); } @@ -153,15 +169,17 @@ public: return ( next(m_head) == m_tail); } - int get_limit() const { - return m_limit; - } + void handle_pkt(const rte_mbuf_t *m) { + assert(m_is_enabled); - void push(RxPacket *pkt) { + /* if full - pop the oldest */ if (is_full()) { delete pop(); } - m_buffer[m_head] = pkt; + + (*m_shared_counter)++; + + m_buffer[m_head] = new RxPacket(m); m_head = next(m_head); } @@ -169,7 +187,7 @@ public: * generate a JSON output of the queue * */ - Json::Value to_json() { + Json::Value to_json() const { Json::Value output = Json::arrayValue; @@ -185,21 +203,26 @@ public: private: int next(int v) const { - return ( (v + 1) % m_limit ); + return ( (v + 1) % m_size ); } /* pop in case of full queue - internal usage */ RxPacket * pop() { + assert(m_is_enabled); assert(!is_empty()); + RxPacket *pkt = m_buffer[m_tail]; m_tail = next(m_tail); + (*m_shared_counter)--; return pkt; } - int m_head; - int m_tail; - int m_limit; - RxPacket **m_buffer; + int m_head; + int m_tail; + int m_size; + RxPacket **m_buffer; + uint64_t *m_shared_counter; + bool m_is_enabled; }; /************************ recoder ***************************/ @@ -294,11 +317,11 @@ public: * queueing packets * */ - void start_queue(uint32_t limit) { + void start_queue(uint32_t size, uint64_t *shared_counter) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RxPacketBuffer(limit); + m_pkt_buffer = new RxPacketBuffer(size, shared_counter); set_feature(QUEUE); } @@ -317,11 +340,11 @@ public: assert(m_pkt_buffer); - /* take the current */ + /* hold a pointer to the old one */ RxPacketBuffer *old_buffer = m_pkt_buffer; - /* allocate a new buffer */ - m_pkt_buffer = new RxPacketBuffer(old_buffer->get_limit()); + /* replace the old one with a new one and freeze the old */ + m_pkt_buffer = old_buffer->freeze_and_clone(); return old_buffer; } |