summaryrefslogtreecommitdiffstats
path: root/src/stateless/rx
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-11-07 18:47:23 +0200
committerimarom <imarom@cisco.com>2016-11-07 18:47:23 +0200
commitf9a0c5e2e1e1135cb0c0e6e192565e5b100c5a41 (patch)
treef09e19975f324d9c0eb717608473dcdbd334a608 /src/stateless/rx
parente85ea75669ea39e4f99519138a3a84e4df6eed2d (diff)
RX features - queueing
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'src/stateless/rx')
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp10
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h7
-rw-r--r--src/stateless/rx/trex_stateless_rx_defs.h56
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp2
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h69
5 files changed, 118 insertions, 26 deletions
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index 3fe72f54..2a678365 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -366,6 +366,16 @@ CRxCoreStateless::stop_capture(uint8_t port_id) {
}
void
+CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) {
+ m_rx_port_mngr[port_id].start_queue(size, shared_counter);
+}
+
+void
+CRxCoreStateless::stop_queue(uint8_t port_id) {
+ m_rx_port_mngr[port_id].stop_queue();
+}
+
+void
CRxCoreStateless::enable_latency() {
for (int i = 0; i < m_max_ports; i++) {
m_rx_port_mngr[i].enable_latency();
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 689b28ec..b5844583 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -128,6 +128,13 @@ class CRxCoreStateless {
void stop_capture(uint8_t port_id);
/**
+ * start RX queueing of packets
+ *
+ */
+ void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter);
+ void stop_queue(uint8_t port_id);
+
+ /**
* enable latency feature for RX packets
* will be apply to all ports
*/
diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h
index ee124270..bdd86a72 100644
--- a/src/stateless/rx/trex_stateless_rx_defs.h
+++ b/src/stateless/rx/trex_stateless_rx_defs.h
@@ -51,8 +51,8 @@ class CRxSlCfg {
*
*/
typedef enum rx_filter_mode_ {
- RX_FILTER_MODE_HW,
- RX_FILTER_MODE_ALL
+ RX_FILTER_MODE_HW,
+ RX_FILTER_MODE_ALL
} rx_filter_mode_e;
/**
@@ -95,4 +95,56 @@ public:
uint64_t m_shared_counter;
};
+
+class RXQueueInfo {
+public:
+
+ RXQueueInfo() {
+ m_is_active = false;
+ m_shared_counter = 0;
+ }
+
+ void enable(uint64_t size) {
+ m_size = size;
+ m_is_active = true;
+ }
+
+ void disable() {
+ m_is_active = false;
+ m_size = 0;
+ }
+
+
+ void to_json(Json::Value &output) const {
+ output["is_active"] = m_is_active;
+ if (m_is_active) {
+ output["size"] = Json::UInt64(m_size);
+ output["count"] = Json::UInt64(m_shared_counter);
+ }
+ }
+
+public:
+ bool m_is_active;
+ uint64_t m_size;
+ uint64_t m_shared_counter;
+};
+
+
+/**
+ * holds all the RX features info
+ *
+ * @author imarom (11/7/2016)
+ */
+class RXFeaturesInfo {
+public:
+ RXCaptureInfo m_rx_capture_info;
+ RXQueueInfo m_rx_queue_info;
+
+ void to_json(Json::Value &msg) const {
+ m_rx_capture_info.to_json(msg["sniffer"]);
+ m_rx_queue_info.to_json(msg["queue"]);
+ }
+};
+
#endif /* __TREX_STATELESS_RX_DEFS_H__ */
+
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index 7283f703..2683dbe1 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -216,7 +216,7 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
}
if (is_feature_set(QUEUE)) {
- m_pkt_buffer->push(new RxPacket(m));
+ m_pkt_buffer->handle_pkt(m);
}
}
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
index 90527f0c..aa8ba8e9 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.h
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -33,7 +33,6 @@ class CPortLatencyHWBase;
class CRFC2544Info;
class CRxCoreErrCntrs;
-
class RXLatency {
public:
@@ -126,13 +125,18 @@ private:
class RxPacketBuffer {
public:
- RxPacketBuffer(int limit) {
- m_buffer = nullptr;
- m_head = 0;
- m_tail = 0;
- m_limit = limit;
+ RxPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+ m_buffer = nullptr;
+ m_head = 0;
+ m_tail = 0;
+ m_size = (size + 1); // for the empty/full difference 1 slot reserved
+ m_shared_counter = shared_counter;
+
+ *m_shared_counter = 0;
+
+ m_buffer = new RxPacket*[m_size](); // zeroed
- m_buffer = new RxPacket*[limit](); // zeroed
+ m_is_enabled = true;
}
~RxPacketBuffer() {
@@ -145,6 +149,18 @@ public:
delete [] m_buffer;
}
+ /* freeze the data structure - no more packets can be pushed / poped */
+ RxPacketBuffer * freeze_and_clone() {
+ /* create a new one */
+ RxPacketBuffer *new_buffer = new RxPacketBuffer(m_size, m_shared_counter);
+
+ /* freeze the current */
+ m_shared_counter = NULL;
+ m_is_enabled = false;
+
+ return new_buffer;
+ }
+
bool is_empty() const {
return (m_head == m_tail);
}
@@ -153,15 +169,17 @@ public:
return ( next(m_head) == m_tail);
}
- int get_limit() const {
- return m_limit;
- }
+ void handle_pkt(const rte_mbuf_t *m) {
+ assert(m_is_enabled);
- void push(RxPacket *pkt) {
+ /* if full - pop the oldest */
if (is_full()) {
delete pop();
}
- m_buffer[m_head] = pkt;
+
+ (*m_shared_counter)++;
+
+ m_buffer[m_head] = new RxPacket(m);
m_head = next(m_head);
}
@@ -169,7 +187,7 @@ public:
* generate a JSON output of the queue
*
*/
- Json::Value to_json() {
+ Json::Value to_json() const {
Json::Value output = Json::arrayValue;
@@ -185,21 +203,26 @@ public:
private:
int next(int v) const {
- return ( (v + 1) % m_limit );
+ return ( (v + 1) % m_size );
}
/* pop in case of full queue - internal usage */
RxPacket * pop() {
+ assert(m_is_enabled);
assert(!is_empty());
+
RxPacket *pkt = m_buffer[m_tail];
m_tail = next(m_tail);
+ (*m_shared_counter)--;
return pkt;
}
- int m_head;
- int m_tail;
- int m_limit;
- RxPacket **m_buffer;
+ int m_head;
+ int m_tail;
+ int m_size;
+ RxPacket **m_buffer;
+ uint64_t *m_shared_counter;
+ bool m_is_enabled;
};
/************************ recoder ***************************/
@@ -294,11 +317,11 @@ public:
* queueing packets
*
*/
- void start_queue(uint32_t limit) {
+ void start_queue(uint32_t size, uint64_t *shared_counter) {
if (m_pkt_buffer) {
delete m_pkt_buffer;
}
- m_pkt_buffer = new RxPacketBuffer(limit);
+ m_pkt_buffer = new RxPacketBuffer(size, shared_counter);
set_feature(QUEUE);
}
@@ -317,11 +340,11 @@ public:
assert(m_pkt_buffer);
- /* take the current */
+ /* hold a pointer to the old one */
RxPacketBuffer *old_buffer = m_pkt_buffer;
- /* allocate a new buffer */
- m_pkt_buffer = new RxPacketBuffer(old_buffer->get_limit());
+ /* replace the old one with a new one and freeze the old */
+ m_pkt_buffer = old_buffer->freeze_and_clone();
return old_buffer;
}