summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xlinux/ws_main.py1
-rwxr-xr-xlinux_dpdk/ws_main.py1
-rw-r--r--src/flow_stat.cpp32
-rw-r--r--src/flow_stat.h8
-rw-r--r--src/main_dpdk.cpp8
-rwxr-xr-xsrc/msg_manager.cpp17
-rwxr-xr-xsrc/msg_manager.h26
-rw-r--r--src/stateless/cp/trex_stateless_port.cpp7
-rw-r--r--src/stateless/cp/trex_stateless_port.h100
-rw-r--r--src/stateless/dp/trex_stateless_dp_core.cpp16
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp42
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h76
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp112
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h18
14 files changed, 320 insertions, 144 deletions
diff --git a/linux/ws_main.py b/linux/ws_main.py
index 9422a8ff..58f5b661 100755
--- a/linux/ws_main.py
+++ b/linux/ws_main.py
@@ -258,6 +258,7 @@ includes_path =''' ../src/pal/linux/
../src/rpc-server/
../src/stateless/cp/
../src/stateless/dp/
+ ../src/stateless/rx/
../src/stateless/messaging/
../external_libs/json/
../external_libs/zmq/include/
diff --git a/linux_dpdk/ws_main.py b/linux_dpdk/ws_main.py
index faaca0de..2aa06e3b 100755
--- a/linux_dpdk/ws_main.py
+++ b/linux_dpdk/ws_main.py
@@ -424,6 +424,7 @@ includes_path =''' ../src/pal/linux_dpdk/
../src/rpc-server/
../src/stateless/cp/
../src/stateless/dp/
+ ../src/stateless/rx/
../src/stateless/messaging/
../external_libs/yaml-cpp/include/
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index 01038292..d44a91da 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -25,6 +25,7 @@
#include <os_time.h>
#include "internal_api/trex_platform_api.h"
#include "trex_stateless.h"
+#include "trex_stateless_messaging.h"
#include "trex_stream.h"
#include "flow_stat_parser.h"
#include "flow_stat.h"
@@ -385,6 +386,8 @@ void CFlowStatHwIdMap::unmap(uint16_t hw_id) {
CFlowStatRuleMgr::CFlowStatRuleMgr() {
m_api = NULL;
m_max_hw_id = -1;
+ m_num_started_streams = 0;
+ m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
}
std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
@@ -488,6 +491,12 @@ int CFlowStatRuleMgr::del_stream(const TrexStream * stream) {
return 0;
}
+ if (m_user_id_map.is_started(stream->m_rx_check.m_pg_id)) {
+ std::cerr << "Error: Trying to delete flow statistics stream " << stream->m_rx_check.m_pg_id
+ << " which is not stopped." << std::endl;
+ return -1;
+ }
+
return m_user_id_map.del_stream(stream->m_rx_check.m_pg_id);
}
@@ -556,6 +565,10 @@ int CFlowStatRuleMgr::start_stream(TrexStream * stream, uint16_t &ret_hw_id) {
std::cout << "exit:" << __METHOD_NAME__ << " hw_id:" << ret_hw_id << std::endl;
#endif
+ if (m_num_started_streams == 0) {
+ send_start_stop_msg_to_rx(true); // First transmitting stream. Rx core should start reading packets;
+ }
+ m_num_started_streams++;
return 0;
}
@@ -605,6 +618,11 @@ int CFlowStatRuleMgr::stop_stream(const TrexStream * stream) {
m_hw_id_map.unmap(hw_id);
}
}
+ m_num_started_streams--;
+ assert (m_num_started_streams >= 0);
+ if (m_num_started_streams == 0) {
+ send_start_stop_msg_to_rx(false); // No more transmittig streams. Rx core shoulde get into idle loop.
+ }
return 0;
}
@@ -618,6 +636,18 @@ int CFlowStatRuleMgr::get_active_pgids(flow_stat_active_t &result) {
return 0;
}
+extern bool rx_should_stop;
+void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
+ TrexStatelessCpToRxMsgBase *msg;
+
+ if (is_start) {
+ msg = new TrexRxStartMsg();
+ } else {
+ msg = new TrexRxStopMsg();
+ }
+ m_ring_to_rx->Enqueue((CGenNode *)msg);
+}
+
// return false if no counters changed since last run. true otherwise
bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
rx_per_flow_t rx_stats[MAX_FLOW_STATS];
@@ -627,7 +657,7 @@ bool CFlowStatRuleMgr::dump_json(std::string & json, bool baseline) {
root["name"] = "flow_stats";
root["type"] = 0;
-
+
if (baseline) {
root["baseline"] = true;
}
diff --git a/src/flow_stat.h b/src/flow_stat.h
index 0fb4fede..83f076de 100644
--- a/src/flow_stat.h
+++ b/src/flow_stat.h
@@ -26,6 +26,7 @@
#include <map>
#include "trex_defs.h"
#include "trex_stream.h"
+#include "msg_manager.h"
#include <internal_api/trex_platform_api.h>
// range reserved for rx stat measurement is from IP_ID_RESERVE_BASE to 0xffff
@@ -144,8 +145,8 @@ class CFlowStatUserIdInfo {
tx_per_flow_t m_tx_counter_base[TREX_MAX_PORTS];
uint16_t m_hw_id; // Associated hw id. UINT16_MAX if no associated hw id.
uint8_t m_proto; // protocol (UDP, TCP, other), associated with this user id.
- uint8_t m_ref_count; // How many streams with this ref count exists
- uint8_t m_trans_ref_count; // How many streams with this ref count currently transmit
+ uint8_t m_ref_count; // How many streams with this user id exists
+ uint8_t m_trans_ref_count; // How many streams with this user id currently transmit
bool m_was_sent; // Did we send this info to clients once?
};
@@ -208,6 +209,7 @@ class CFlowStatRuleMgr {
private:
int compile_stream(const TrexStream * stream, Cxl710Parser &parser);
int add_hw_rule(uint16_t hw_id, uint8_t proto);
+ void send_start_stop_msg_to_rx(bool is_start);
private:
CFlowStatHwIdMap m_hw_id_map; // map hw ids to user ids
@@ -215,6 +217,8 @@ class CFlowStatRuleMgr {
uint8_t m_num_ports; // How many ports are being used
const TrexPlatformApi *m_api;
int m_max_hw_id; // max hw id we ever used
+ uint32_t m_num_started_streams; // How many started (transmitting) streams we have
+ CNodeRing *m_ring_to_rx; // handle for sending messages to Rx core
};
#endif
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 4fc048ff..9e690951 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -2873,14 +2873,14 @@ void CGlobalTRex::rx_sl_configure(void) {
if ( get_vm_one_queue_enable() ) {
#if 0
- ???
+ /// what to do here ???
/* vm mode, indirect queues */
for (i=0; i < m_max_ports; i++) {
CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
uint8_t thread_id = (i >> 1);
- CNodeRing * r = rx_dp->getRingCpToDp(thread_id); ///??? should be rx to dp?
- m_latency_vm_vports[i].Create((uint8_t)i,r,&m_mg);
- rx_sl_cfg.m_ports[i] =&m_latency_vm_vports[i];
+ CNodeRing * r = rx_dp->getRingCpToDp(thread_id);
+ m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg);
+ rx_sl_cfg.m_ports[i] = &m_latency_vm_vports[i];
}
#endif
} else {
diff --git a/src/msg_manager.cpp b/src/msg_manager.cpp
index 9ade1bfc..7e39391a 100755
--- a/src/msg_manager.cpp
+++ b/src/msg_manager.cpp
@@ -4,7 +4,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -65,12 +65,12 @@ void CMessagingManager::Delete(){
delete [] m_dp_to_cp;
m_dp_to_cp = NULL;
}
-
+
if (m_cp_to_dp) {
delete [] m_cp_to_dp;
m_cp_to_dp = NULL;
}
-
+
}
CNodeRing * CMessagingManager::getRingCpToDp(uint8_t thread_id){
@@ -84,7 +84,6 @@ CNodeRing * CMessagingManager::getRingDpToCp(uint8_t thread_id){
}
-
void CMsgIns::Free(){
if (m_ins) {
m_ins->Delete();
@@ -107,6 +106,11 @@ bool CMsgIns::Create(uint8_t num_threads){
if (!res) {
return (res);
}
+ res = m_cp_rx.Create(1, "cp_rx");
+ if (!res) {
+ return (res);
+ }
+
return (m_rx_dp.Create(num_threads,"rx_dp"));
}
@@ -114,9 +118,8 @@ bool CMsgIns::Create(uint8_t num_threads){
void CMsgIns::Delete(){
m_cp_dp.Delete();
m_rx_dp.Delete();
+ m_cp_rx.Delete();
}
-CMsgIns * CMsgIns::m_ins=0;
-
-
+CMsgIns * CMsgIns::m_ins=0;
diff --git a/src/msg_manager.h b/src/msg_manager.h
index 0390ce10..de11edbd 100755
--- a/src/msg_manager.h
+++ b/src/msg_manager.h
@@ -6,7 +6,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -40,37 +40,37 @@ public:
/*
-e.g DP with 4 threads
-will look like this
+e.g DP with 4 threads
+will look like this
- cp_to_dp
+ cp_to_dp
master :push
dpx : pop
-
+
- --> dp0
cp - --> dp1
- --> dp2
- --> dp3
- dp_to_cp
+ dp_to_cp
cp : pop
dpx : push
-
+
<- -- dp0
cp <- -- dp1
<- -- dp2
<- -- dp3
-*/
+*/
class CGenNode ;
typedef CTRingSp<CGenNode> CNodeRing;
-/* CP == latency thread
+/* CP == latency thread
DP == traffic pkt generator */
class CMessagingManager {
public:
@@ -83,6 +83,7 @@ public:
void Delete();
CNodeRing * getRingCpToDp(uint8_t thread_id);
CNodeRing * getRingDpToCp(uint8_t thread_id);
+ CNodeRing * getRingCpToRx();
uint8_t get_num_threads(){
return (m_num_dp_threads);
}
@@ -106,6 +107,9 @@ public:
CMessagingManager * getCpDp(){
return (&m_cp_dp);
}
+ CMessagingManager * getCpRx(){
+ return (&m_cp_rx);
+ }
uint8_t get_num_threads(){
return (m_rx_dp.get_num_threads());
@@ -114,11 +118,11 @@ public:
private:
CMessagingManager m_rx_dp;
CMessagingManager m_cp_dp;
-
+ CMessagingManager m_cp_rx;
private:
/* one instance */
- static CMsgIns * m_ins;
+ static CMsgIns * m_ins;
};
#endif
diff --git a/src/stateless/cp/trex_stateless_port.cpp b/src/stateless/cp/trex_stateless_port.cpp
index 5947aaf7..90589d7a 100644
--- a/src/stateless/cp/trex_stateless_port.cpp
+++ b/src/stateless/cp/trex_stateless_port.cpp
@@ -473,6 +473,13 @@ TrexStatelessPort::send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBas
ring->Enqueue((CGenNode *)msg);
}
+void
+TrexStatelessPort::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
+
+ /* send the message to the core */
+ CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ ring->Enqueue((CGenNode *)msg);
+}
uint64_t
TrexStatelessPort::get_port_speed_bps() const {
diff --git a/src/stateless/cp/trex_stateless_port.h b/src/stateless/cp/trex_stateless_port.h
index d3c4dcb9..7e1838d4 100644
--- a/src/stateless/cp/trex_stateless_port.h
+++ b/src/stateless/cp/trex_stateless_port.h
@@ -4,7 +4,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -21,20 +21,21 @@ limitations under the License.
#ifndef __TREX_STATELESS_PORT_H__
#define __TREX_STATELESS_PORT_H__
-#include <trex_stream.h>
-#include <trex_dp_port_events.h>
-#include <internal_api/trex_platform_api.h>
+#include "internal_api/trex_platform_api.h"
+#include "trex_dp_port_events.h"
+#include "trex_stream.h"
class TrexStatelessCpToDpMsgBase;
+class TrexStatelessCpToRxMsgBase;
class TrexStreamsGraphObj;
class TrexPortMultiplier;
-/**
+/**
* TRex port owner can perform
* write commands
* while port is owned - others can
* do read only commands
- *
+ *
*/
class TrexPortOwner {
public:
@@ -92,7 +93,7 @@ private:
/* handler genereated internally */
std::string m_handler;
-
+
/* seed for generating random values */
unsigned int m_seed;
@@ -106,7 +107,7 @@ class AsyncStopEvent;
/**
* describes a stateless port
- *
+ *
* @author imarom (31-Aug-15)
*/
class TrexStatelessPort {
@@ -137,9 +138,9 @@ public:
RC_ERR_FAILED_TO_COMPILE_STREAMS
};
-
+
TrexStatelessPort(uint8_t port_id, const TrexPlatformApi *api);
-
+
~TrexStatelessPort();
/**
@@ -155,11 +156,11 @@ public:
void release(void);
/**
- * validate the state of the port before start
- * it will return a stream graph
- * containing information about the streams
- * configured on this port
- *
+ * validate the state of the port before start
+ * it will return a stream graph
+ * containing information about the streams
+ * configured on this port
+ *
* on error it throws TrexException
*/
const TrexStreamsGraphObj *validate(void);
@@ -190,13 +191,13 @@ public:
/**
* update current traffic on port
- *
+ *
*/
void update_traffic(const TrexPortMultiplier &mul, bool force);
/**
* get the port state
- *
+ *
*/
port_state_e get_state() const {
return m_port_state;
@@ -204,23 +205,23 @@ public:
/**
* port state as string
- *
+ *
*/
std::string get_state_as_string() const;
/**
* the the max stream id currently assigned
- *
+ *
*/
int get_max_stream_id() const;
/**
* fill up properties of the port
- *
+ *
* @author imarom (16-Sep-15)
- *
- * @param driver
- * @param speed
+ *
+ * @param driver
+ * @param speed
*/
void get_properties(std::string &driver, TrexPlatformApi::driver_speed_e &speed);
@@ -237,7 +238,7 @@ public:
/**
* delegators
- *
+ *
*/
void add_stream(TrexStream *stream);
@@ -267,7 +268,7 @@ public:
/**
* returns the number of DP cores linked to this port
- *
+ *
*/
uint8_t get_dp_core_count() {
return m_cores_id_list.size();
@@ -275,7 +276,7 @@ public:
/**
* returns the traffic multiplier currently being used by the DP
- *
+ *
*/
double get_multiplier() {
return (m_factor);
@@ -283,13 +284,13 @@ public:
/**
* get port speed in bits per second
- *
+ *
*/
uint64_t get_port_speed_bps() const;
/**
* return RX caps
- *
+ *
*/
int get_rx_caps() const {
return m_rx_caps;
@@ -300,12 +301,12 @@ public:
}
/**
- * return true if port adds CRC to a packet (not occurs for
- * VNICs)
- *
+ * return true if port adds CRC to a packet (not occurs for
+ * VNICs)
+ *
* @author imarom (24-Feb-16)
- *
- * @return bool
+ *
+ * @return bool
*/
bool has_crc_added() const {
return m_api_info.has_crc;
@@ -318,9 +319,9 @@ public:
/**
* get the port effective rate (on a started / paused port)
- *
+ *
* @author imarom (07-Jan-16)
- *
+ *
*/
void get_port_effective_rate(double &pps,
double &bps_L1,
@@ -330,8 +331,8 @@ public:
/**
* set port promiscuous on/off
- *
- * @param enabled
+ *
+ * @param enabled
*/
void set_promiscuous(bool enabled);
bool get_promiscuous();
@@ -357,40 +358,45 @@ private:
/**
* send message to all cores using duplicate
- *
+ *
*/
void send_message_to_all_dp(TrexStatelessCpToDpMsgBase *msg);
/**
* send message to specific DP core
- *
+ *
*/
void send_message_to_dp(uint8_t core_id, TrexStatelessCpToDpMsgBase *msg);
+ /**
+ * send message to specific RX core
+ *
+ */
+ void send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
/**
* when a port stops, perform various actions
- *
+ *
*/
void common_port_stop_actions(bool async);
/**
* calculate effective M per core
- *
+ *
*/
double calculate_effective_factor(const TrexPortMultiplier &mul, bool force = false);
double calculate_effective_factor_internal(const TrexPortMultiplier &mul);
-
+
/**
* generates a graph of streams graph
- *
+ *
*/
void generate_streams_graph();
/**
* dispose of it
- *
+ *
* @author imarom (26-Nov-15)
*/
void delete_streams_graph();
@@ -426,7 +432,7 @@ private:
/**
* port multiplier object
- *
+ *
*/
class TrexPortMultiplier {
public:
@@ -443,8 +449,8 @@ public:
};
/**
- * multiplier can be absolute value
- * increment value or subtract value
+ * multiplier can be absolute value
+ * increment value or subtract value
*/
enum mul_op_e {
OP_ABS,
diff --git a/src/stateless/dp/trex_stateless_dp_core.cpp b/src/stateless/dp/trex_stateless_dp_core.cpp
index f8d6d828..ba25f61d 100644
--- a/src/stateless/dp/trex_stateless_dp_core.cpp
+++ b/src/stateless/dp/trex_stateless_dp_core.cpp
@@ -5,7 +5,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,14 +19,12 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <trex_stateless_dp_core.h>
-#include <trex_stateless_messaging.h>
-#include <trex_streams_compiler.h>
-#include <trex_stream_node.h>
-#include <trex_stream.h>
-
-#include <bp_sim.h>
-
+#include "bp_sim.h"
+#include "trex_stateless_dp_core.h"
+#include "trex_stateless_messaging.h"
+#include "trex_stream.h"
+#include "trex_stream_node.h"
+#include "trex_streams_compiler.h"
void CDpOneStream::Delete(CFlowGenListPerThread * core){
assert(m_node->get_state() == CGenNodeStateless::ss_INACTIVE);
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 333aec88..3468d622 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -5,7 +5,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -19,17 +19,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
-#include <trex_stateless_messaging.h>
-#include <trex_stateless_dp_core.h>
-#include <trex_streams_compiler.h>
-#include <trex_stateless.h>
-#include <bp_sim.h>
-
#include <string.h>
+#include "trex_stateless_messaging.h"
+#include "trex_stateless_dp_core.h"
+#include "trex_stateless_rx_core.h"
+#include "trex_streams_compiler.h"
+#include "trex_stateless.h"
+#include "bp_sim.h"
+
/*************************
start traffic message
- ************************/
+ ************************/
TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexStreamsCompiledObj *obj, double duration) {
m_port_id = port_id;
m_event_id = event_id;
@@ -40,7 +41,7 @@ TrexStatelessDpStart::TrexStatelessDpStart(uint8_t port_id, int event_id, TrexSt
/**
* clone for DP start message
- *
+ *
*/
TrexStatelessCpToDpMsgBase *
TrexStatelessDpStart::clone() {
@@ -69,7 +70,7 @@ TrexStatelessDpStart::handle(TrexStatelessDpCore *dp_core) {
/*************************
stop traffic message
- ************************/
+ ************************/
bool
TrexStatelessDpStop::handle(TrexStatelessDpCore *dp_core) {
@@ -114,7 +115,7 @@ bool TrexStatelessDpResume::handle(TrexStatelessDpCore *dp_core){
/**
* clone for DP stop message
- *
+ *
*/
TrexStatelessCpToDpMsgBase *
TrexStatelessDpStop::clone() {
@@ -130,7 +131,7 @@ TrexStatelessDpStop::clone() {
-TrexStatelessCpToDpMsgBase *
+TrexStatelessCpToDpMsgBase *
TrexStatelessDpQuit::clone(){
TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpQuit();
@@ -140,7 +141,7 @@ TrexStatelessDpQuit::clone(){
bool TrexStatelessDpQuit::handle(TrexStatelessDpCore *dp_core){
-
+
/* quit */
dp_core->quit_main_loop();
return (true);
@@ -155,7 +156,7 @@ bool TrexStatelessDpCanQuit::handle(TrexStatelessDpCore *dp_core){
return (true);
}
-TrexStatelessCpToDpMsgBase *
+TrexStatelessCpToDpMsgBase *
TrexStatelessDpCanQuit::clone(){
TrexStatelessCpToDpMsgBase *new_msg = new TrexStatelessDpCanQuit();
@@ -165,7 +166,7 @@ TrexStatelessDpCanQuit::clone(){
/*************************
update traffic message
- ************************/
+ ************************/
bool
TrexStatelessDpUpdate::handle(TrexStatelessDpCore *dp_core) {
dp_core->update_traffic(m_port_id, m_factor);
@@ -207,3 +208,14 @@ TrexDpPortEventMsg::handle() {
return (true);
}
+/************************* messages from CP to RX **********************/
+bool TrexRxStartMsg::handle (CRxCoreStateless *rx_core) {
+ rx_core->work();
+ return true;
+}
+
+/************************* messages from CP to RX **********************/
+bool TrexRxStopMsg::handle (CRxCoreStateless *rx_core) {
+ rx_core->idle();
+ return true;
+}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index dda086b7..b7e8fd3f 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -5,7 +5,7 @@
*/
/*
-Copyright (c) 2015-2015 Cisco Systems, Inc.
+Copyright (c) 2015-2016 Cisco Systems, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -22,16 +22,17 @@ limitations under the License.
#ifndef __TREX_STATELESS_MESSAGING_H__
#define __TREX_STATELESS_MESSAGING_H__
-#include <msg_manager.h>
-#include <trex_dp_port_events.h>
+#include "msg_manager.h"
+#include "trex_dp_port_events.h"
class TrexStatelessDpCore;
+class CRxCoreStateless;
class TrexStreamsCompiledObj;
class CFlowGenListPerThread;
/**
* defines the base class for CP to DP messages
- *
+ *
* @author imarom (27-Oct-15)
*/
class TrexStatelessCpToDpMsgBase {
@@ -49,7 +50,7 @@ public:
/**
* clone the current message
- *
+ *
*/
virtual TrexStatelessCpToDpMsgBase * clone() = 0;
@@ -76,7 +77,7 @@ protected:
/**
* a message to start traffic
- *
+ *
* @author imarom (27-Oct-15)
*/
class TrexStatelessDpStart : public TrexStatelessCpToDpMsgBase {
@@ -137,7 +138,7 @@ private:
/**
* a message to stop traffic
- *
+ *
* @author imarom (27-Oct-15)
*/
class TrexStatelessDpStop : public TrexStatelessCpToDpMsgBase {
@@ -191,9 +192,9 @@ private:
};
/**
- * a message to Quit the datapath traffic. support only stateless for now
- *
- * @author hhaim
+ * a message to Quit the datapath traffic. support only stateless for now
+ *
+ * @author hhaim
*/
class TrexStatelessDpQuit : public TrexStatelessCpToDpMsgBase {
public:
@@ -209,9 +210,9 @@ public:
};
/**
- * a message to check if both port are idel and exit
- *
- * @author hhaim
+ * a message to check if both port are idel and exit
+ *
+ * @author hhaim
*/
class TrexStatelessDpCanQuit : public TrexStatelessCpToDpMsgBase {
public:
@@ -247,7 +248,7 @@ private:
/**
* barrier message for DP core
- *
+ *
*/
class TrexStatelessDpBarrier : public TrexStatelessCpToDpMsgBase {
public:
@@ -270,7 +271,7 @@ private:
/**
* defines the base class for CP to DP messages
- *
+ *
* @author imarom (27-Oct-15)
*/
class TrexStatelessDpToCpMsgBase {
@@ -284,7 +285,7 @@ public:
/**
* virtual function to handle a message
- *
+ *
*/
virtual bool handle() = 0;
@@ -295,9 +296,9 @@ public:
/**
- * a message indicating an event has happened on a port at the
- * DP
- *
+ * a message indicating an event has happened on a port at the
+ * DP
+ *
*/
class TrexDpPortEventMsg : public TrexStatelessDpToCpMsgBase {
public:
@@ -326,8 +327,41 @@ private:
int m_thread_id;
uint8_t m_port_id;
int m_event_id;
-
+
};
-#endif /* __TREX_STATELESS_MESSAGING_H__ */
+/************************* messages from CP to RX **********************/
+
+/**
+ * defines the base class for CP to RX messages
+ *
+ */
+class TrexStatelessCpToRxMsgBase {
+public:
+ TrexStatelessCpToRxMsgBase() {
+ }
+
+ virtual ~TrexStatelessCpToRxMsgBase() {
+ }
+
+ /**
+ * virtual function to handle a message
+ *
+ */
+ virtual bool handle (CRxCoreStateless *rx_core) = 0;
+
+ /* no copy constructor */
+ TrexStatelessCpToRxMsgBase(TrexStatelessCpToRxMsgBase &) = delete;
+
+};
+
+class TrexRxStartMsg : public TrexStatelessCpToRxMsgBase {
+ bool handle (CRxCoreStateless *rx_core);
+};
+
+class TrexRxStopMsg : public TrexStatelessCpToRxMsgBase {
+ bool handle (CRxCoreStateless *rx_core);
+};
+
+#endif /* __TREX_STATELESS_MESSAGING_H__ */
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index a108bef3..86711189 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -1,48 +1,110 @@
#include <stdio.h>
-#include "latency.h"
+#include "bp_sim.h"
#include "flow_stat_parser.h"
-#include "stateless/rx/trex_stateless_rx_core.h"
-
+#include "latency.h"
+#include "trex_stateless_messaging.h"
+#include "trex_stateless_rx_core.h"
void CRxCoreStateless::create(const CRxSlCfg &cfg) {
m_max_ports = cfg.m_max_ports;
+ CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx();
+
+ m_ring_from_cp = cp_rx->getRingCpToDp(0);
+ m_ring_to_cp = cp_rx->getRingDpToCp(0);
+ m_state = STATE_IDLE;
+
for (int i = 0; i < m_max_ports; i++) {
CLatencyManagerPerPort * lp = &m_ports[i];
- // CCPortLatency * lpo = &m_ports[swap_port(i)].m_port;
-
lp->m_io = cfg.m_ports[i];
- /* lp->m_port.Create(this,
- i,
- m_pkt_gen.get_payload_offset(),
- m_pkt_gen.get_l4_offset(),
- m_pkt_gen.get_pkt_size(),lpo );???*/
}
+}
+void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
+ msg->handle(this);
+ delete msg;
}
-void CRxCoreStateless::start() {
- static int count = 0;
- static int i = 0;
- while (1) {
- count += try_rx();
- i++;
- if (i == 100000000) {
- i = 0;
- //??? remove
- printf("counter:%d port0:[%u], port1:[%u]\n", count, m_ports[0].m_port.m_rx_pg_pkts[0], m_ports[1].m_port.m_rx_pg_pkts[1]);
+bool CRxCoreStateless::periodic_check_for_cp_messages() {
+ /* fast path */
+ if ( likely ( m_ring_from_cp->isEmpty() ) ) {
+ return false;
+ }
+
+ while ( true ) {
+ CGenNode * node = NULL;
+
+ if (m_ring_from_cp->Dequeue(node) != 0) {
+ break;
+ }
+ assert(node);
+ TrexStatelessCpToRxMsgBase * msg = (TrexStatelessCpToRxMsgBase *)node;
+ handle_cp_msg(msg);
+ }
+
+ return true;
+
+}
+
+void CRxCoreStateless::idle_state_loop() {
+ const int SHORT_DELAY_MS = 2;
+ const int LONG_DELAY_MS = 50;
+ const int DEEP_SLEEP_LIMIT = 2000;
+
+ int counter = 0;
+
+ while (m_state == STATE_IDLE) {
+ bool had_msg = periodic_check_for_cp_messages();
+ if (had_msg) {
+ counter = 0;
+ continue;
+ }
+
+ /* enter deep sleep only if enough time had passed */
+ if (counter < DEEP_SLEEP_LIMIT) {
+ delay(SHORT_DELAY_MS);
+ counter++;
+ } else {
+ delay(LONG_DELAY_MS);
}
}
}
-// ??? temp try
+void CRxCoreStateless::start() {
+ static int count = 0;
+ static int i = 0;
+
+ while (true) {
+ if (m_state == STATE_WORKING) {
+ count += try_rx();
+ i++;
+ if (i == 100) {
+ i = 0;
+ // if no packets in 100 cycles, sleep for a while to spare the cpu
+ if (count == 0) {
+ delay(1);
+ }
+ count = 0;
+ periodic_check_for_cp_messages();
+ }
+ } else {
+ idle_state_loop();
+ }
+#if 0
+ ??? do we need this?
+ if ( m_core->is_terminated_by_master() ) {
+ break;
+ }
+#endif
+ }
+}
+
int CRxCoreStateless::try_rx() {
rte_mbuf_t * rx_pkts[64];
int i, total_pkts = 0;
for (i = 0; i < m_max_ports; i++) {
CLatencyManagerPerPort * lp = &m_ports[i];
rte_mbuf_t * m;
- //m_cpu_dp_u.start_work();
/* try to read 64 packets clean up the queue */
uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
total_pkts += cnt_p;
@@ -63,10 +125,8 @@ int CRxCoreStateless::try_rx() {
}
rte_pktmbuf_free(m);
}
- /* commit only if there was work to do ! */
- //m_cpu_dp_u.commit(); //??? what's this?
- }/* if work */
- }// all ports
+ }/* if work */
+ }// all ports
return total_pkts;
}
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index 942ddbd6..eecc8033 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -23,6 +23,8 @@ limitations under the License.
#include <stdint.h>
#include "latency.h"
+class TrexStatelessCpToRxMsgBase;
+
class CRxSlCfg {
public:
CRxSlCfg (){
@@ -37,19 +39,33 @@ class CRxSlCfg {
};
class CRxCoreStateless {
+ enum state_e {
+ STATE_IDLE,
+ STATE_WORKING,
+ };
+
public:
void start();
void create(const CRxSlCfg &cfg);
void reset_rx_stats(uint8_t port_id);
int get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts
, uint32_t *bytes, uint32_t *prev_bytes, int min, int max);
+ void work() {m_state = STATE_WORKING;}
+ void idle() {m_state = STATE_IDLE;}
private:
+ void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
+ bool periodic_check_for_cp_messages();
+ void idle_state_loop();
int try_rx();
bool is_flow_stat_id(uint16_t id);
uint16_t get_hw_id(uint16_t id);
-
+
private:
uint32_t m_max_ports;
+ bool m_has_streams;
CLatencyManagerPerPort m_ports[TREX_MAX_PORTS];
+ state_e m_state; /* state of all ports */
+ CNodeRing *m_ring_from_cp;
+ CNodeRing *m_ring_to_cp;
};
#endif