summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-11-30 13:00:54 +0200
committerimarom <imarom@cisco.com>2016-11-30 13:35:40 +0200
commit051a334b6f57280faa9dd90eeab922fb51f3c89e (patch)
treed7b1784e99edc8b71befc7f6a84d0cedb80f7667 /src/stateless
parentba3a6e1edd85873be62f17881e4a95df7daf098d (diff)
reply to messages
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp52
-rw-r--r--src/stateless/cp/trex_stateless_port.h18
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp29
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h158
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp14
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h19
-rw-r--r--src/stateless/rx/trex_stateless_rx_defs.h98
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp137
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h56
9 files changed, 300 insertions, 281 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index ca185754..7edf1a31 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -936,61 +936,67 @@ TrexStatelessPort::remove_and_delete_all_streams() {
void
TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
-
- m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit);
-
- TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
+ static MsgReply<bool> reply;
+
+ reply.reset();
+
+ TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply);
send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
/* as below, must wait for ACK from RX core before returning ACK */
- msg->wait_for_reply();
+ reply.wait_for_reply();
}
void
TrexStatelessPort::stop_rx_capture() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
send_message_to_rx(msg);
-
- /* update our cached data */
- m_rx_features_info.m_rx_capture_info.disable();
}
void
TrexStatelessPort::start_rx_queue(uint64_t size) {
-
- m_rx_features_info.m_rx_queue_info.enable(size);
-
- TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
+ static MsgReply<bool> reply;
+
+ reply.reset();
+
+ TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, size, reply);
send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
/* we cannot return ACK to the user until the RX core has approved
this might cause the user to lose some packets from the queue
*/
- msg->wait_for_reply();
+ reply.wait_for_reply();
}
void
TrexStatelessPort::stop_rx_queue() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
send_message_to_rx(msg);
-
- /* update our cached data */
- m_rx_features_info.m_rx_queue_info.disable();
}
-RXPacketBuffer *
+const RXPacketBuffer *
TrexStatelessPort::get_rx_queue_pkts() {
+ static MsgReply<const RXPacketBuffer *> reply;
+
+ reply.reset();
- if (m_rx_features_info.m_rx_queue_info.is_empty()) {
- return NULL;
- }
+ TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id, reply);
+ send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
+
+ return reply.wait_for_reply();
+}
+
+Json::Value
+TrexStatelessPort::rx_features_to_json() {
+ static MsgReply<Json::Value> reply;
+
+ reply.reset();
- TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id);
+ TrexStatelessRxFeaturesToJson *msg = new TrexStatelessRxFeaturesToJson(m_port_id, reply);
send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
- RXPacketBuffer *pkt_buffer = msg->wait_for_reply();
- return pkt_buffer;
+ return reply.wait_for_reply();
}
/************* Trex Port Owner **************/
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index 4aa7ff36..f2829b8a 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -393,21 +393,19 @@ public:
*/
void stop_rx_queue();
-
/**
- * get the RX features info object
+ * fetch the RX queue packets from the queue
*
*/
- const RXFeaturesInfo &get_rx_features() {
- return m_rx_features_info;
- }
+ const RXPacketBuffer *get_rx_queue_pkts();
/**
- * fetch the RX queue packets from the queue
+ * generate a JSON describing the status
+ * of the RX features
*
*/
- RXPacketBuffer *get_rx_queue_pkts();
-
+ Json::Value rx_features_to_json();
+
/**
* return the port attribute object
*
@@ -447,7 +445,7 @@ private:
*
*/
void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
-
+
/**
* when a port stops, perform various actions
*
@@ -503,8 +501,6 @@ private:
int m_pending_async_stop_event;
- RXFeaturesInfo m_rx_features_info;
-
};
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index cad4fe7a..53d5a87e 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -259,9 +259,10 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
bool
TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
- rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit, m_shared_counter);
+ rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit);
- set_reply(true);
+ /* mark as done */
+ m_reply.set_reply(true);
return true;
}
@@ -275,10 +276,10 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
bool
TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
- rx_core->start_queue(m_port_id, m_size, m_shared_counter);
+ rx_core->start_queue(m_port_id, m_size);
/* mark as done */
- set_reply(true);
+ m_reply.set_reply(true);
return true;
}
@@ -292,12 +293,24 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
-bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
- RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
- assert(pkt_buffer);
+bool
+TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
+ const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
/* set the reply */
- set_reply(pkt_buffer);
+ m_reply.set_reply(pkt_buffer);
return true;
}
+
+
+bool
+TrexStatelessRxFeaturesToJson::handle(CRxCoreStateless *rx_core) {
+ Json::Value output = rx_core->get_rx_port_mngr(m_port_id).to_json();
+
+ /* set the reply */
+ m_reply.set_reply(output);
+
+ return true;
+}
+
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index b1d9117f..303548aa 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -35,6 +35,57 @@ class CFlowGenListPerThread;
class RXPacketBuffer;
/**
+ * Generic message reply object
+ *
+ * @author imarom (11/27/2016)
+ */
+template<typename T> class MsgReply {
+
+public:
+
+ MsgReply() {
+ reset();
+ }
+
+ void reset() {
+ m_pending = true;
+ }
+
+ bool is_pending() const {
+ return m_pending;
+ }
+
+ void set_reply(const T &reply) {
+ m_reply = reply;
+
+ /* before marking as done make sure all stores are committed */
+ asm volatile("mfence" ::: "memory");
+ m_pending = false;
+ }
+
+ T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
+ int guard = timeout_ms;
+
+ while (is_pending()) {
+ guard -= backoff_ms;
+ if (guard < 0) {
+ throw TrexException("timeout: failed to get reply from core");
+ }
+
+ delay(backoff_ms);
+ }
+
+ return m_reply;
+
+ }
+
+protected:
+ volatile bool m_pending;
+ T m_reply;
+};
+
+
+/**
* defines the base class for CP to DP messages
*
* @author imarom (27-Oct-15)
@@ -408,52 +459,6 @@ public:
};
-/**
- * defines the base class for CP to RX with reply
- *
- * @author imarom (11/27/2016)
- */
-template<typename T> class TrexStatelessCpToRxMsgReply : public TrexStatelessCpToRxMsgBase {
-
-public:
-
- TrexStatelessCpToRxMsgReply() {
- m_pending = true;
- }
-
- bool is_pending() const {
- return m_pending;
- }
-
- void set_reply(const T &reply) {
- m_reply = reply;
-
- /* before marking as done - memory fence */
- asm volatile("mfence" ::: "memory");
- m_pending = false;
- }
-
- T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
- int guard = timeout_ms;
-
- while (is_pending()) {
- guard -= backoff_ms;
- if (guard < 0) {
- throw TrexException("timeout: RX core has failed to reply");
- }
-
- delay(backoff_ms);
- }
-
- return m_reply;
-
- }
-
-protected:
- volatile bool m_pending;
- T m_reply;
-};
-
class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
@@ -469,22 +474,25 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
-class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgReply<bool> {
+class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) {
+ TrexStatelessRxStartCapture(uint8_t port_id,
+ const std::string &pcap_filename,
+ uint64_t limit,
+ MsgReply<bool> &reply) : m_reply(reply) {
+
m_port_id = port_id;
- m_limit = rx_capture_info.m_limit;
- m_pcap_filename = rx_capture_info.m_pcap_filename;
- m_shared_counter = &rx_capture_info.m_shared_counter;
+ m_limit = limit;
+ m_pcap_filename = pcap_filename;
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
- std::string m_pcap_filename;
- uint64_t m_limit;
- uint64_t *m_shared_counter;
+ uint8_t m_port_id;
+ std::string m_pcap_filename;
+ uint64_t m_limit;
+ MsgReply<bool> &m_reply;
};
@@ -501,12 +509,14 @@ private:
};
-class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgReply<bool> {
+class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) {
+ TrexStatelessRxStartQueue(uint8_t port_id,
+ uint64_t size,
+ MsgReply<bool> &reply) : m_reply(reply) {
+
m_port_id = port_id;
- m_size = rx_queue_info.m_size;
- m_shared_counter = &rx_queue_info.m_shared_counter;
+ m_size = size;
}
virtual bool handle(CRxCoreStateless *rx_core);
@@ -514,7 +524,7 @@ public:
private:
uint8_t m_port_id;
uint64_t m_size;
- uint64_t *m_shared_counter;
+ MsgReply<bool> &m_reply;
};
@@ -532,10 +542,10 @@ private:
-class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgReply<RXPacketBuffer *> {
+class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxQueueGetPkts(uint8_t port_id) {
+ TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) {
m_port_id = port_id;
}
@@ -546,8 +556,30 @@ public:
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
+ uint8_t m_port_id;
+ MsgReply<const RXPacketBuffer *> &m_reply;
+
};
+/**
+ * a request from RX core to dump to Json the RX features
+ */
+class TrexStatelessRxFeaturesToJson : public TrexStatelessCpToRxMsgBase {
+public:
+
+ TrexStatelessRxFeaturesToJson(uint8_t port_id, MsgReply<Json::Value> &reply) : m_reply(reply) {
+ m_port_id = port_id;
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle(CRxCoreStateless *rx_core);
+
+private:
+ uint8_t m_port_id;
+ MsgReply<Json::Value> &m_reply;
+};
#endif /* __TREX_STATELESS_MESSAGING_H__ */
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index a1ff9c6a..b24fcb8f 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -374,8 +374,8 @@ double CRxCoreStateless::get_cpu_util() {
void
-CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) {
- m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit, shared_counter);
+CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
+ m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
}
void
@@ -384,8 +384,8 @@ CRxCoreStateless::stop_recorder(uint8_t port_id) {
}
void
-CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter) {
- m_rx_port_mngr[port_id].start_queue(size, shared_counter);
+CRxCoreStateless::start_queue(uint8_t port_id, uint64_t size) {
+ m_rx_port_mngr[port_id].start_queue(size);
}
void
@@ -407,3 +407,9 @@ CRxCoreStateless::disable_latency() {
}
}
+const RXPortManager &
+CRxCoreStateless::get_rx_port_mngr(uint8_t port_id) {
+ assert(port_id < m_max_ports);
+ return m_rx_port_mngr[port_id];
+
+}
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 8e50a46e..b27a7ca5 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -111,27 +111,22 @@ class CRxCoreStateless {
double get_cpu_util();
void update_cpu_util();
- RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+ const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
return m_rx_port_mngr[port_id].get_pkt_buffer();
}
/**
- * start capturing of RX packets on a specific port
- *
- * @author imarom (11/2/2016)
- *
- * @param port_id
- * @param pcap_filename
- * @param limit
+ * start capturing of RX packets on a specific port
+ *
*/
- void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter);
+ void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit);
void stop_recorder(uint8_t port_id);
/**
* start RX queueing of packets
*
*/
- void start_queue(uint8_t port_id, uint64_t size, uint64_t *shared_counter);
+ void start_queue(uint8_t port_id, uint64_t size);
void stop_queue(uint8_t port_id);
/**
@@ -141,6 +136,8 @@ class CRxCoreStateless {
void enable_latency();
void disable_latency();
+ const RXPortManager &get_rx_port_mngr(uint8_t port_id);
+
private:
void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
bool periodic_check_for_cp_messages();
@@ -162,7 +159,7 @@ class CRxCoreStateless {
}
void try_rx_queues();
-
+
private:
TrexMonitor m_monitor;
uint32_t m_max_ports;
diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h
index 7b1e0f32..3e5d2c38 100644
--- a/src/stateless/rx/trex_stateless_rx_defs.h
+++ b/src/stateless/rx/trex_stateless_rx_defs.h
@@ -55,103 +55,5 @@ typedef enum rx_filter_mode_ {
RX_FILTER_MODE_ALL
} rx_filter_mode_e;
-/**
- * holds RX capture info
- *
- */
-class RXCaptureInfo {
-public:
- RXCaptureInfo() {
- m_is_active = false;
- m_limit = 0;
- m_shared_counter = 0;
- }
-
- void enable(const std::string &pcap_filename, uint64_t limit) {
- m_pcap_filename = pcap_filename;
- m_limit = limit;
- m_is_active = true;
- }
-
- void disable() {
- m_is_active = false;
- m_pcap_filename = "";
- m_limit = 0;
- }
-
- bool is_empty() const {
- return (m_shared_counter == 0);
- }
-
- void to_json(Json::Value &output) const {
- output["is_active"] = m_is_active;
- if (m_is_active) {
- output["pcap_filename"] = m_pcap_filename;
- output["limit"] = Json::UInt64(m_limit);
- output["count"] = Json::UInt64(m_shared_counter);
- }
- }
-
-public:
- bool m_is_active;
- std::string m_pcap_filename;
- uint64_t m_limit;
- uint64_t m_shared_counter;
-};
-
-
-class RXQueueInfo {
-public:
-
- RXQueueInfo() {
- m_is_active = false;
- m_shared_counter = 0;
- }
-
- void enable(uint64_t size) {
- m_size = size;
- m_is_active = true;
- }
-
- void disable() {
- m_is_active = false;
- m_size = 0;
- }
-
- bool is_empty() const {
- return (m_shared_counter == 0);
- }
-
- void to_json(Json::Value &output) const {
- output["is_active"] = m_is_active;
- if (m_is_active) {
- output["size"] = Json::UInt64(m_size);
- output["count"] = Json::UInt64(m_shared_counter);
- }
- }
-
-public:
- bool m_is_active;
- uint64_t m_size;
- uint64_t m_shared_counter;
-};
-
-
-/**
- * holds all the RX features info
- *
- * @author imarom (11/7/2016)
- */
-class RXFeaturesInfo {
-public:
- RXCaptureInfo m_rx_capture_info;
- RXQueueInfo m_rx_queue_info;
-
- void to_json(Json::Value &msg) const {
- m_rx_capture_info.to_json(msg["sniffer"]);
- m_rx_queue_info.to_json(msg["queue"]);
- }
-};
-
#endif /* __TREX_STATELESS_RX_DEFS_H__ */
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index 78f4ac5c..00032e8b 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -156,25 +156,24 @@ RXLatency::reset_stats() {
}
}
+Json::Value
+RXLatency::to_json() const {
+ return Json::objectValue;
+}
+
/**************************************
* RX feature queue
*
*************************************/
-RXPacketBuffer::RXPacketBuffer(uint64_t size, uint64_t *shared_counter) {
+RXPacketBuffer::RXPacketBuffer(uint64_t size) {
m_buffer = nullptr;
m_head = 0;
m_tail = 0;
m_size = (size + 1); // for the empty/full difference 1 slot reserved
- m_shared_counter = shared_counter;
-
- /* reset the counter */
- *m_shared_counter = 0;
/* generate queue */
m_buffer = new RXPacket*[m_size](); // zeroed
-
- m_is_enabled = true;
}
RXPacketBuffer::~RXPacketBuffer() {
@@ -187,22 +186,8 @@ RXPacketBuffer::~RXPacketBuffer() {
delete [] m_buffer;
}
-RXPacketBuffer *
-RXPacketBuffer::freeze_and_clone() {
- /* create a new one - same size and shared counter 0 */
- RXPacketBuffer *new_buffer = new RXPacketBuffer(m_size, m_shared_counter);
-
- /* freeze the current */
- m_shared_counter = NULL;
- m_is_enabled = false;
-
- return new_buffer;
-}
-
void
RXPacketBuffer::push(const rte_mbuf_t *m) {
- assert(m_is_enabled);
-
/* if full - pop the oldest */
if (is_full()) {
delete pop();
@@ -211,22 +196,27 @@ RXPacketBuffer::push(const rte_mbuf_t *m) {
/* push packet */
m_buffer[m_head] = new RXPacket(m);
m_head = next(m_head);
-
- /* update the shared counter - control plane memory */
- (*m_shared_counter)++;
}
RXPacket *
RXPacketBuffer::pop() {
- assert(m_is_enabled);
assert(!is_empty());
RXPacket *pkt = m_buffer[m_tail];
m_tail = next(m_tail);
- (*m_shared_counter)--;
+
return pkt;
}
+uint64_t
+RXPacketBuffer::get_element_count() const {
+ if (m_head >= m_tail) {
+ return (m_head - m_tail);
+ } else {
+ return ( get_capacity() - (m_tail - m_head - 1) );
+ }
+}
+
Json::Value
RXPacketBuffer::to_json() const {
@@ -244,11 +234,11 @@ RXPacketBuffer::to_json() const {
void
-RXQueue::start(uint64_t size, uint64_t *shared_counter) {
+RXQueue::start(uint64_t size) {
if (m_pkt_buffer) {
delete m_pkt_buffer;
}
- m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
+ m_pkt_buffer = new RXPacketBuffer(size);
}
void
@@ -259,10 +249,11 @@ RXQueue::stop() {
}
}
-RXPacketBuffer *
+const RXPacketBuffer *
RXQueue::fetch() {
- if (!m_pkt_buffer) {
+ /* if no buffer or the buffer is empty - give a NULL one */
+ if ( (!m_pkt_buffer) || (m_pkt_buffer->get_element_count() == 0) ) {
return nullptr;
}
@@ -270,7 +261,7 @@ RXQueue::fetch() {
RXPacketBuffer *old_buffer = m_pkt_buffer;
/* replace the old one with a new one and freeze the old */
- m_pkt_buffer = old_buffer->freeze_and_clone();
+ m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity());
return old_buffer;
}
@@ -280,6 +271,18 @@ RXQueue::handle_pkt(const rte_mbuf_t *m) {
m_pkt_buffer->push(m);
}
+Json::Value
+RXQueue::to_json() const {
+ assert(m_pkt_buffer != NULL);
+
+ Json::Value output = Json::objectValue;
+
+ output["size"] = Json::UInt64(m_pkt_buffer->get_capacity());
+ output["count"] = Json::UInt64(m_pkt_buffer->get_element_count());
+
+ return output;
+}
+
/**************************************
* RX feature recorder
*
@@ -287,13 +290,15 @@ RXQueue::handle_pkt(const rte_mbuf_t *m) {
RXPacketRecorder::RXPacketRecorder() {
m_writer = NULL;
- m_shared_counter = NULL;
+ m_count = 0;
m_limit = 0;
m_epoch = -1;
+
+ m_pending_flush = false;
}
void
-RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) {
+RXPacketRecorder::start(const std::string &pcap, uint64_t limit) {
m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
if (m_writer == NULL) {
std::stringstream ss;
@@ -302,23 +307,29 @@ RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *share
}
assert(limit > 0);
+
m_limit = limit;
- m_shared_counter = shared_counter;
- (*m_shared_counter) = 0;
+ m_count = 0;
+ m_pending_flush = false;
+ m_pcap_filename = pcap;
}
void
RXPacketRecorder::stop() {
- if (m_writer) {
- delete m_writer;
- m_writer = NULL;
+ if (!m_writer) {
+ return;
}
+
+ delete m_writer;
+ m_writer = NULL;
}
void
RXPacketRecorder::flush_to_disk() {
- if (m_writer) {
+
+ if (m_writer && m_pending_flush) {
m_writer->flush_to_disk();
+ m_pending_flush = false;
}
}
@@ -344,15 +355,25 @@ RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
memcpy(m_pkt.raw, p, m->pkt_len);
m_writer->write_packet(&m_pkt);
-
- m_limit--;
- (*m_shared_counter)++;
-
- if (m_limit == 0) {
+ m_count++;
+ m_pending_flush = true;
+
+ if (m_count == m_limit) {
stop();
}
+
}
+Json::Value
+RXPacketRecorder::to_json() const {
+ Json::Value output = Json::objectValue;
+
+ output["pcap_filename"] = m_pcap_filename;
+ output["limit"] = Json::UInt64(m_limit);
+ output["count"] = Json::UInt64(m_count);
+
+ return output;
+}
/**************************************
* Port manager
@@ -432,3 +453,31 @@ RXPortManager::tick() {
m_recorder.flush_to_disk();
}
}
+
+Json::Value
+RXPortManager::to_json() const {
+ Json::Value output = Json::objectValue;
+
+ if (is_feature_set(LATENCY)) {
+ output["latency"] = m_latency.to_json();
+ output["latency"]["is_active"] = true;
+ } else {
+ output["latency"]["is_active"] = false;
+ }
+
+ if (is_feature_set(RECORDER)) {
+ output["sniffer"] = m_recorder.to_json();
+ output["sniffer"]["is_active"] = true;
+ } else {
+ output["sniffer"]["is_active"] = false;
+ }
+
+ if (is_feature_set(QUEUE)) {
+ output["queue"] = m_queue.to_json();
+ output["queue"]["is_active"] = true;
+ } else {
+ output["queue"]["is_active"] = false;
+ }
+
+ return output;
+}
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
index 564b15d4..bc34b5aa 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.h
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -47,6 +47,8 @@ public:
void handle_pkt(const rte_mbuf_t *m);
+ Json::Value to_json() const;
+
private:
bool is_flow_stat_id(uint32_t id) {
if ((id & 0x000fff00) == IP_ID_RESERVE_BASE) return true;
@@ -123,7 +125,7 @@ private:
class RXPacketBuffer {
public:
- RXPacketBuffer(uint64_t size, uint64_t *shared_counter);
+ RXPacketBuffer(uint64_t size);
~RXPacketBuffer();
/**
@@ -133,12 +135,6 @@ public:
void push(const rte_mbuf_t *m);
/**
- * freezes the queue and clones a new one
- *
- */
- RXPacketBuffer * freeze_and_clone();
-
- /**
* generate a JSON output of the queue
*
*/
@@ -153,6 +149,19 @@ public:
return ( next(m_head) == m_tail);
}
+ /**
+ * return the total amount of space possible
+ */
+ uint64_t get_capacity() const {
+ /* one slot is used for diff between full/empty */
+ return (m_size - 1);
+ }
+
+ /**
+ * returns how many elements are in the queue
+ */
+ uint64_t get_element_count() const;
+
private:
int next(int v) const {
return ( (v + 1) % m_size );
@@ -165,8 +174,6 @@ private:
int m_tail;
int m_size;
RXPacket **m_buffer;
- uint64_t *m_shared_counter;
- bool m_is_enabled;
};
@@ -184,13 +191,13 @@ public:
* start RX queue
*
*/
- void start(uint64_t size, uint64_t *shared_counter);
+ void start(uint64_t size);
/**
* fetch the current buffer
- *
+ * return NULL if no packets
*/
- RXPacketBuffer * fetch();
+ const RXPacketBuffer * fetch();
/**
* stop RX queue
@@ -200,6 +207,8 @@ public:
void handle_pkt(const rte_mbuf_t *m);
+ Json::Value to_json() const;
+
private:
RXPacketBuffer *m_pkt_buffer;
};
@@ -217,7 +226,7 @@ public:
stop();
}
- void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter);
+ void start(const std::string &pcap, uint64_t limit);
void stop();
void handle_pkt(const rte_mbuf_t *m);
@@ -227,12 +236,16 @@ public:
*/
void flush_to_disk();
+ Json::Value to_json() const;
+
private:
CFileWriterBase *m_writer;
+ std::string m_pcap_filename;
CCapPktRaw m_pkt;
dsec_t m_epoch;
uint64_t m_limit;
- uint64_t *m_shared_counter;
+ uint64_t m_count;
+ bool m_pending_flush;
};
@@ -277,8 +290,8 @@ public:
}
/* recorder */
- void start_recorder(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) {
- m_recorder.start(pcap, limit_pkts, shared_counter);
+ void start_recorder(const std::string &pcap, uint64_t limit_pkts) {
+ m_recorder.start(pcap, limit_pkts);
set_feature(RECORDER);
}
@@ -288,8 +301,8 @@ public:
}
/* queue */
- void start_queue(uint32_t size, uint64_t *shared_counter) {
- m_queue.start(size, shared_counter);
+ void start_queue(uint32_t size) {
+ m_queue.start(size);
set_feature(QUEUE);
}
@@ -298,7 +311,7 @@ public:
unset_feature(QUEUE);
}
- RXPacketBuffer *get_pkt_buffer() {
+ const RXPacketBuffer *get_pkt_buffer() {
if (!is_feature_set(QUEUE)) {
return nullptr;
}
@@ -346,6 +359,11 @@ public:
return (!has_features_set());
}
+ /**
+ * write the status to a JSON format
+ */
+ Json::Value to_json() const;
+
private:
void clear_all_features() {