summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2016-11-28 11:25:32 +0200
committerimarom <imarom@cisco.com>2016-11-28 11:25:32 +0200
commit537f5831c4400dea7fa15032c4cd6bd2fae86bb1 (patch)
tree10d3b4e158c0a0c3b6019b78a99660c0fbb18d97 /src/stateless
parentd0c838e0abd0b39df54547623b2fa87fcc8aa807 (diff)
RX features - self code review
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp29
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp15
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h100
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp32
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h7
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp90
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h147
7 files changed, 282 insertions, 138 deletions
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 75530ea3..ca185754 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -939,14 +939,19 @@ TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t l
m_rx_features_info.m_rx_capture_info.enable(pcap_filename, limit);
- TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
- send_message_to_rx(msg);
+ TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, m_rx_features_info.m_rx_capture_info);
+ send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
+
+ /* as below, must wait for ACK from RX core before returning ACK */
+ msg->wait_for_reply();
}
void
TrexStatelessPort::stop_rx_capture() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
send_message_to_rx(msg);
+
+ /* update our cached data */
m_rx_features_info.m_rx_capture_info.disable();
}
@@ -955,14 +960,21 @@ TrexStatelessPort::start_rx_queue(uint64_t size) {
m_rx_features_info.m_rx_queue_info.enable(size);
- TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
- send_message_to_rx(msg);
+ TrexStatelessRxStartQueue *msg = new TrexStatelessRxStartQueue(m_port_id, m_rx_features_info.m_rx_queue_info);
+ send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
+
+ /* we cannot return ACK to the user until the RX core has approved
+ this might cause the user to lose some packets from the queue
+ */
+ msg->wait_for_reply();
}
void
TrexStatelessPort::stop_rx_queue() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
send_message_to_rx(msg);
+
+ /* update our cached data */
m_rx_features_info.m_rx_queue_info.disable();
}
@@ -974,13 +986,10 @@ TrexStatelessPort::get_rx_queue_pkts() {
return NULL;
}
- /* ask RX core for the pkt queue */
- TrexStatelessMsgReply<RXPacketBuffer *> msg_reply;
-
- TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxQueueGetPkts(m_port_id, msg_reply);
- send_message_to_rx(msg);
+ TrexStatelessRxQueueGetPkts *msg = new TrexStatelessRxQueueGetPkts(m_port_id);
+ send_message_to_rx( (TrexStatelessCpToRxMsgBase *)msg );
- RXPacketBuffer *pkt_buffer = msg_reply.wait_for_reply();
+ RXPacketBuffer *pkt_buffer = msg->wait_for_reply();
return pkt_buffer;
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index a8fb7ba9..cad4fe7a 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -259,14 +259,16 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
bool
TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
- rx_core->start_capture(m_port_id, m_pcap_filename, m_limit, m_shared_counter);
+ rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit, m_shared_counter);
+ set_reply(true);
+
return true;
}
bool
TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
- rx_core->stop_capture(m_port_id);
+ rx_core->stop_recorder(m_port_id);
return true;
}
@@ -274,7 +276,10 @@ TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
bool
TrexStatelessRxStartQueue::handle(CRxCoreStateless *rx_core) {
rx_core->start_queue(m_port_id, m_size, m_shared_counter);
-
+
+ /* mark as done */
+ set_reply(true);
+
return true;
}
@@ -290,7 +295,9 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
bool TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
assert(pkt_buffer);
- m_reply.set(pkt_buffer);
+
+ /* set the reply */
+ set_reply(pkt_buffer);
return true;
}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index ed2ec90e..b1d9117f 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -408,6 +408,53 @@ public:
};
+/**
+ * defines the base class for CP to RX with reply
+ *
+ * @author imarom (11/27/2016)
+ */
+template<typename T> class TrexStatelessCpToRxMsgReply : public TrexStatelessCpToRxMsgBase {
+
+public:
+
+ TrexStatelessCpToRxMsgReply() {
+ m_pending = true;
+ }
+
+ bool is_pending() const {
+ return m_pending;
+ }
+
+ void set_reply(const T &reply) {
+ m_reply = reply;
+
+ /* before marking as done - memory fence */
+ asm volatile("mfence" ::: "memory");
+ m_pending = false;
+ }
+
+ T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
+ int guard = timeout_ms;
+
+ while (is_pending()) {
+ guard -= backoff_ms;
+ if (guard < 0) {
+ throw TrexException("timeout: RX core has failed to reply");
+ }
+
+ delay(backoff_ms);
+ }
+
+ return m_reply;
+
+ }
+
+protected:
+ volatile bool m_pending;
+ T m_reply;
+};
+
+
class TrexStatelessRxEnableLatency : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
};
@@ -421,7 +468,8 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
};
-class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
+
+class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgReply<bool> {
public:
TrexStatelessRxStartCapture(uint8_t port_id, RXCaptureInfo &rx_capture_info) {
m_port_id = port_id;
@@ -452,7 +500,8 @@ private:
uint8_t m_port_id;
};
-class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgBase {
+
+class TrexStatelessRxStartQueue : public TrexStatelessCpToRxMsgReply<bool> {
public:
TrexStatelessRxStartQueue(uint8_t port_id, RXQueueInfo &rx_queue_info) {
m_port_id = port_id;
@@ -468,6 +517,7 @@ private:
uint64_t *m_shared_counter;
};
+
class TrexStatelessRxStopQueue : public TrexStatelessCpToRxMsgBase {
public:
TrexStatelessRxStopQueue(uint8_t port_id) {
@@ -481,50 +531,11 @@ private:
};
-template<typename T> class TrexStatelessMsgReply {
-public:
- TrexStatelessMsgReply() {
- m_pending = true;
- }
-
- bool is_pending() const {
- return m_pending;
- }
-
- void set(T reply) {
- m_reply = reply;
-
- /* before marking as done - memory fence */
- asm volatile("mfence" ::: "memory");
- m_pending = false;
- }
-
- T wait_for_reply(int timeout_ms = 100, int backoff_ms = 1) {
- int guard = timeout_ms;
-
- while (is_pending()) {
- guard -= backoff_ms;
- if (guard < 0) {
- throw TrexException("timeout: RX core has failed to reply");
- }
-
- delay(backoff_ms);
-
- }
- return m_reply;
-
- }
-private:
- bool m_pending;
- T m_reply;
-};
-
-
-class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgReply<RXPacketBuffer *> {
public:
- TrexStatelessRxQueueGetPkts(uint8_t port_id, TrexStatelessMsgReply<RXPacketBuffer *> &reply) : m_reply(reply) {
+ TrexStatelessRxQueueGetPkts(uint8_t port_id) {
m_port_id = port_id;
}
@@ -535,8 +546,7 @@ public:
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
- TrexStatelessMsgReply<RXPacketBuffer*> &m_reply;
+ uint8_t m_port_id;
};
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index 2a678365..a1ff9c6a 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -82,7 +82,7 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_cpu_cp_u.Create(&m_cpu_dp_u);
- /* init per port manager */
+ /* create per port manager */
for (int i = 0; i < m_max_ports; i++) {
m_rx_port_mngr[i].create(cfg.m_ports[i],
m_rfc2544,
@@ -171,9 +171,20 @@ void CRxCoreStateless::idle_state_loop() {
}
}
+/**
+ * for each port give a tick (for flushing if needed)
+ *
+ */
+void CRxCoreStateless::port_manager_tick() {
+ for (int i = 0; i < m_max_ports; i++) {
+ m_rx_port_mngr[i].tick();
+ }
+}
+
void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) {
int i = 0;
-
+ int j = 0;
+
while (m_state == STATE_WORKING) {
if (do_try_rx_queue) {
@@ -182,12 +193,19 @@ void CRxCoreStateless::handle_work_stage(bool do_try_rx_queue) {
process_all_pending_pkts();
+ /* TODO: with scheduler, this should be solved better */
i++;
if (i == 100000) { // approx 10msec
i = 0;
periodic_check_for_cp_messages(); // m_state might change in here
+
+ j++;
+ if (j == 100) { // approx 1 sec
+ j = 0;
+ port_manager_tick();
+ }
}
-
+
rte_pause();
}
}
@@ -356,13 +374,13 @@ double CRxCoreStateless::get_cpu_util() {
void
-CRxCoreStateless::start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) {
- m_rx_port_mngr[port_id].start_capture(pcap_filename, limit, shared_counter);
+CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter) {
+ m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit, shared_counter);
}
void
-CRxCoreStateless::stop_capture(uint8_t port_id) {
- m_rx_port_mngr[port_id].stop_capture();
+CRxCoreStateless::stop_recorder(uint8_t port_id) {
+ m_rx_port_mngr[port_id].stop_recorder();
}
void
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 9df36310..8e50a46e 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -124,8 +124,8 @@ class CRxCoreStateless {
* @param pcap_filename
* @param limit
*/
- void start_capture(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter);
- void stop_capture(uint8_t port_id);
+ void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit, uint64_t *shared_counter);
+ void stop_recorder(uint8_t port_id);
/**
* start RX queueing of packets
@@ -153,7 +153,8 @@ class CRxCoreStateless {
void capture_pkt(rte_mbuf_t *m);
void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
void handle_work_stage(bool do_try_rx_queue);
-
+ void port_manager_tick();
+
int process_all_pending_pkts(bool flush_rx = false);
void flush_all_pending_pkts() {
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index 46fec432..78f4ac5c 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -200,7 +200,7 @@ RXPacketBuffer::freeze_and_clone() {
}
void
-RXPacketBuffer::handle_pkt(const rte_mbuf_t *m) {
+RXPacketBuffer::push(const rte_mbuf_t *m) {
assert(m_is_enabled);
/* if full - pop the oldest */
@@ -242,7 +242,48 @@ RXPacketBuffer::to_json() const {
return output;
}
-/****************************** packet recorder ****************************/
+
+void
+RXQueue::start(uint64_t size, uint64_t *shared_counter) {
+ if (m_pkt_buffer) {
+ delete m_pkt_buffer;
+ }
+ m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
+}
+
+void
+RXQueue::stop() {
+ if (m_pkt_buffer) {
+ delete m_pkt_buffer;
+ m_pkt_buffer = NULL;
+ }
+}
+
+RXPacketBuffer *
+RXQueue::fetch() {
+
+ if (!m_pkt_buffer) {
+ return nullptr;
+ }
+
+ /* hold a pointer to the old one */
+ RXPacketBuffer *old_buffer = m_pkt_buffer;
+
+ /* replace the old one with a new one and freeze the old */
+ m_pkt_buffer = old_buffer->freeze_and_clone();
+
+ return old_buffer;
+}
+
+void
+RXQueue::handle_pkt(const rte_mbuf_t *m) {
+ m_pkt_buffer->push(m);
+}
+
+/**************************************
+ * RX feature recorder
+ *
+ *************************************/
RXPacketRecorder::RXPacketRecorder() {
m_writer = NULL;
@@ -251,10 +292,6 @@ RXPacketRecorder::RXPacketRecorder() {
m_epoch = -1;
}
-RXPacketRecorder::~RXPacketRecorder() {
- stop();
-}
-
void
RXPacketRecorder::start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter) {
m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
@@ -279,6 +316,13 @@ RXPacketRecorder::stop() {
}
void
+RXPacketRecorder::flush_to_disk() {
+ if (m_writer) {
+ m_writer->flush_to_disk();
+ }
+}
+
+void
RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
if (!m_writer) {
return;
@@ -310,6 +354,30 @@ RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
}
+/**************************************
+ * Port manager
+ *
+ *************************************/
+
+RXPortManager::RXPortManager() {
+ clear_all_features();
+ m_io = NULL;
+ m_cpu_dp_u = NULL;
+}
+
+
+void
+RXPortManager::create(CPortLatencyHWBase *io,
+ CRFC2544Info *rfc2544,
+ CRxCoreErrCntrs *err_cntrs,
+ CCpuUtlDp *cpu_util) {
+ m_io = io;
+ m_cpu_dp_u = cpu_util;
+
+ /* init features */
+ m_latency.create(rfc2544, err_cntrs);
+}
+
void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
/* handle features */
@@ -318,12 +386,12 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
m_latency.handle_pkt(m);
}
- if (is_feature_set(CAPTURE)) {
+ if (is_feature_set(RECORDER)) {
m_recorder.handle_pkt(m);
}
if (is_feature_set(QUEUE)) {
- m_pkt_buffer->handle_pkt(m);
+ m_queue.handle_pkt(m);
}
}
@@ -358,3 +426,9 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) {
return cnt_p;
}
+void
+RXPortManager::tick() {
+ if (is_feature_set(RECORDER)) {
+ m_recorder.flush_to_disk();
+ }
+}
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
index 9df42039..564b15d4 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.h
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -127,10 +127,10 @@ public:
~RXPacketBuffer();
/**
- * handle a new packet
+ * push a packet to the buffer
*
*/
- void handle_pkt(const rte_mbuf_t *m);
+ void push(const rte_mbuf_t *m);
/**
* freezes the queue and clones a new one
@@ -170,6 +170,40 @@ private:
};
+class RXQueue {
+public:
+ RXQueue() {
+ m_pkt_buffer = nullptr;
+ }
+
+ ~RXQueue() {
+ stop();
+ }
+
+ /**
+ * start RX queue
+ *
+ */
+ void start(uint64_t size, uint64_t *shared_counter);
+
+ /**
+ * fetch the current buffer
+ *
+ */
+ RXPacketBuffer * fetch();
+
+ /**
+ * stop RX queue
+ *
+ */
+ void stop();
+
+ void handle_pkt(const rte_mbuf_t *m);
+
+private:
+ RXPacketBuffer *m_pkt_buffer;
+};
+
/**************************************
* RX feature PCAP recorder
*
@@ -178,12 +212,21 @@ private:
class RXPacketRecorder {
public:
RXPacketRecorder();
- ~RXPacketRecorder();
+
+ ~RXPacketRecorder() {
+ stop();
+ }
void start(const std::string &pcap, uint64_t limit, uint64_t *shared_counter);
void stop();
void handle_pkt(const rte_mbuf_t *m);
+ /**
+ * flush any cached packets to disk
+ *
+ */
+ void flush_to_disk();
+
private:
CFileWriterBase *m_writer;
CCapPktRaw m_pkt;
@@ -202,27 +245,19 @@ private:
*/
class RXPortManager {
public:
- enum features_t {
- LATENCY = 0x1,
- CAPTURE = 0x2,
- QUEUE = 0x4
+ enum feature_t {
+ NO_FEATURES = 0x0,
+ LATENCY = 0x1,
+ RECORDER = 0x2,
+ QUEUE = 0x4
};
- RXPortManager() {
- m_features = 0;
- m_pkt_buffer = NULL;
- m_io = NULL;
- m_cpu_dp_u = NULL;
- }
+ RXPortManager();
void create(CPortLatencyHWBase *io,
CRFC2544Info *rfc2544,
CRxCoreErrCntrs *err_cntrs,
- CCpuUtlDp *cpu_util) {
- m_io = io;
- m_cpu_dp_u = cpu_util;
- m_latency.create(rfc2544, err_cntrs);
- }
+ CCpuUtlDp *cpu_util);
void clear_stats() {
m_latency.reset_stats();
@@ -232,6 +267,7 @@ public:
return m_latency;
}
+ /* latency */
void enable_latency() {
set_feature(LATENCY);
}
@@ -240,60 +276,37 @@ public:
unset_feature(LATENCY);
}
- /**
- * capturing of RX packets
- *
- * @author imarom (11/2/2016)
- *
- * @param pcap
- * @param limit_pkts
- */
- void start_capture(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) {
+ /* recorder */
+ void start_recorder(const std::string &pcap, uint64_t limit_pkts, uint64_t *shared_counter) {
m_recorder.start(pcap, limit_pkts, shared_counter);
- set_feature(CAPTURE);
+ set_feature(RECORDER);
}
- void stop_capture() {
+ void stop_recorder() {
m_recorder.stop();
- unset_feature(CAPTURE);
+ unset_feature(RECORDER);
}
- /**
- * queueing packets
- *
- */
+ /* queue */
void start_queue(uint32_t size, uint64_t *shared_counter) {
- if (m_pkt_buffer) {
- delete m_pkt_buffer;
- }
- m_pkt_buffer = new RXPacketBuffer(size, shared_counter);
+ m_queue.start(size, shared_counter);
set_feature(QUEUE);
}
void stop_queue() {
- if (m_pkt_buffer) {
- delete m_pkt_buffer;
- m_pkt_buffer = NULL;
- }
- unset_feature(QUEUE);
+ m_queue.stop();
+ unset_feature(QUEUE);
}
RXPacketBuffer *get_pkt_buffer() {
if (!is_feature_set(QUEUE)) {
- return NULL;
+ return nullptr;
}
-
- assert(m_pkt_buffer);
-
- /* hold a pointer to the old one */
- RXPacketBuffer *old_buffer = m_pkt_buffer;
-
- /* replace the old one with a new one and freeze the old */
- m_pkt_buffer = old_buffer->freeze_and_clone();
-
- return old_buffer;
+
+ return m_queue.fetch();
}
+
/**
* fetch and process all packets
@@ -317,9 +330,15 @@ public:
*/
void handle_pkt(const rte_mbuf_t *m);
-
+ /**
+ * maintenance
+ *
+ * @author imarom (11/24/2016)
+ */
+ void tick();
+
bool has_features_set() {
- return (m_features != 0);
+ return (m_features != NO_FEATURES);
}
@@ -329,23 +348,29 @@ public:
private:
+ void clear_all_features() {
+ m_features = NO_FEATURES;
+ }
- void set_feature(features_t feature) {
+ void set_feature(feature_t feature) {
m_features |= feature;
}
- void unset_feature(features_t feature) {
+ void unset_feature(feature_t feature) {
m_features &= (~feature);
}
- bool is_feature_set(features_t feature) {
+ bool is_feature_set(feature_t feature) const {
return ( (m_features & feature) == feature );
}
uint32_t m_features;
- RXPacketRecorder m_recorder;
+
RXLatency m_latency;
- RXPacketBuffer *m_pkt_buffer;
+ RXPacketRecorder m_recorder;
+ RXQueue m_queue;
+
+
CCpuUtlDp *m_cpu_dp_u;
CPortLatencyHWBase *m_io;
};