summaryrefslogtreecommitdiffstats
path: root/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stateless/rx/trex_stateless_rx_port_mngr.cpp')
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp111
1 files changed, 109 insertions, 2 deletions
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index 2683dbe1..46fec432 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -23,8 +23,29 @@
#include "common/captureFile.h"
#include "trex_stateless_rx_core.h"
-/************************** latency feature ************/
-void RXLatency::handle_pkt(const rte_mbuf_t *m) {
+/**************************************
+ * latency RX feature
+ *
+ *************************************/
+RXLatency::RXLatency() {
+ m_rcv_all = false;
+ m_rfc2544 = NULL;
+ m_err_cntrs = NULL;
+
+ for (int i = 0; i < MAX_FLOW_STATS; i++) {
+ m_rx_pg_stat[i].clear();
+ m_rx_pg_stat_payload[i].clear();
+ }
+}
+
+void
+RXLatency::create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) {
+ m_rfc2544 = rfc2544;
+ m_err_cntrs = err_cntrs;
+}
+
+void
+RXLatency::handle_pkt(const rte_mbuf_t *m) {
CFlowStatParser parser;
if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
@@ -135,6 +156,92 @@ RXLatency::reset_stats() {
}
}
+/**************************************
+ * RX feature queue
+ *
+ *************************************/
+
+RXPacketBuffer::RXPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+ m_buffer = nullptr;
+ m_head = 0;
+ m_tail = 0;
+ m_size = (size + 1); // for the empty/full difference 1 slot reserved
+ m_shared_counter = shared_counter;
+
+ /* reset the counter */
+ *m_shared_counter = 0;
+
+ /* generate queue */
+ m_buffer = new RXPacket*[m_size](); // zeroed
+
+ m_is_enabled = true;
+}
+
+RXPacketBuffer::~RXPacketBuffer() {
+ assert(m_buffer);
+
+ while (!is_empty()) {
+ RXPacket *pkt = pop();
+ delete pkt;
+ }
+ delete [] m_buffer;
+}
+
+RXPacketBuffer *
+RXPacketBuffer::freeze_and_clone() {
+ /* create a new one - same size and shared counter 0 */
+ RXPacketBuffer *new_buffer = new RXPacketBuffer(m_size, m_shared_counter);
+
+ /* freeze the current */
+ m_shared_counter = NULL;
+ m_is_enabled = false;
+
+ return new_buffer;
+}
+
+void
+RXPacketBuffer::handle_pkt(const rte_mbuf_t *m) {
+ assert(m_is_enabled);
+
+ /* if full - pop the oldest */
+ if (is_full()) {
+ delete pop();
+ }
+
+ /* push packet */
+ m_buffer[m_head] = new RXPacket(m);
+ m_head = next(m_head);
+
+ /* update the shared counter - control plane memory */
+ (*m_shared_counter)++;
+}
+
+RXPacket *
+RXPacketBuffer::pop() {
+ assert(m_is_enabled);
+ assert(!is_empty());
+
+ RXPacket *pkt = m_buffer[m_tail];
+ m_tail = next(m_tail);
+ (*m_shared_counter)--;
+ return pkt;
+}
+
+Json::Value
+RXPacketBuffer::to_json() const {
+
+ Json::Value output = Json::arrayValue;
+
+ int tmp = m_tail;
+ while (tmp != m_head) {
+ RXPacket *pkt = m_buffer[tmp];
+ output.append(pkt->to_json());
+ tmp = next(tmp);
+ }
+
+ return output;
+}
+
/****************************** packet recorder ****************************/
RXPacketRecorder::RXPacketRecorder() {