summaryrefslogtreecommitdiffstats
path: root/src/stateless
diff options
context:
space:
mode:
authorimarom <imarom@cisco.com>2017-01-04 18:46:45 +0200
committerimarom <imarom@cisco.com>2017-01-04 18:46:45 +0200
commit5257dbb8253fe5b70b75f9c064c4593ca7aee99f (patch)
treef7fa69be359ce165ae992ba0021fd15ca471b818 /src/stateless
parentea10422c22479c8e498d8efb5cb19882e70db9ff (diff)
draft - unreviewed
Signed-off-by: imarom <imarom@cisco.com>
Diffstat (limited to 'src/stateless')
-rw-r--r--src/stateless/cp/trex_stateless.cpp83
-rw-r--r--src/stateless/cp/trex_stateless.h30
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp28
-rw-r--r--src/stateless/cp/trex_stateless_port.h95
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp12
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h32
-rw-r--r--src/stateless/rx/trex_stateless_capture.cpp28
-rw-r--r--src/stateless/rx/trex_stateless_capture.h53
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp65
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h17
-rw-r--r--src/stateless/rx/trex_stateless_rx_defs.h2
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.cpp227
-rw-r--r--src/stateless/rx/trex_stateless_rx_port_mngr.h149
13 files changed, 287 insertions, 534 deletions
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index c31ba0a5..32babbf7 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -18,13 +18,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <trex_stateless.h>
-#include <trex_stateless_port.h>
-#include <sched.h>
+//#include <sched.h>
#include <iostream>
#include <unistd.h>
+#include "trex_stateless.h"
+#include "trex_stateless_port.h"
+#include "trex_stateless_messaging.h"
+
using namespace std;
/***********************************************************
@@ -140,54 +142,35 @@ TrexStateless::get_dp_core_count() {
return m_platform_api->get_dp_core_count();
}
-void
-TrexStateless::encode_stats(Json::Value &global) {
-
- TrexPlatformGlobalStats stats;
- m_platform_api->get_global_stats(stats);
-
- global["cpu_util"] = stats.m_stats.m_cpu_util;
- global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util;
-
- global["tx_bps"] = stats.m_stats.m_tx_bps;
- global["rx_bps"] = stats.m_stats.m_rx_bps;
-
- global["tx_pps"] = stats.m_stats.m_tx_pps;
- global["rx_pps"] = stats.m_stats.m_rx_pps;
-
- global["total_tx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_tx_pkts);
- global["total_rx_pkts"] = Json::Value::UInt64(stats.m_stats.m_total_rx_pkts);
-
- global["total_tx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_tx_bytes);
- global["total_rx_bytes"] = Json::Value::UInt64(stats.m_stats.m_total_rx_bytes);
-
- global["tx_rx_errors"] = Json::Value::UInt64(stats.m_stats.m_tx_rx_errors);
-
- for (uint8_t i = 0; i < m_port_count; i++) {
- std::stringstream ss;
-
- ss << "port " << i;
- Json::Value &port_section = global[ss.str()];
-
- m_ports[i]->encode_stats(port_section);
- }
+capture_id_t
+TrexStateless::start_capture(const CaptureFilter &filter, uint64_t limit) {
+ static MsgReply<capture_id_t> reply;
+
+ reply.reset();
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(filter, limit, reply);
+
+ ring->Enqueue((CGenNode *)msg);
+
+ capture_id_t new_id = reply.wait_for_reply();
+
+ return (new_id);
}
-/**
- * generate a snapshot for publish (async publish)
- *
- */
-void
-TrexStateless::generate_publish_snapshot(std::string &snapshot) {
- Json::FastWriter writer;
- Json::Value root;
-
- root["name"] = "trex-stateless-info";
- root["type"] = 0;
-
- /* stateless specific info goes here */
- root["data"] = Json::nullValue;
-
- snapshot = writer.write(root);
+capture_id_t
+TrexStateless::stop_capture(capture_id_t capture_id) {
+ static MsgReply<capture_id_t> reply;
+
+ reply.reset();
+
+ CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ TrexStatelessRxStopCapture *msg = new TrexStatelessRxStopCapture(capture_id, reply);
+
+ ring->Enqueue((CGenNode *)msg);
+
+ capture_id_t rc = reply.wait_for_reply();
+
+ return (rc);
}
diff --git a/src/stateless/cp/trex_stateless.h b/src/stateless/cp/trex_stateless.h
index 3a1a2c24..33f16ce9 100644
--- a/src/stateless/cp/trex_stateless.h
+++ b/src/stateless/cp/trex_stateless.h
@@ -102,6 +102,7 @@ public:
* defines the TRex stateless operation mode
*
*/
+class CaptureFilter;
class TrexStateless {
public:
@@ -132,32 +133,21 @@ public:
/**
- * shutdown the server
- */
- void shutdown();
-
- /**
- * fetch xstats names (keys of dict)
- *
- */
- void encode_xstats_names(Json::Value &global);
-
- /**
- * fetch xstats values
- *
+ * starts a capture on a 'filter' of ports
+ * with a limit of packets
*/
- void encode_xstats_values(Json::Value &global);
-
+ capture_id_t start_capture(const CaptureFilter &filter, uint64_t limit);
+
/**
- * fetch all the stats
+ * stops an active capture
*
*/
- void encode_stats(Json::Value &global);
-
+ capture_id_t stop_capture(capture_id_t capture_id);
+
/**
- * generate a snapshot for publish
+ * shutdown the server
*/
- void generate_publish_snapshot(std::string &snapshot);
+ void shutdown();
const TrexPlatformApi * get_platform_api() {
return (m_platform_api);
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 7d331c6e..9cf048b0 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -162,7 +162,7 @@ private:
* trex stateless port
*
**************************/
-TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this) {
+TrexStatelessPort::TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api) : m_dp_events(this), m_service_mode(port_id, api) {
std::vector<std::pair<uint8_t, uint8_t>> core_pair_list;
m_port_id = port_id;
@@ -948,24 +948,6 @@ TrexStatelessPort::remove_and_delete_all_streams() {
}
}
-void
-TrexStatelessPort::start_rx_capture(const std::string &pcap_filename, uint64_t limit) {
- static MsgReply<bool> reply;
-
- reply.reset();
-
- TrexStatelessRxStartCapture *msg = new TrexStatelessRxStartCapture(m_port_id, pcap_filename, limit, reply);
- send_message_to_rx((TrexStatelessCpToRxMsgBase *)msg);
-
- /* as below, must wait for ACK from RX core before returning ACK */
- reply.wait_for_reply();
-}
-
-void
-TrexStatelessPort::stop_rx_capture() {
- TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopCapture(m_port_id);
- send_message_to_rx(msg);
-}
void
TrexStatelessPort::start_rx_queue(uint64_t size) {
@@ -980,18 +962,22 @@ TrexStatelessPort::start_rx_queue(uint64_t size) {
this might cause the user to lose some packets from the queue
*/
reply.wait_for_reply();
+
+ m_service_mode.set_rx_queue();
}
void
TrexStatelessPort::stop_rx_queue() {
TrexStatelessCpToRxMsgBase *msg = new TrexStatelessRxStopQueue(m_port_id);
send_message_to_rx(msg);
+
+ m_service_mode.unset_rx_queue();
}
-const RXPacketBuffer *
+const TrexPktBuffer *
TrexStatelessPort::get_rx_queue_pkts() {
- static MsgReply<const RXPacketBuffer *> reply;
+ static MsgReply<const TrexPktBuffer *> reply;
reply.reset();
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index d4ac4018..2cc1b9ca 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -26,12 +26,14 @@ limitations under the License.
#include "trex_dp_port_events.h"
#include "trex_stateless_rx_defs.h"
#include "trex_stream.h"
+#include "trex_exception.h"
+#include "trex_stateless_capture.h"
class TrexStatelessCpToDpMsgBase;
class TrexStatelessCpToRxMsgBase;
class TrexStreamsGraphObj;
class TrexPortMultiplier;
-class RXPacketBuffer;
+class TrexPktBuffer;
/**
@@ -113,6 +115,56 @@ private:
static const std::string g_unowned_handler;
};
+/**
+ * enforces in/out from service mode
+ *
+ * @author imarom (1/4/2017)
+ */
+class TrexServiceMode {
+public:
+ TrexServiceMode(uint8_t port_id, const TrexPlatformApi *api) {
+ m_is_enabled = false;
+ m_has_rx_queue = false;
+ m_port_id = port_id;
+ m_port_attr = api->getPortAttrObj(port_id);
+ }
+
+ void enable() {
+ m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_ALL);
+ m_is_enabled = true;
+ }
+
+ void disable() {
+ if (m_has_rx_queue) {
+ throw TrexException("unable to disable service mode - please remove RX queue");
+ }
+
+ if (TrexStatelessCaptureMngr::getInstance().is_active(m_port_id)) {
+ throw TrexException("unable to disable service - an active capture on port " + std::to_string(m_port_id) + " exists");
+ }
+
+ m_port_attr->set_rx_filter_mode(RX_FILTER_MODE_HW);
+ m_is_enabled = false;
+ }
+
+ bool is_enabled() const {
+ return m_is_enabled;
+ }
+
+ void set_rx_queue() {
+ m_has_rx_queue = true;
+ }
+
+ void unset_rx_queue() {
+ m_has_rx_queue = false;
+ }
+
+private:
+ bool m_is_enabled;
+ bool m_has_rx_queue;
+ TRexPortAttr *m_port_attr;
+ uint8_t m_port_id;
+};
class AsyncStopEvent;
@@ -150,7 +202,15 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
-
+ /**
+ * port capture mode
+ */
+ enum capture_mode_e {
+ PORT_CAPTURE_NONE = 0,
+ PORT_CAPTURE_RX,
+ PORT_CAPTURE_ALL
+ };
+
TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
~TrexStatelessPort();
@@ -227,6 +287,20 @@ public:
double duration,
bool is_dual);
+ /**
+ * moves port to / out service mode
+ */
+ void set_service_mode(bool enabled) {
+ if (enabled) {
+ m_service_mode.enable();
+ } else {
+ m_service_mode.disable();
+ }
+ }
+ bool is_service_mode_on() const {
+ return m_service_mode.is_enabled();
+ }
+
/**
* get the port state
*
@@ -367,16 +441,16 @@ public:
/**
- * enable RX capture on port
+ * starts capturing packets
*
*/
- void start_rx_capture(const std::string &pcap_filename, uint64_t limit);
+ void start_capture(capture_mode_e mode, uint64_t limit);
/**
- * disable RX capture if on
+ * stops capturing packets
*
*/
- void stop_rx_capture();
+ void stop_capture();
/**
* start RX queueing of packets
@@ -398,7 +472,7 @@ public:
* fetch the RX queue packets from the queue
*
*/
- const RXPacketBuffer *get_rx_queue_pkts();
+ const TrexPktBuffer *get_rx_queue_pkts();
/**
* configures port for L2 mode
@@ -429,7 +503,9 @@ public:
}
private:
-
+ void set_service_mode_on();
+ void set_service_mode_off();
+
bool is_core_active(int core_id);
const std::vector<uint8_t> get_core_id_list () {
@@ -514,6 +590,9 @@ private:
TrexPortOwner m_owner;
int m_pending_async_stop_event;
+
+ TrexServiceMode m_service_mode;
+
static const uint32_t MAX_STREAMS = 20000;
};
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index aeb1e677..f441c692 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -263,18 +263,20 @@ bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
bool
TrexStatelessRxStartCapture::handle(CRxCoreStateless *rx_core) {
- rx_core->start_recorder(m_port_id, m_pcap_filename, m_limit);
+ capture_id_t capture_id = rx_core->start_capture(m_limit, m_filter);
/* mark as done */
- m_reply.set_reply(true);
+ m_reply.set_reply(capture_id);
return true;
}
bool
TrexStatelessRxStopCapture::handle(CRxCoreStateless *rx_core) {
- rx_core->stop_recorder(m_port_id);
-
+ capture_id_t rc = rx_core->stop_capture(m_capture_id);
+
+ m_reply.set_reply(rc);
+
return true;
}
@@ -299,7 +301,7 @@ TrexStatelessRxStopQueue::handle(CRxCoreStateless *rx_core) {
bool
TrexStatelessRxQueueGetPkts::handle(CRxCoreStateless *rx_core) {
- const RXPacketBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
+ const TrexPktBuffer *pkt_buffer = rx_core->get_rx_queue_pkts(m_port_id);
/* set the reply */
m_reply.set_reply(pkt_buffer);
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index 72b92d11..5f4978f5 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -28,12 +28,13 @@ limitations under the License.
#include "trex_stateless_rx_defs.h"
#include "os_time.h"
#include "utl_ip.h"
+#include "trex_stateless_capture.h"
class TrexStatelessDpCore;
class CRxCoreStateless;
class TrexStreamsCompiledObj;
class CFlowGenListPerThread;
-class RXPacketBuffer;
+class TrexPktBuffer;
/**
* Generic message reply object
@@ -487,36 +488,35 @@ class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
class TrexStatelessRxStartCapture : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxStartCapture(uint8_t port_id,
- const std::string &pcap_filename,
+ TrexStatelessRxStartCapture(const CaptureFilter& filter,
uint64_t limit,
- MsgReply<bool> &reply) : m_reply(reply) {
+ MsgReply<capture_id_t> &reply) : m_reply(reply) {
- m_port_id = port_id;
- m_limit = limit;
- m_pcap_filename = pcap_filename;
+ m_limit = limit;
+ m_filter = filter;
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
- std::string m_pcap_filename;
- uint64_t m_limit;
- MsgReply<bool> &m_reply;
+ uint8_t m_port_id;
+ uint64_t m_limit;
+ CaptureFilter m_filter;
+ MsgReply<capture_id_t> &m_reply;
};
class TrexStatelessRxStopCapture : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxStopCapture(uint8_t port_id) {
- m_port_id = port_id;
+ TrexStatelessRxStopCapture(capture_id_t capture_id, MsgReply<capture_id_t> &reply) : m_reply(reply) {
+ m_capture_id = capture_id;
}
virtual bool handle(CRxCoreStateless *rx_core);
private:
- uint8_t m_port_id;
+ capture_id_t m_capture_id;
+ MsgReply<capture_id_t> &m_reply;
};
@@ -556,7 +556,7 @@ private:
class TrexStatelessRxQueueGetPkts : public TrexStatelessCpToRxMsgBase {
public:
- TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const RXPacketBuffer *> &reply) : m_reply(reply) {
+ TrexStatelessRxQueueGetPkts(uint8_t port_id, MsgReply<const TrexPktBuffer *> &reply) : m_reply(reply) {
m_port_id = port_id;
}
@@ -568,7 +568,7 @@ public:
private:
uint8_t m_port_id;
- MsgReply<const RXPacketBuffer *> &m_reply;
+ MsgReply<const TrexPktBuffer *> &m_reply;
};
diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp
index 83bb2d38..4ed126cc 100644
--- a/src/stateless/rx/trex_stateless_capture.cpp
+++ b/src/stateless/rx/trex_stateless_capture.cpp
@@ -53,22 +53,36 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) {
m_pkt_buffer->push(m);
}
+void
+TrexStatelessCaptureMngr::update_global_filter() {
+ CaptureFilter new_filter;
+
+ for (TrexStatelessCapture *capture : m_captures) {
+ new_filter += capture->get_filter();
+ }
+
+ m_global_filter = new_filter;
+}
+
capture_id_t
TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) {
if (m_captures.size() > MAX_CAPTURE_SIZE) {
- throw TrexException(TrexException::T_CAPTURE_MAX_INSTANCES);
+ return CAPTURE_TOO_MANY_CAPTURES;
}
int new_id = m_id_counter++;
TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter);
m_captures.push_back(new_buffer);
+
+ /* update global filter */
+ update_global_filter();
return new_id;
}
-void
+capture_id_t
TrexStatelessCaptureMngr::remove(capture_id_t id) {
int index = -1;
@@ -81,12 +95,18 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) {
/* does not exist */
if (index == -1) {
- throw TrexException(TrexException::T_CAPTURE_INVALID_ID);
+ return CAPTURE_ID_NOT_FOUND;
}
TrexStatelessCapture *capture = m_captures[index];
m_captures.erase(m_captures.begin() + index);
- delete capture;
+
+ delete capture;
+
+ /* update global filter */
+ update_global_filter();
+
+ return id;
}
void
diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h
index f7cd451f..4d0b6a78 100644
--- a/src/stateless/rx/trex_stateless_capture.h
+++ b/src/stateless/rx/trex_stateless_capture.h
@@ -31,16 +31,16 @@ limitations under the License.
class CaptureFilter {
public:
CaptureFilter() {
- tx_active = 0;
- rx_active = 0;
+ m_tx_active = 0;
+ m_rx_active = 0;
}
void add_tx(uint8_t port_id) {
- tx_active |= (1LL << port_id);
+ m_tx_active |= (1LL << port_id);
}
void add_rx(uint8_t port_id) {
- rx_active |= (1LL << port_id);
+ m_rx_active |= (1LL << port_id);
}
void add(uint8_t port_id) {
@@ -63,21 +63,36 @@ public:
bool in_rx(uint8_t port_id) const {
uint64_t bit = (1LL << port_id);
- return ((rx_active & bit) == bit);
+ return ((m_rx_active & bit) == bit);
}
bool in_tx(uint8_t port_id) const {
uint64_t bit = (1LL << port_id);
- return ((tx_active & bit) == bit);
+ return ((m_tx_active & bit) == bit);
+ }
+
+ bool in_any(uint8_t port_id) const {
+ return ( in_tx(port_id) || in_rx(port_id) );
+ }
+
+ CaptureFilter& operator +=(const CaptureFilter &other) {
+ m_tx_active |= other.m_tx_active;
+ m_rx_active |= other.m_rx_active;
+
+ return *this;
}
private:
- uint64_t tx_active;
- uint64_t rx_active;
+ uint64_t m_tx_active;
+ uint64_t m_rx_active;
};
-typedef uint64_t capture_id_t;
+typedef int64_t capture_id_t;
+enum {
+ CAPTURE_ID_NOT_FOUND = -1,
+ CAPTURE_TOO_MANY_CAPTURES = -2,
+};
class TrexStatelessCapture {
public:
@@ -93,6 +108,10 @@ public:
return m_id;
}
+ const CaptureFilter & get_filter() const {
+ return m_filter;
+ }
+
private:
TrexPktBuffer *m_pkt_buffer;
CaptureFilter m_filter;
@@ -122,9 +141,11 @@ public:
/**
- * stops capture mode
+ * stops capture mode
+ * on success, will return the ID of the removed one
+ * o.w it will be an error
*/
- void remove(capture_id_t id);
+ capture_id_t remove(capture_id_t id);
/**
* removes all captures
@@ -139,8 +160,8 @@ public:
*
* @return bool
*/
- bool is_active() const {
- return (m_captures.size() != 0);
+ bool is_active(uint8_t port) const {
+ return m_global_filter.in_any(port);
}
/**
@@ -153,7 +174,7 @@ public:
*/
void handle_pkt_rx(const rte_mbuf_t *m, int port) {
/* fast path */
- if (!is_active()) {
+ if (!is_active(port)) {
return;
}
@@ -169,11 +190,15 @@ private:
}
void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port);
+ void update_global_filter();
std::vector<TrexStatelessCapture *> m_captures;
capture_id_t m_id_counter;
+ /* a union of all the filters curently active */
+ CaptureFilter m_global_filter;
+
static const int MAX_CAPTURE_SIZE = 10;
};
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index d27485de..f1ba303a 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -69,11 +69,11 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) {
void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_capture = false;
m_max_ports = cfg.m_max_ports;
-
+ m_tx_cores = cfg.m_tx_cores;
+
CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
m_ring_from_cp = cp_rx->getRingCpToDp(0);
- m_ring_to_cp = cp_rx->getRingDpToCp(0);
m_state = STATE_IDLE;
for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) {
@@ -130,6 +130,36 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() {
}
+void
+CRxCoreStateless::periodic_check_for_dp_messages() {
+
+ for (int i = 0; i < m_tx_cores; i++) {
+ periodic_check_for_dp_messages_core(i);
+ }
+
+}
+
+void
+CRxCoreStateless::periodic_check_for_dp_messages_core(uint32_t core_id) {
+
+ CNodeRing *ring = CMsgIns::Ins()->getRxDp()->getRingDpToCp(core_id);
+
+ /* fast path */
+ if ( likely ( ring->isEmpty() ) ) {
+ return;
+ }
+
+ while (true) {
+ CGenNode *node = NULL;
+
+ if (ring->Dequeue(node) != 0) {
+ break;
+ }
+
+ //assert(node);
+ }
+}
+
void CRxCoreStateless::recalculate_next_state() {
if (m_state == STATE_QUIT) {
return;
@@ -176,16 +206,6 @@ void CRxCoreStateless::idle_state_loop() {
}
/**
- * for each port give a tick (for flushing if needed)
- *
- */
-void CRxCoreStateless::port_manager_tick() {
- for (int i = 0; i < m_max_ports; i++) {
- m_rx_port_mngr[i].tick();
- }
-}
-
-/**
* for each port handle the grat ARP mechansim
*
*/
@@ -199,7 +219,6 @@ void CRxCoreStateless::handle_work_stage() {
/* set the next sync time to */
dsec_t sync_time_sec = now_sec() + (1.0 / 1000);
- dsec_t tick_time_sec = now_sec() + 1.0;
dsec_t grat_arp_sec = now_sec() + (double)CGlobalInfo::m_options.m_arp_ref_per;
while (m_state == STATE_WORKING) {
@@ -211,14 +230,10 @@ void CRxCoreStateless::handle_work_stage() {
if ( (now - sync_time_sec) > 0 ) {
periodic_check_for_cp_messages();
+ //periodic_check_for_dp_messages();
sync_time_sec = now + (1.0 / 1000);
}
- if ( (now - tick_time_sec) > 0) {
- port_manager_tick();
- tick_time_sec = now + 1.0;
- }
-
if ( (now - grat_arp_sec) > 0) {
handle_grat_arp();
grat_arp_sec = now + (double)CGlobalInfo::m_options.m_arp_ref_per;
@@ -317,16 +332,14 @@ double CRxCoreStateless::get_cpu_util() {
}
-void
-CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) {
- m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit);
- recalculate_next_state();
+capture_id_t
+CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) {
+ return TrexStatelessCaptureMngr::getInstance().add(limit, filter);
}
-void
-CRxCoreStateless::stop_recorder(uint8_t port_id) {
- m_rx_port_mngr[port_id].stop_recorder();
- recalculate_next_state();
+capture_id_t
+CRxCoreStateless::stop_capture(capture_id_t capture_id) {
+ return TrexStatelessCaptureMngr::getInstance().remove(capture_id);
}
void
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 4eed59a1..21ed51ba 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -27,6 +27,7 @@
#include "pal/linux/sanb_atomic.h"
#include "utl_cpuu.h"
#include "trex_stateless_rx_port_mngr.h"
+#include "trex_stateless_capture.h"
class TrexStatelessCpToRxMsgBase;
@@ -127,16 +128,16 @@ class CRxCoreStateless {
double get_cpu_util();
void update_cpu_util();
- const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) {
+ const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) {
return m_rx_port_mngr[port_id].get_pkt_buffer();
}
/**
- * start capturing of RX packets on a specific port
+ * start capturing packets
*
*/
- void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit);
- void stop_recorder(uint8_t port_id);
+ capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter);
+ capture_id_t stop_capture(capture_id_t capture_id);
/**
* start RX queueing of packets
@@ -162,7 +163,12 @@ class CRxCoreStateless {
private:
void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
+
bool periodic_check_for_cp_messages();
+
+ void periodic_check_for_dp_messages();
+ void periodic_check_for_dp_messages_core(uint32_t core_id);
+
void tickle();
void idle_state_loop();
@@ -172,7 +178,6 @@ class CRxCoreStateless {
void capture_pkt(rte_mbuf_t *m);
void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
void handle_work_stage();
- void port_manager_tick();
void handle_grat_arp();
int process_all_pending_pkts(bool flush_rx = false);
@@ -186,10 +191,10 @@ class CRxCoreStateless {
private:
TrexMonitor m_monitor;
uint32_t m_max_ports;
+ uint32_t m_tx_cores;
bool m_capture;
state_e m_state;
CNodeRing *m_ring_from_cp;
- CNodeRing *m_ring_to_cp;
CCpuUtlDp m_cpu_dp_u;
CCpuUtlCp m_cpu_cp_u;
diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h
index aefcc133..367cf4e3 100644
--- a/src/stateless/rx/trex_stateless_rx_defs.h
+++ b/src/stateless/rx/trex_stateless_rx_defs.h
@@ -38,10 +38,12 @@ class CRxSlCfg {
m_max_ports = 0;
m_cps = 0.0;
m_num_crc_fix_bytes = 0;
+ m_tx_cores = 0;
}
public:
uint32_t m_max_ports;
+ uint32_t m_tx_cores;
double m_cps;
CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
uint8_t m_num_crc_fix_bytes;
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
index e16b3d0c..d2e0b4e8 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp
@@ -20,48 +20,10 @@
*/
#include "bp_sim.h"
#include "trex_stateless_rx_port_mngr.h"
-#include "common/captureFile.h"
#include "trex_stateless_rx_core.h"
#include "common/Network/Packet/Arp.h"
#include "pkt_gen.h"
-
-/**
- * copy MBUF to a flat buffer
- *
- * @author imarom (12/20/2016)
- *
- * @param dest - buffer with at least rte_pktmbuf_pkt_len(m)
- * bytes
- * @param m - MBUF to copy
- *
- * @return uint8_t*
- */
-void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) {
-
- int index = 0;
- for (const rte_mbuf_t *it = m; it != NULL; it = it->next) {
- const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *);
- memcpy(dest + index, src, it->data_len);
- index += it->data_len;
- }
-}
-
-/**************************************
- * RX packet
- *
- *************************************/
-RXPacket::RXPacket(const rte_mbuf_t *m) {
-
- /* allocate buffer */
- m_size = m->pkt_len;
- m_raw = new uint8_t[m_size];
-
- /* copy data */
- copy_mbuf(m_raw, m);
-
- /* generate a packet timestamp */
- m_timestamp = now_sec();
-}
+#include "trex_stateless_capture.h"
/**************************************
* latency RX feature
@@ -231,79 +193,12 @@ RXLatency::to_json() const {
*
*************************************/
-RXPacketBuffer::RXPacketBuffer(uint64_t size) {
- m_buffer = nullptr;
- m_head = 0;
- m_tail = 0;
- m_size = (size + 1); // for the empty/full difference 1 slot reserved
-
- /* generate queue */
- m_buffer = new RXPacket*[m_size](); // zeroed
-}
-
-RXPacketBuffer::~RXPacketBuffer() {
- assert(m_buffer);
-
- while (!is_empty()) {
- RXPacket *pkt = pop();
- delete pkt;
- }
- delete [] m_buffer;
-}
-
-void
-RXPacketBuffer::push(const rte_mbuf_t *m) {
- /* if full - pop the oldest */
- if (is_full()) {
- delete pop();
- }
-
- /* push packet */
- m_buffer[m_head] = new RXPacket(m);
- m_head = next(m_head);
-}
-
-RXPacket *
-RXPacketBuffer::pop() {
- assert(!is_empty());
-
- RXPacket *pkt = m_buffer[m_tail];
- m_tail = next(m_tail);
-
- return pkt;
-}
-
-uint64_t
-RXPacketBuffer::get_element_count() const {
- if (m_head >= m_tail) {
- return (m_head - m_tail);
- } else {
- return ( get_capacity() - (m_tail - m_head - 1) );
- }
-}
-
-Json::Value
-RXPacketBuffer::to_json() const {
-
- Json::Value output = Json::arrayValue;
-
- int tmp = m_tail;
- while (tmp != m_head) {
- RXPacket *pkt = m_buffer[tmp];
- output.append(pkt->to_json());
- tmp = next(tmp);
- }
-
- return output;
-}
-
-
void
RXQueue::start(uint64_t size) {
if (m_pkt_buffer) {
delete m_pkt_buffer;
}
- m_pkt_buffer = new RXPacketBuffer(size);
+ m_pkt_buffer = new TrexPktBuffer(size, TrexPktBuffer::MODE_DROP_HEAD);
}
void
@@ -314,7 +209,7 @@ RXQueue::stop() {
}
}
-const RXPacketBuffer *
+const TrexPktBuffer *
RXQueue::fetch() {
/* if no buffer or the buffer is empty - give a NULL one */
@@ -323,10 +218,10 @@ RXQueue::fetch() {
}
/* hold a pointer to the old one */
- RXPacketBuffer *old_buffer = m_pkt_buffer;
+ TrexPktBuffer *old_buffer = m_pkt_buffer;
/* replace the old one with a new one and freeze the old */
- m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity());
+ m_pkt_buffer = new TrexPktBuffer(old_buffer->get_capacity(), old_buffer->get_mode());
return old_buffer;
}
@@ -348,97 +243,6 @@ RXQueue::to_json() const {
return output;
}
-/**************************************
- * RX feature recorder
- *
- *************************************/
-
-RXPacketRecorder::RXPacketRecorder() {
- m_writer = NULL;
- m_count = 0;
- m_limit = 0;
- m_epoch = -1;
-
- m_pending_flush = false;
-}
-
-void
-RXPacketRecorder::start(const std::string &pcap, uint64_t limit) {
- m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str());
- if (m_writer == NULL) {
- std::stringstream ss;
- ss << "unable to create PCAP file: " << pcap;
- throw TrexException(ss.str());
- }
-
- assert(limit > 0);
-
- m_limit = limit;
- m_count = 0;
- m_pending_flush = false;
- m_pcap_filename = pcap;
-}
-
-void
-RXPacketRecorder::stop() {
- if (!m_writer) {
- return;
- }
-
- delete m_writer;
- m_writer = NULL;
-}
-
-void
-RXPacketRecorder::flush_to_disk() {
-
- if (m_writer && m_pending_flush) {
- m_writer->flush_to_disk();
- m_pending_flush = false;
- }
-}
-
-void
-RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) {
- if (!m_writer) {
- return;
- }
-
- dsec_t now = now_sec();
- if (m_epoch < 0) {
- m_epoch = now;
- }
-
- dsec_t dt = now - m_epoch;
-
- CPktNsecTimeStamp t_c(dt);
- m_pkt.time_nsec = t_c.m_time_nsec;
- m_pkt.time_sec = t_c.m_time_sec;
-
- copy_mbuf((uint8_t *)m_pkt.raw, m);
- m_pkt.pkt_len = m->pkt_len;
-
- m_writer->write_packet(&m_pkt);
- m_count++;
- m_pending_flush = true;
-
- if (m_count == m_limit) {
- stop();
- }
-
-}
-
-Json::Value
-RXPacketRecorder::to_json() const {
- Json::Value output = Json::objectValue;
-
- output["pcap_filename"] = m_pcap_filename;
- output["limit"] = Json::UInt64(m_limit);
- output["count"] = Json::UInt64(m_count);
-
- return output;
-}
-
/**************************************
* RX feature server (ARP, ICMP) and etc.
@@ -786,10 +590,6 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
m_latency.handle_pkt(m);
}
- if (is_feature_set(RECORDER)) {
- m_recorder.handle_pkt(m);
- }
-
if (is_feature_set(QUEUE)) {
m_queue.handle_pkt(m);
}
@@ -797,6 +597,9 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) {
if (is_feature_set(SERVER)) {
m_server.handle_pkt(m);
}
+
+ /* capture */
+ TrexStatelessCaptureMngr::getInstance().handle_pkt_rx(m, m_port_id);
}
int RXPortManager::process_all_pending_pkts(bool flush_rx) {
@@ -835,13 +638,6 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) {
}
void
-RXPortManager::tick() {
- if (is_feature_set(RECORDER)) {
- m_recorder.flush_to_disk();
- }
-}
-
-void
RXPortManager::send_next_grat_arp() {
if (is_feature_set(GRAT_ARP)) {
m_grat_arp.send_next_grat_arp();
@@ -887,13 +683,6 @@ RXPortManager::to_json() const {
output["latency"]["is_active"] = false;
}
- if (is_feature_set(RECORDER)) {
- output["sniffer"] = m_recorder.to_json();
- output["sniffer"]["is_active"] = true;
- } else {
- output["sniffer"]["is_active"] = false;
- }
-
if (is_feature_set(QUEUE)) {
output["queue"] = m_queue.to_json();
output["queue"]["is_active"] = true;
diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h
index 6efdae64..0cc60716 100644
--- a/src/stateless/rx/trex_stateless_rx_port_mngr.h
+++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h
@@ -25,8 +25,7 @@
#include <stdint.h>
#include "common/base64.h"
-#include "common/captureFile.h"
-
+#include "trex_stateless_pkt.h"
class CPortLatencyHWBase;
class CRFC2544Info;
@@ -80,97 +79,12 @@ public:
CRxCoreErrCntrs *m_err_cntrs;
};
-/**
- * describes a single saved RX packet
- *
- */
-class RXPacket {
-public:
-
- RXPacket(const rte_mbuf_t *m);
-
- /* slow path and also RVO - pass by value is ok */
- Json::Value to_json() {
- Json::Value output;
- output["ts"] = m_timestamp;
- output["binary"] = base64_encode(m_raw, m_size);
- return output;
- }
-
- ~RXPacket() {
- if (m_raw) {
- delete [] m_raw;
- }
- }
-
-private:
-
- uint8_t *m_raw;
- uint16_t m_size;
- dsec_t m_timestamp;
-};
-
/**************************************
* RX feature queue
*
*************************************/
-class RXPacketBuffer {
-public:
-
- RXPacketBuffer(uint64_t size);
- ~RXPacketBuffer();
-
- /**
- * push a packet to the buffer
- *
- */
- void push(const rte_mbuf_t *m);
-
- /**
- * generate a JSON output of the queue
- *
- */
- Json::Value to_json() const;
-
-
- bool is_empty() const {
- return (m_head == m_tail);
- }
-
- bool is_full() const {
- return ( next(m_head) == m_tail);
- }
-
- /**
- * return the total amount of space possible
- */
- uint64_t get_capacity() const {
- /* one slot is used for diff between full/empty */
- return (m_size - 1);
- }
-
- /**
- * returns how many elements are in the queue
- */
- uint64_t get_element_count() const;
-
-private:
- int next(int v) const {
- return ( (v + 1) % m_size );
- }
-
- /* pop in case of full queue - internal usage */
- RXPacket * pop();
-
- int m_head;
- int m_tail;
- int m_size;
- RXPacket **m_buffer;
-};
-
-
class RXQueue {
public:
RXQueue() {
@@ -191,7 +105,7 @@ public:
* fetch the current buffer
* return NULL if no packets
*/
- const RXPacketBuffer * fetch();
+ const TrexPktBuffer * fetch();
/**
* stop RX queue
@@ -204,42 +118,7 @@ public:
Json::Value to_json() const;
private:
- RXPacketBuffer *m_pkt_buffer;
-};
-
-/**************************************
- * RX feature PCAP recorder
- *
- *************************************/
-
-class RXPacketRecorder {
-public:
- RXPacketRecorder();
-
- ~RXPacketRecorder() {
- stop();
- }
-
- void start(const std::string &pcap, uint64_t limit);
- void stop();
- void handle_pkt(const rte_mbuf_t *m);
-
- /**
- * flush any cached packets to disk
- *
- */
- void flush_to_disk();
-
- Json::Value to_json() const;
-
-private:
- CFileWriterBase *m_writer;
- std::string m_pcap_filename;
- CCapPktRaw m_pkt;
- dsec_t m_epoch;
- uint64_t m_limit;
- uint64_t m_count;
- bool m_pending_flush;
+ TrexPktBuffer *m_pkt_buffer;
};
@@ -311,7 +190,6 @@ public:
enum feature_t {
NO_FEATURES = 0x0,
LATENCY = 0x1,
- RECORDER = 0x2,
QUEUE = 0x4,
SERVER = 0x8,
GRAT_ARP = 0x10,
@@ -354,17 +232,6 @@ public:
unset_feature(LATENCY);
}
- /* recorder */
- void start_recorder(const std::string &pcap, uint64_t limit_pkts) {
- m_recorder.start(pcap, limit_pkts);
- set_feature(RECORDER);
- }
-
- void stop_recorder() {
- m_recorder.stop();
- unset_feature(RECORDER);
- }
-
/* queue */
void start_queue(uint32_t size) {
m_queue.start(size);
@@ -376,7 +243,7 @@ public:
unset_feature(QUEUE);
}
- const RXPacketBuffer *get_pkt_buffer() {
+ const TrexPktBuffer *get_pkt_buffer() {
if (!is_feature_set(QUEUE)) {
return nullptr;
}
@@ -415,13 +282,6 @@ public:
void handle_pkt(const rte_mbuf_t *m);
/**
- * maintenance
- *
- * @author imarom (11/24/2016)
- */
- void tick();
-
- /**
* send next grat arp (if on)
*
* @author imarom (12/13/2016)
@@ -482,7 +342,6 @@ private:
uint32_t m_features;
uint8_t m_port_id;
RXLatency m_latency;
- RXPacketRecorder m_recorder;
RXQueue m_queue;
RXServer m_server;
RXGratARP m_grat_arp;