summaryrefslogtreecommitdiffstats
path: root/src/stateless/rx
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-11-24 16:58:34 +0200
committerimarom <imarom@cisco.com>2016-11-24 16:58:34 +0200
commitd0c838e0abd0b39df54547623b2fa87fcc8aa807 (patch)
treef6e49bc3289065006ba0972b23f7b69a4796edd6 /src/stateless/rx
parentd6423d0e2aa13f03bfee59c6101e539c8989df73 (diff)
self code review
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'src/stateless/rx')
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h2
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp111
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h162
3 files changed, 160 insertions, 115 deletions
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 519724d8..9df36310 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -111,7 +111,7 @@ class CRxCoreStateless {
double get_cpu_util();
void update_cpu_util();
- RxPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+ RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
return m_rx_port_mngr[port_id].get_pkt_buffer();
}
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index 2683dbe1..46fec432 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -23,8 +23,29 @@
#include "common/captureFile.h"
#include "trex_stateless_rx_core.h"
-/************************** latency feature ************/
-void RXLatency::handle_pkt(const rte_mbuf_t *m) {
+/**************************************
+ * latency RX feature
+ *
+ *************************************/
+RXLatency::RXLatency() {
+ m_rcv_all = false;
+ m_rfc2544 = NULL;
+ m_err_cntrs = NULL;
+
+ for (int i = 0; i < MAX_FLOW_STATS; i++) {
+ m_rx_pg_stat[i].clear();
+ m_rx_pg_stat_payload[i].clear();
+ }
+}
+
+void
+RXLatency::create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) {
+ m_rfc2544 = rfc2544;
+ m_err_cntrs = err_cntrs;
+}
+
+void
+RXLatency::handle_pkt(const rte_mbuf_t *m) {
CFlowStatParser parser;
if (m_rcv_all || parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
@@ -135,6 +156,92 @@ RXLatency::reset_stats() {
}
}
+/**************************************
+ * RX feature queue
+ *
+ *************************************/
+
+RXPacketBuffer::RXPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+ m_buffer = nullptr;
+ m_head = 0;
+ m_tail = 0;
+ m_size = (size + 1); // for the empty/full difference 1 slot reserved
+ m_shared_counter = shared_counter;
+
+ /* reset the counter */
+ *m_shared_counter = 0;
+
+ /* generate queue */
+ m_buffer = new RXPacket*[m_size](); // zeroed
+
+ m_is_enabled = true;
+}
+
+RXPacketBuffer::~RXPacketBuffer() {
+ assert(m_buffer);
+
+ while (!is_empty()) {
+ RXPacket *pkt = pop();
+ delete pkt;
+ }
+ delete [] m_buffer;
+}
+
+RXPacketBuffer *
+RXPacketBuffer::freeze_and_clone() {
+ /* create a new one - same size and shared counter 0 */
+ RXPacketBuffer *new_buffer = new RXPacketBuffer(m_size, m_shared_counter);
+
+ /* freeze the current */
+ m_shared_counter = NULL;
+ m_is_enabled = false;
+
+ return new_buffer;
+}
+
+void
+RXPacketBuffer::handle_pkt(const rte_mbuf_t *m) {
+ assert(m_is_enabled);
+
+ /* if full - pop the oldest */
+ if (is_full()) {
+ delete pop();
+ }
+
+ /* push packet */
+ m_buffer[m_head] = new RXPacket(m);
+ m_head = next(m_head);
+
+ /* update the shared counter - control plane memory */
+ (*m_shared_counter)++;
+}
+
+RXPacket *
+RXPacketBuffer::pop() {
+ assert(m_is_enabled);
+ assert(!is_empty());
+
+ RXPacket *pkt = m_buffer[m_tail];
+ m_tail = next(m_tail);
+ (*m_shared_counter)--;
+ return pkt;
+}
+
+Json::Value
+RXPacketBuffer::to_json() const {
+
+ Json::Value output = Json::arrayValue;
+
+ int tmp = m_tail;
+ while (tmp != m_head) {
+ RXPacket *pkt = m_buffer[tmp];
+ output.append(pkt->to_json());
+ tmp = next(tmp);
+ }
+
+ return output;
+}
+
/****************************** packet recorder ****************************/
RXPacketRecorder::RXPacketRecorder() {
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
index fd023ea9..9df42039 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.h
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -27,30 +27,21 @@
#include "common/captureFile.h"
-/************************* latency ***********************/
class CPortLatencyHWBase;
class CRFC2544Info;
class CRxCoreErrCntrs;
+/**************************************
+ * RX feature latency
+ *
+ *************************************/
class RXLatency {
public:
- RXLatency() {
- m_rcv_all = false;
- m_rfc2544 = NULL;
- m_err_cntrs = NULL;
+ RXLatency();
- for (int i = 0; i < MAX_FLOW_STATS; i++) {
- m_rx_pg_stat[i].clear();
- m_rx_pg_stat_payload[i].clear();
- }
- }
-
- void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs) {
- m_rfc2544 = rfc2544;
- m_err_cntrs = err_cntrs;
- }
+ void create(CRFC2544Info *rfc2544, CRxCoreErrCntrs *err_cntrs);
void reset_stats();
@@ -81,26 +72,24 @@ public:
CRxCoreErrCntrs *m_err_cntrs;
};
-/************************ queue ***************************/
-
/**
* describes a single saved RX packet
*
*/
-class RxPacket {
+class RXPacket {
public:
- RxPacket(const rte_mbuf_t *m) {
+ RXPacket(const rte_mbuf_t *m) {
/* assume single part packet */
assert(m->nb_segs == 1);
m_size = m->pkt_len;
const uint8_t *p = rte_pktmbuf_mtod(m, uint8_t *);
- m_raw = (uint8_t *)malloc(m_size);
+ m_raw = new uint8_t[m_size];
memcpy(m_raw, p, m_size);
- /* save the packet timestamp */
+ /* generate a packet timestamp */
m_timestamp = now_sec();
}
@@ -112,9 +101,9 @@ public:
return output;
}
- ~RxPacket() {
+ ~RXPacket() {
if (m_raw) {
- delete m_raw;
+ delete [] m_raw;
}
}
@@ -125,48 +114,36 @@ private:
dsec_t m_timestamp;
};
-/**
- * a simple cyclic buffer to hold RX packets
- *
- */
-class RxPacketBuffer {
-public:
-
- RxPacketBuffer(uint64_t size, uint64_t *shared_counter) {
- m_buffer = nullptr;
- m_head = 0;
- m_tail = 0;
- m_size = (size + 1); // for the empty/full difference 1 slot reserved
- m_shared_counter = shared_counter;
- *m_shared_counter = 0;
-
- m_buffer = new RxPacket*[m_size](); // zeroed
-
- m_is_enabled = true;
- }
+/**************************************
+ * RX feature queue
+ *
+ *************************************/
- ~RxPacketBuffer() {
- assert(m_buffer);
+class RXPacketBuffer {
+public:
- while (!is_empty()) {
- RxPacket *pkt = pop();
- delete pkt;
- }
- delete [] m_buffer;
- }
+ RXPacketBuffer(uint64_t size, uint64_t *shared_counter);
+ ~RXPacketBuffer();
- /* freeze the data structure - no more packets can be pushed / poped */
- RxPacketBuffer * freeze_and_clone() {
- /* create a new one */
- RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter);
+ /**
+ * handle a new packet
+ *
+ */
+ void handle_pkt(const rte_mbuf_t *m);
+
+ /**
+ * freezes the queue and clones a new one
+ *
+ */
+ RXPacketBuffer * freeze_and_clone();
- /* freeze the current */
- m_shared_counter = NULL;
- m_is_enabled = false;
+ /**
+ * generate a JSON output of the queue
+ *
+ */
+ Json::Value to_json() const;
- return new_buffer;
- }
bool is_empty() const {
return (m_head == m_tail);
@@ -176,72 +153,33 @@ public:
return ( next(m_head) == m_tail);
}
- void handle_pkt(const rte_mbuf_t *m) {
- assert(m_is_enabled);
-
- /* if full - pop the oldest */
- if (is_full()) {
- delete pop();
- }
-
- (*m_shared_counter)++;
-
- m_buffer[m_head] = new RxPacket(m);
- m_head = next(m_head);
- }
-
- /**
- * generate a JSON output of the queue
- *
- */
- Json::Value to_json() const {
-
- Json::Value output = Json::arrayValue;
-
- int tmp = m_tail;
- while (tmp != m_head) {
- RxPacket *pkt = m_buffer[tmp];
- output.append(pkt->to_json());
- tmp = next(tmp);
- }
-
- return output;
- }
-
private:
int next(int v) const {
return ( (v + 1) % m_size );
}
/* pop in case of full queue - internal usage */
- RxPacket * pop() {
- assert(m_is_enabled);
- assert(!is_empty());
-
- RxPacket *pkt = m_buffer[m_tail];
- m_tail = next(m_tail);
- (*m_shared_counter)--;
- return pkt;
- }
+ RXPacket * pop();
int m_head;
int m_tail;
int m_size;
- RxPacket **m_buffer;
+ RXPacket **m_buffer;
uint64_t *m_shared_counter;
bool m_is_enabled;
};
-/************************ recoder ***************************/
-/**
- * RX packet recorder to PCAP file
+/**************************************
+ * RX feature PCAP recorder
*
- */
+ *************************************/
+
class RXPacketRecorder {
public:
RXPacketRecorder();
~RXPacketRecorder();
+
void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter);
void stop();
void handle_pkt(const rte_mbuf_t *m);
@@ -251,7 +189,7 @@ private:
CCapPktRaw m_pkt;
dsec_t m_epoch;
uint64_t m_limit;
- uint64_t *m_shared_counter;
+ uint64_t *m_shared_counter;
};
@@ -265,9 +203,9 @@ private:
class RXPortManager {
public:
enum features_t {
- LATENCY = 0x1,
+ LATENCY = 0x1,
CAPTURE = 0x2,
- QUEUE = 0x4
+ QUEUE = 0x4
};
RXPortManager() {
@@ -328,7 +266,7 @@ public:
if (m_pkt_buffer) {
delete m_pkt_buffer;
}
- m_pkt_buffer = new RxPacketBuffer(size, shared_counter);
+ m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
set_feature(QUEUE);
}
@@ -340,7 +278,7 @@ public:
unset_feature(QUEUE);
}
- RxPacketBuffer *get_pkt_buffer() {
+ RXPacketBuffer *get_pkt_buffer() {
if (!is_feature_set(QUEUE)) {
return NULL;
}
@@ -348,7 +286,7 @@ public:
assert(m_pkt_buffer);
/* hold a pointer to the old one */
- RxPacketBuffer *old_buffer = m_pkt_buffer;
+ RXPacketBuffer *old_buffer = m_pkt_buffer;
/* replace the old one with a new one and freeze the old */
m_pkt_buffer = old_buffer->freeze_and_clone();
@@ -407,7 +345,7 @@ private:
uint32_t m_features;
RXPacketRecorder m_recorder;
RXLatency m_latency;
- RxPacketBuffer *m_pkt_buffer;
+ RXPacketBuffer *m_pkt_buffer;
CCpuUtlDp *m_cpu_dp_u;
CPortLatencyHWBase *m_io;
};