diff options
Diffstat (limited to 'src/stateless/rx/trex_stateless_rx_port_mngr.h')
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_port_mngr.h | 69 |
1 files changed, 46 insertions, 23 deletions
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 90527f0c..aa8ba8e9 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -33,7 +33,6 @@ class CPortLatencyHWBase; class CRFC2544Info; class CRxCoreErrCntrs; - class RXLatency { public: @@ -126,13 +125,18 @@ private: class RxPacketBuffer { public: - RxPacketBuffer(int limit) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_limit = limit; + RxPacketBuffer(uint64_t size, uint64_t *shared_counter) { + m_buffer = nullptr; + m_head = 0; + m_tail = 0; + m_size = (size + 1); // for the empty/full difference 1 slot reserved + m_shared_counter = shared_counter; + + *m_shared_counter = 0; + + m_buffer = new RxPacket*[m_size](); // zeroed - m_buffer = new RxPacket*[limit](); // zeroed + m_is_enabled = true; } ~RxPacketBuffer() { @@ -145,6 +149,18 @@ public: delete [] m_buffer; } + /* freeze the data structure - no more packets can be pushed / poped */ + RxPacketBuffer * freeze_and_clone() { + /* create a new one */ + RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter); + + /* freeze the current */ + m_shared_counter = NULL; + m_is_enabled = false; + + return new_buffer; + } + bool is_empty() const { return (m_head == m_tail); } @@ -153,15 +169,17 @@ public: return ( next(m_head) == m_tail); } - int get_limit() const { - return m_limit; - } + void handle_pkt(const rte_mbuf_t *m) { + assert(m_is_enabled); - void push(RxPacket *pkt) { + /* if full - pop the oldest */ if (is_full()) { delete pop(); } - m_buffer[m_head] = pkt; + + (*m_shared_counter)++; + + m_buffer[m_head] = new RxPacket(m); m_head = next(m_head); } @@ -169,7 +187,7 @@ public: * generate a JSON output of the queue * */ - Json::Value to_json() { + Json::Value to_json() const { Json::Value output = Json::arrayValue; @@ -185,21 +203,26 @@ public: private: int next(int v) const { - return ( (v + 1) % m_limit ); + return ( (v + 1) % m_size ); } /* pop in case of full queue - internal usage */ RxPacket * pop() { + assert(m_is_enabled); assert(!is_empty()); + RxPacket *pkt = m_buffer[m_tail]; m_tail = next(m_tail); + (*m_shared_counter)--; return pkt; } - int m_head; - int m_tail; - int m_limit; - RxPacket **m_buffer; + int m_head; + int m_tail; + int m_size; + RxPacket **m_buffer; + uint64_t *m_shared_counter; + bool m_is_enabled; }; /************************ recoder ***************************/ @@ -294,11 +317,11 @@ public: * queueing packets * */ - void start_queue(uint32_t limit) { + void start_queue(uint32_t size, uint64_t *shared_counter) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RxPacketBuffer(limit); + m_pkt_buffer = new RxPacketBuffer(size, shared_counter); set_feature(QUEUE); } @@ -317,11 +340,11 @@ public: assert(m_pkt_buffer); - /* take the current */ + /* hold a pointer to the old one */ RxPacketBuffer *old_buffer = m_pkt_buffer; - /* allocate a new buffer */ - m_pkt_buffer = new RxPacketBuffer(old_buffer->get_limit()); + /* replace the old one with a new one and freeze the old */ + m_pkt_buffer = old_buffer->freeze_and_clone(); return old_buffer; } |