summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xsrc/bp_sim.cpp7
-rwxr-xr-xsrc/bp_sim.h2
-rw-r--r--src/flow_stat.cpp9
-rw-r--r--src/internal_api/trex_platform_api.h2
-rw-r--r--src/latency.cpp8
-rw-r--r--src/latency.h5
-rw-r--r--src/main_dpdk.cpp205
-rw-r--r--src/stateless/cp/trex_stateless.cpp1
-rw-r--r--src/stateless/cp/trex_streams_compiler.cpp3
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.cpp10
-rw-r--r--src/stateless/messaging/trex_stateless_messaging.h8
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.cpp150
-rw-r--r--src/stateless/rx/trex_stateless_rx_core.h45
13 files changed, 310 insertions, 145 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp
index 6ea40be2..cc9af837 100755
--- a/src/bp_sim.cpp
+++ b/src/bp_sim.cpp
@@ -6039,8 +6039,13 @@ uint16_t CSimplePacketParser::getPktSize(){
return ( ip_len +m_vlan_offset+14);
}
+uint16_t CSimplePacketParser::getIpId() {
+ if (m_ipv4) {
+ return ( m_ipv4->getId() );
+ }
-
+ return (0);
+}
uint8_t CSimplePacketParser::getTTl(){
if (m_ipv4) {
diff --git a/src/bp_sim.h b/src/bp_sim.h
index 8127261c..4b1a88e3 100755
--- a/src/bp_sim.h
+++ b/src/bp_sim.h
@@ -1246,7 +1246,7 @@ static inline int get_is_rx_check_mode(){
return (CGlobalInfo::m_options.preview.get_is_rx_check_enable() ?1:0);
}
-static inline bool get_is_rx_filter_enable(){//???
+static inline bool get_is_rx_filter_enable(){
uint32_t latency_rate=CGlobalInfo::m_options.m_latency_rate;
return ( ( get_is_rx_check_mode() || CGlobalInfo::is_learn_mode() || latency_rate != 0
|| get_is_stateless()) ?true:false );
diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp
index d44a91da..778c92b9 100644
--- a/src/flow_stat.cpp
+++ b/src/flow_stat.cpp
@@ -387,7 +387,7 @@ CFlowStatRuleMgr::CFlowStatRuleMgr() {
m_api = NULL;
m_max_hw_id = -1;
m_num_started_streams = 0;
- m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ m_ring_to_rx = NULL;
}
std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) {
@@ -439,6 +439,8 @@ int CFlowStatRuleMgr::add_stream(const TrexStream * stream) {
std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl;
#endif
+ // Init everything here, and not in the constructor, since we relay on other objects
+ // By the time a stream is added everything else is initialized.
if (! m_api ) {
TrexStateless *tstateless = get_stateless_obj();
m_api = tstateless->get_platform_api();
@@ -455,6 +457,7 @@ int CFlowStatRuleMgr::add_stream(const TrexStream * stream) {
for (uint8_t port = 0; port < m_num_ports; port++) {
assert(m_api->reset_hw_flow_stats(port) == 0);
}
+ m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
}
if (no_stat_supported)
@@ -641,9 +644,9 @@ void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) {
TrexStatelessCpToRxMsgBase *msg;
if (is_start) {
- msg = new TrexRxStartMsg();
+ msg = new TrexStatelessRxStartMsg();
} else {
- msg = new TrexRxStopMsg();
+ msg = new TrexStatelessRxStopMsg();
}
m_ring_to_rx->Enqueue((CGenNode *)msg);
}
diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h
index fc5da491..dbca5a8a 100644
--- a/src/internal_api/trex_platform_api.h
+++ b/src/internal_api/trex_platform_api.h
@@ -42,7 +42,7 @@ public:
struct {
double m_cpu_util;
-
+ double m_rx_cpu_util;
double m_tx_bps;
double m_rx_bps;
diff --git a/src/latency.cpp b/src/latency.cpp
index 930876b3..fff7935d 100644
--- a/src/latency.cpp
+++ b/src/latency.cpp
@@ -178,8 +178,7 @@ void CCPortLatency::reset(){
m_length_error=0;
m_no_ipv4_option=0;
for (int i = 0; i < MAX_FLOW_STATS; i++) {
- m_rx_pg_pkts[i] = 0;
- m_rx_pg_bytes[i] = 0;
+ m_rx_pg_stat[i].clear();
}
m_hist.Reset();
}
@@ -632,8 +631,8 @@ void CLatencyManager::handle_rx_pkt(CLatencyManagerPerPort * lp,
rte_pktmbuf_free(m);
}
-void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id,
- CGenNodeLatencyPktInfo * msg){
+// In VM, we receive the RX packets in DP core, and send message to RX core with the packet
+void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id, CGenNodeLatencyPktInfo * msg) {
assert(msg->m_latency_offset==0xdead);
@@ -670,6 +669,7 @@ void CLatencyManager::run_rx_queue_msgs(uint8_t thread_id,
}
}
+// VM mode function. Handle messages from DP
void CLatencyManager::try_rx_queues(){
CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
diff --git a/src/latency.h b/src/latency.h
index e85e3a5a..3dd1cc36 100644
--- a/src/latency.h
+++ b/src/latency.h
@@ -86,6 +86,7 @@ public:
bool Parse();
uint8_t getTTl();
+ uint16_t getIpId();
uint16_t getPktSize();
// Check if packet contains latency data
@@ -244,9 +245,7 @@ public:
uint64_t m_rx_check;
uint64_t m_no_ipv4_option;
uint64_t m_length_error;
- uint32_t m_rx_pg_pkts[MAX_FLOW_STATS];
- uint32_t m_rx_pg_bytes[MAX_FLOW_STATS];
-
+ rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS];
CTimeHistogram m_hist; /* all window */
CJitter m_jitter;
};
diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp
index 9e690951..94361ec4 100644
--- a/src/main_dpdk.cpp
+++ b/src/main_dpdk.cpp
@@ -188,11 +188,9 @@ public:
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
- int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes
- , int min, int max);
int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) {return 0;}
int get_stat_counters_num() {return MAX_FLOW_STATS;}
- int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
+ int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
virtual int wait_for_stable_link();
void wait_after_link_up();
};
@@ -244,6 +242,8 @@ public:
virtual void clear_extended_stats(CPhyEthIF * _if);
virtual int wait_for_stable_link();
+ int get_stat_counters_num() {return MAX_FLOW_STATS;}
+ int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
};
@@ -270,15 +270,17 @@ public:
virtual bool is_hardware_filter_is_supported(){
return (true);
}
-
virtual int configure_rx_filter_rules(CPhyEthIF * _if);
-
+ virtual int configure_rx_filter_rules_stateless(CPhyEthIF * _if);
+ virtual int configure_rx_filter_rules_statefull(CPhyEthIF * _if);
virtual bool is_hardware_support_drop_queue(){
return(true);
}
virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats);
virtual void clear_extended_stats(CPhyEthIF * _if);
virtual int wait_for_stable_link();
+ virtual int get_stat_counters_num() {return MAX_FLOW_STATS;}
+ virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;}
};
class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G {
@@ -1106,15 +1108,19 @@ public:
m_port_conf.fdir_conf.status=RTE_FDIR_NO_REPORT_STATUS;
/* Offset of flexbytes field in RX packets (in 16-bit word units). */
/* Note: divide by 2 to convert byte offset to word offset */
- if ( CGlobalInfo::m_options.preview.get_ipv6_mode_enable() ){
- m_port_conf.fdir_conf.flexbytes_offset=(14+6)/2;
- }else{
- m_port_conf.fdir_conf.flexbytes_offset=(14+8)/2;
- }
+ if (get_is_stateless()) {
+ m_port_conf.fdir_conf.flexbytes_offset = (14+4)/2;
+ } else {
+ if ( CGlobalInfo::m_options.preview.get_ipv6_mode_enable() ) {
+ m_port_conf.fdir_conf.flexbytes_offset = (14+6)/2;
+ } else {
+ m_port_conf.fdir_conf.flexbytes_offset = (14+8)/2;
+ }
- /* Increment offset 4 bytes for the case where we add VLAN */
- if ( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ){
- m_port_conf.fdir_conf.flexbytes_offset+=(4/2);
+ /* Increment offset 4 bytes for the case where we add VLAN */
+ if ( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ) {
+ m_port_conf.fdir_conf.flexbytes_offset += (4/2);
+ }
}
m_port_conf.fdir_conf.drop_queue=1;
}
@@ -1225,6 +1231,7 @@ void CPhyEthIFStats::Dump(FILE *fd){
DP_A(rx_nombuf);
}
+// Clear the RX queue of an interface, dropping all packets
void CPhyEthIF::flush_rx_queue(void){
rte_mbuf_t * rx_pkts[32];
@@ -1797,6 +1804,9 @@ bool CCoreEthIF::Create(uint8_t core_id,
return (true);
}
+// This function is only relevant if we are in VM. In this case, we only have one rx queue. Can't have
+// rules to drop queue 0, and pass queue 1 to RX core, like in other cases.
+// We receive all packets in the same core that transmitted, and handle them to RX core.
void CCoreEthIF::flush_rx_queue(void){
pkt_dir_t dir ;
bool is_rx = get_is_rx_thread_enabled();
@@ -2311,6 +2321,7 @@ public:
float m_active_flows;
float m_open_flows;
float m_cpu_util;
+ float m_rx_cpu_util;
uint8_t m_threads;
uint32_t m_num_of_ports;
@@ -2605,10 +2616,11 @@ public:
int reset_counters();
private:
- /* try to stop all datapath cores */
- void try_stop_all_dp();
+ /* try to stop all datapath cores and RX core */
+ void try_stop_all_cores();
/* send message to all dp cores */
int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg);
+ int send_message_to_rx(TrexStatelessCpToRxMsgBase *msg);
void check_for_dp_message_from_core(int thread_id);
public:
@@ -2616,7 +2628,6 @@ public:
int start_master_statefull();
int start_master_stateless();
int run_in_core(virtual_thread_id_t virt_core_id);
- int stop_core(virtual_thread_id_t virt_core_id);
int core_for_rx(){
if ( (! get_is_rx_thread_enabled()) ) {
return -1;
@@ -2687,7 +2698,8 @@ public:
CParserOption m_po ;
CFlowGenList m_fl;
bool m_fl_was_init;
- volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ;
+ volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ; // Signal to main core when DP thread finished
+ volatile bool m_rx_running; // Signal main core when RX thread finished
CLatencyManager m_mg; // statefull RX core
CRxCoreStateless m_rx_sl; // stateless RX core
CTrexGlobalIoMode m_io_modes;
@@ -2776,12 +2788,14 @@ bool CGlobalTRex::is_all_links_are_up(bool dump){
return (all_link_are);
}
+void CGlobalTRex::try_stop_all_cores(){
-void CGlobalTRex::try_stop_all_dp(){
-
- TrexStatelessDpQuit * msg= new TrexStatelessDpQuit();
- send_message_all_dp(msg);
- delete msg;
+ TrexStatelessDpQuit * dp_msg= new TrexStatelessDpQuit();
+ TrexStatelessRxQuit * rx_msg= new TrexStatelessRxQuit();
+ send_message_all_dp(dp_msg);
+ send_message_to_rx(rx_msg);
+ delete dp_msg;
+ // no need to delete rx_msg. Deleted by receiver
bool all_core_finished = false;
int i;
for (i=0; i<20; i++) {
@@ -2812,6 +2826,13 @@ int CGlobalTRex::send_message_all_dp(TrexStatelessCpToDpMsgBase *msg){
return (0);
}
+int CGlobalTRex::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) {
+ CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0);
+ ring->Enqueue((CGenNode *) msg);
+
+ return (0);
+}
+
int CGlobalTRex::ixgbe_rx_queue_flush(){
int i;
@@ -2868,12 +2889,11 @@ void CGlobalTRex::ixgbe_configure_mg(void) {
// init m_rx_sl object for stateless rx core
void CGlobalTRex::rx_sl_configure(void) {
CRxSlCfg rx_sl_cfg;
+ int i;
rx_sl_cfg.m_max_ports = m_max_ports;
if ( get_vm_one_queue_enable() ) {
-#if 0
- /// what to do here ???
/* vm mode, indirect queues */
for (i=0; i < m_max_ports; i++) {
CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
@@ -2882,9 +2902,8 @@ void CGlobalTRex::rx_sl_configure(void) {
m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg);
rx_sl_cfg.m_ports[i] = &m_latency_vm_vports[i];
}
-#endif
} else {
- for (int i = 0; i < m_max_ports; i++) {
+ for (i = 0; i < m_max_ports; i++) {
CPhyEthIF * _if = &m_ports[i];
m_latency_vports[i].Create(_if, m_latency_tx_queue_id, 1);
rx_sl_cfg.m_ports[i] = &m_latency_vports[i];
@@ -3403,6 +3422,9 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){
stats.m_num_of_ports = m_max_ports;
stats.m_cpu_util = m_fl.GetCpuUtil();
+ if (get_is_stateless()) {
+ stats.m_rx_cpu_util = m_rx_sl.get_cpu_util();
+ }
stats.m_threads = m_fl.m_threads_info.size();
for (i=0; i<m_max_ports; i++) {
@@ -3766,7 +3788,7 @@ int CGlobalTRex::run_in_master() {
if (!is_all_cores_finished()) {
/* probably CLTR-C */
- try_stop_all_dp();
+ try_stop_all_cores();
}
m_mg.stop();
@@ -3782,19 +3804,16 @@ int CGlobalTRex::run_in_master() {
int CGlobalTRex::run_in_rx_core(void){
if (get_is_stateless()) {
+ m_rx_running = true;
m_rx_sl.start();
} else {
if ( CGlobalInfo::m_options.is_rx_enabled() ){
+ m_rx_running = true;
m_mg.start(0);
}
}
- return (0);
-}
-
-
-int CGlobalTRex::stop_core(virtual_thread_id_t virt_core_id){
- m_signal[virt_core_id]=1;
+ m_rx_running = false;
return (0);
}
@@ -3879,14 +3898,17 @@ int CGlobalTRex::stop_master(){
return (0);
}
-bool CGlobalTRex::is_all_cores_finished(){
+bool CGlobalTRex::is_all_cores_finished() {
int i;
for (i=0; i<get_cores_tx(); i++) {
if ( m_signal[i+1]==0){
- return (false);
+ return false;
}
}
- return (true);
+ if (m_rx_running)
+ return false;
+
+ return true;
}
@@ -3972,7 +3994,6 @@ int CGlobalTRex::start_master_statefull() {
////////////////////////////////////////////
-
static CGlobalTRex g_trex;
int CPhyEthIF::reset_hw_flow_stats() {
@@ -3994,30 +4015,39 @@ int CPhyEthIF::reset_hw_flow_stats() {
int CPhyEthIF::get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset) {
uint32_t diff_pkts[MAX_FLOW_STATS];
uint32_t diff_bytes[MAX_FLOW_STATS];
+ bool hw_rx_stat_supported = get_ex_drv()->hw_rx_stat_supported();
- if (get_ex_drv()->get_rx_stats(this, diff_pkts, m_stats.m_fdir_prev_pkts
- , diff_bytes, m_stats.m_fdir_prev_bytes, min, max) < 0) {
- return -1;
+ if (hw_rx_stat_supported) {
+ if (get_ex_drv()->get_rx_stats(this, diff_pkts, m_stats.m_fdir_prev_pkts
+ , diff_bytes, m_stats.m_fdir_prev_bytes, min, max) < 0) {
+ return -1;
+ }
+ } else {
+ g_trex.m_rx_sl.get_rx_stats(get_port_id(), rx_stats, min, max, reset);
}
for (int i = min; i <= max; i++) {
if ( reset ) {
// return value so far, and reset
- if (rx_stats != NULL) {
- rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i] + diff_pkts[i]);
- rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i] + diff_bytes[i]);
+ if (hw_rx_stat_supported) {
+ if (rx_stats != NULL) {
+ rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i] + diff_pkts[i]);
+ rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i] + diff_bytes[i]);
+ }
+ m_stats.m_rx_per_flow_pkts[i] = 0;
+ m_stats.m_rx_per_flow_bytes[i] = 0;
}
if (tx_stats != NULL) {
tx_stats[i - min] = g_trex.clear_flow_tx_stats(m_port_id, i);
}
- m_stats.m_rx_per_flow_pkts[i] = 0;
- m_stats.m_rx_per_flow_bytes[i] = 0;
} else {
- m_stats.m_rx_per_flow_pkts[i] += diff_pkts[i];
- m_stats.m_rx_per_flow_bytes[i] += diff_bytes[i];
- if (rx_stats != NULL) {
- rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i]);
- rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i]);
+ if (hw_rx_stat_supported) {
+ m_stats.m_rx_per_flow_pkts[i] += diff_pkts[i];
+ m_stats.m_rx_per_flow_bytes[i] += diff_bytes[i];
+ if (rx_stats != NULL) {
+ rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i]);
+ rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i]);
+ }
}
if (tx_stats != NULL) {
tx_stats[i - min] = g_trex.get_flow_tx_stats(m_port_id, i);
@@ -4028,6 +4058,8 @@ int CPhyEthIF::get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats,
return 0;
}
+// If needed, send packets to rx core for processing.
+// This is relevant only in VM case, where we receive packets to the working DP core (only 1 DP core in this case)
bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir,
rte_mbuf_t * m){
@@ -4036,17 +4068,25 @@ bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir,
return false;
}
bool send=false;
- CLatencyPktMode *c_l_pkt_mode = g_trex.m_mg.c_l_pkt_mode;
- bool is_lateancy_pkt = c_l_pkt_mode->IsLatencyPkt(parser.m_ipv4) & parser.IsLatencyPkt(parser.m_l4 + c_l_pkt_mode->l4_header_len());
- if (is_lateancy_pkt){
- send=true;
- }else{
- if ( get_is_rx_filter_enable() ){
- uint8_t max_ttl = 0xff - get_rx_check_hops();
- uint8_t pkt_ttl = parser.getTTl();
- if ( (pkt_ttl==max_ttl) || (pkt_ttl==(max_ttl-1) ) ) {
- send=true;
+ if ( get_is_stateless() ) {
+ // In stateless RX, we only care about flow stat packets
+ if ((parser.getIpId() & 0xff00) == IP_ID_RESERVE_BASE) {
+ send = true;
+ }
+ } else {
+ CLatencyPktMode *c_l_pkt_mode = g_trex.m_mg.c_l_pkt_mode;
+ bool is_lateancy_pkt = c_l_pkt_mode->IsLatencyPkt(parser.m_ipv4) & parser.IsLatencyPkt(parser.m_l4 + c_l_pkt_mode->l4_header_len());
+
+ if (is_lateancy_pkt) {
+ send = true;
+ } else {
+ if ( get_is_rx_filter_enable() ) {
+ uint8_t max_ttl = 0xff - get_rx_check_hops();
+ uint8_t pkt_ttl = parser.getTTl();
+ if ( (pkt_ttl==max_ttl) || (pkt_ttl==(max_ttl-1) ) ) {
+ send=true;
+ }
}
}
}
@@ -4086,7 +4126,6 @@ static int latency_one_lcore(__attribute__((unused)) void *dummy)
CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket;
physical_thread_id_t phy_id =rte_lcore_id();
-
if ( lpsock->thread_phy_is_rx(phy_id) ) {
g_trex.run_in_rx_core();
}else{
@@ -4444,6 +4483,7 @@ int main_test(int argc , char * argv[]){
g_trex.reset_counters();
}
+ g_trex.m_rx_running = false;
if ( get_is_stateless() ) {
g_trex.start_master_stateless();
@@ -4731,11 +4771,13 @@ void CTRexExtendedDriverBase1G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStat
void CTRexExtendedDriverBase1G::clear_extended_stats(CPhyEthIF * _if){
}
+#if 0
int CTRexExtendedDriverBase1G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts
,uint32_t *bytes, uint32_t *prev_bytes, int min, int max) {
uint32_t port_id = _if->get_port_id();
return g_trex.m_rx_sl.get_rx_stats(port_id, pkts, prev_pkts, bytes, prev_bytes, min, max);
}
+#endif
void CTRexExtendedDriverBase10G::clear_extended_stats(CPhyEthIF * _if){
_if->pci_reg_read(IXGBE_RXNFGPC);
@@ -4751,7 +4793,43 @@ void CTRexExtendedDriverBase10G::update_configuration(port_cfg_t * cfg){
cfg->m_tx_conf.tx_thresh.wthresh = TX_WTHRESH;
}
-int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){
+int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if) {
+ if ( get_is_stateless() ) {
+ return configure_rx_filter_rules_stateless(_if);
+ } else {
+ return configure_rx_filter_rules_statefull(_if);
+ }
+
+ return 0;
+}
+
+int CTRexExtendedDriverBase10G::configure_rx_filter_rules_stateless(CPhyEthIF * _if) {
+ uint8_t port_id = _if->get_rte_port_id();
+ int ip_id_lsb;
+
+ for (ip_id_lsb = 0; ip_id_lsb < MAX_FLOW_STATS; ip_id_lsb++ ) {
+ struct rte_eth_fdir_filter fdir_filter;
+ int res = 0;
+
+ memset(&fdir_filter,0,sizeof(fdir_filter));
+ fdir_filter.input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_OTHER;
+ fdir_filter.soft_id = ip_id_lsb; // We can use the ip_id_lsb also as filter soft_id
+ fdir_filter.input.flow_ext.flexbytes[0] = 0xff;
+ fdir_filter.input.flow_ext.flexbytes[1] = ip_id_lsb;
+ fdir_filter.action.rx_queue = 1;
+ fdir_filter.action.behavior = RTE_ETH_FDIR_ACCEPT;
+ fdir_filter.action.report_status = RTE_ETH_FDIR_NO_REPORT_STATUS;
+ res = rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_FDIR, RTE_ETH_FILTER_ADD, &fdir_filter);
+
+ if (res != 0) {
+ rte_exit(EXIT_FAILURE, " ERROR rte_eth_dev_filter_ctrl : %d\n",res);
+ }
+ }
+
+ return 0;
+}
+
+int CTRexExtendedDriverBase10G::configure_rx_filter_rules_statefull(CPhyEthIF * _if) {
uint8_t port_id=_if->get_rte_port_id();
uint16_t hops = get_rx_check_hops();
uint16_t v4_hops = (hops << 8)&0xff00;
@@ -5208,6 +5286,9 @@ TrexDpdkPlatformApi::get_global_stats(TrexPlatformGlobalStats &stats) const {
g_trex.get_stats(trex_stats);
stats.m_stats.m_cpu_util = trex_stats.m_cpu_util;
+ if (get_is_stateless()) {
+ stats.m_stats.m_rx_cpu_util = trex_stats.m_rx_cpu_util;
+ }
stats.m_stats.m_tx_bps = trex_stats.m_tx_bps;
stats.m_stats.m_tx_pps = trex_stats.m_tx_pps;
diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp
index 9e24802b..9df57a50 100644
--- a/src/stateless/cp/trex_stateless.cpp
+++ b/src/stateless/cp/trex_stateless.cpp
@@ -132,6 +132,7 @@ TrexStateless::encode_stats(Json::Value &global) {
api->get_global_stats(stats);
global["cpu_util"] = stats.m_stats.m_cpu_util;
+ global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util;
global["tx_bps"] = stats.m_stats.m_tx_bps;
global["rx_bps"] = stats.m_stats.m_rx_bps;
diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp
index be5002da..563236c2 100644
--- a/src/stateless/cp/trex_streams_compiler.cpp
+++ b/src/stateless/cp/trex_streams_compiler.cpp
@@ -477,7 +477,8 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream,
TrexStream *fixed_rx_flow_stat_stream = stream->clone(true);
- get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); //???? check for errors
+ // not checking for errors. We assume that if add_stream succeeded, start_stream will too.
+ get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id);
/* can this stream be split to many cores ? */
if (!stream->is_splitable(dp_core_count)) {
diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp
index 3468d622..7edf0f13 100644
--- a/src/stateless/messaging/trex_stateless_messaging.cpp
+++ b/src/stateless/messaging/trex_stateless_messaging.cpp
@@ -209,13 +209,17 @@ TrexDpPortEventMsg::handle() {
}
/************************* messages from CP to RX **********************/
-bool TrexRxStartMsg::handle (CRxCoreStateless *rx_core) {
+bool TrexStatelessRxStartMsg::handle (CRxCoreStateless *rx_core) {
rx_core->work();
return true;
}
-/************************* messages from CP to RX **********************/
-bool TrexRxStopMsg::handle (CRxCoreStateless *rx_core) {
+bool TrexStatelessRxStopMsg::handle (CRxCoreStateless *rx_core) {
rx_core->idle();
return true;
}
+
+bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) {
+ rx_core->quit();
+ return true;
+}
diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h
index b7e8fd3f..0eed01bd 100644
--- a/src/stateless/messaging/trex_stateless_messaging.h
+++ b/src/stateless/messaging/trex_stateless_messaging.h
@@ -356,11 +356,15 @@ public:
};
-class TrexRxStartMsg : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxStartMsg : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
};
-class TrexRxStopMsg : public TrexStatelessCpToRxMsgBase {
+class TrexStatelessRxStopMsg : public TrexStatelessCpToRxMsgBase {
+ bool handle (CRxCoreStateless *rx_core);
+};
+
+class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase {
bool handle (CRxCoreStateless *rx_core);
};
diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp
index 86711189..ab7c08d1 100644
--- a/src/stateless/rx/trex_stateless_rx_core.cpp
+++ b/src/stateless/rx/trex_stateless_rx_core.cpp
@@ -18,6 +18,7 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) {
CLatencyManagerPerPort * lp = &m_ports[i];
lp->m_io = cfg.m_ports[i];
}
+ m_cpu_cp_u.Create(&m_cpu_dp_u);
}
void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) {
@@ -71,32 +72,94 @@ void CRxCoreStateless::idle_state_loop() {
}
void CRxCoreStateless::start() {
- static int count = 0;
- static int i = 0;
-
- while (true) {
- if (m_state == STATE_WORKING) {
- count += try_rx();
- i++;
- if (i == 100) {
- i = 0;
- // if no packets in 100 cycles, sleep for a while to spare the cpu
- if (count == 0) {
- delay(1);
- }
- count = 0;
- periodic_check_for_cp_messages();
- }
- } else {
- idle_state_loop();
- }
-#if 0
- ??? do we need this?
- if ( m_core->is_terminated_by_master() ) {
- break;
- }
-#endif
- }
+ static int count = 0;
+ static int i = 0;
+ bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? true : false;
+
+ while (true) {
+ if (m_state == STATE_WORKING) {
+ i++;
+ if (i == 100) {
+ i = 0;
+ // if no packets in 100 cycles, sleep for a while to spare the cpu
+ if (count == 0) {
+ delay(1);
+ }
+ count = 0;
+ periodic_check_for_cp_messages(); // m_state might change in here
+ }
+ } else {
+ if (m_state == STATE_QUIT)
+ break;
+ idle_state_loop();
+ }
+ if (do_try_rx_queue) {
+ try_rx_queues();
+ }
+ count += try_rx();
+ }
+}
+
+void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) {
+ Cxl710Parser parser;
+
+ if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
+ uint16_t ip_id;
+ if (parser.get_ip_id(ip_id) == 0) {
+ if (is_flow_stat_id(ip_id)) {
+ uint16_t hw_id = get_hw_id(ip_id);
+ lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1);
+ lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len);
+ }
+ }
+ }
+}
+
+// In VM setup, handle packets coming as messages from DP cores.
+void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) {
+ while ( true ) {
+ CGenNode * node;
+ if ( r->Dequeue(node) != 0 ) {
+ break;
+ }
+ assert(node);
+
+ CGenNodeMsgBase * msg = (CGenNodeMsgBase *)node;
+ CGenNodeLatencyPktInfo * l_msg;
+ uint8_t msg_type = msg->m_msg_type;
+ uint8_t rx_port_index;
+ CLatencyManagerPerPort * lp;
+
+ switch (msg_type) {
+ case CGenNodeMsgBase::LATENCY_PKT:
+ l_msg = (CGenNodeLatencyPktInfo *)msg;
+ assert(l_msg->m_latency_offset == 0xdead);
+ rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1);
+ assert( rx_port_index < m_max_ports );
+ lp = &m_ports[rx_port_index];
+ handle_rx_pkt(lp, (rte_mbuf_t *)l_msg->m_pkt);
+ break;
+ default:
+ printf("ERROR latency-thread message type is not valid %d \n", msg_type);
+ assert(0);
+ }
+
+ CGlobalInfo::free_node(node);
+ }
+}
+
+// VM mode function. Handle messages from DP
+void CRxCoreStateless::try_rx_queues() {
+
+ CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp();
+ uint8_t threads=CMsgIns::Ins()->get_num_threads();
+ int ti;
+ for (ti = 0; ti < (int)threads; ti++) {
+ CNodeRing * r = rx_dp->getRingDpToCp(ti);
+ if ( ! r->isEmpty() ) {
+ handle_rx_queue_msgs((uint8_t)ti, r);
+ }
+ }
}
int CRxCoreStateless::try_rx() {
@@ -105,26 +168,19 @@ int CRxCoreStateless::try_rx() {
for (i = 0; i < m_max_ports; i++) {
CLatencyManagerPerPort * lp = &m_ports[i];
rte_mbuf_t * m;
+ m_cpu_dp_u.start_work();
/* try to read 64 packets clean up the queue */
uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64);
total_pkts += cnt_p;
if (cnt_p) {
int j;
for (j = 0; j < cnt_p; j++) {
- Cxl710Parser parser;
m = rx_pkts[j];
- if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) {
- uint16_t ip_id;
- if (parser.get_ip_id(ip_id) == 0) {
- if (is_flow_stat_id(ip_id)) {
- uint16_t hw_id = get_hw_id(ip_id);
- m_ports[i].m_port.m_rx_pg_bytes[hw_id] += m->pkt_len;
- m_ports[i].m_port.m_rx_pg_pkts[hw_id]++;
- }
- }
- }
+ handle_rx_pkt(lp, m);
rte_pktmbuf_free(m);
}
+ /* commit only if there was work to do ! */
+ m_cpu_dp_u.commit();
}/* if work */
}// all ports
return total_pkts;
@@ -141,19 +197,21 @@ uint16_t CRxCoreStateless::get_hw_id(uint16_t id) {
void CRxCoreStateless::reset_rx_stats(uint8_t port_id) {
for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) {
- m_ports[port_id].m_port.m_rx_pg_bytes[hw_id] = 0;
- m_ports[port_id].m_port.m_rx_pg_pkts[hw_id] = 0;
+ m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
}
}
-int CRxCoreStateless::get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts
- , uint32_t *bytes, uint32_t *prev_bytes, int min, int max) {
+int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) {
for (int hw_id = min; hw_id <= max; hw_id++) {
- pkts[hw_id] = m_ports[port_id].m_port.m_rx_pg_pkts[hw_id] - prev_pkts[hw_id];
- prev_pkts[hw_id] = m_ports[port_id].m_port.m_rx_pg_pkts[hw_id];
- bytes[hw_id] = m_ports[port_id].m_port.m_rx_pg_bytes[hw_id] - prev_bytes[hw_id];
- prev_bytes[hw_id] = m_ports[port_id].m_port.m_rx_pg_bytes[hw_id];
+ rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id];
+ if (reset) {
+ m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear();
+ }
}
-
return 0;
}
+
+double CRxCoreStateless::get_cpu_util() {
+ m_cpu_cp_u.Update();
+ return m_cpu_cp_u.GetVal();
+}
diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h
index eecc8033..5ab12f4e 100644
--- a/src/stateless/rx/trex_stateless_rx_core.h
+++ b/src/stateless/rx/trex_stateless_rx_core.h
@@ -1,27 +1,28 @@
/*
- Ido Barnea
- Cisco Systems, Inc.
+ Ido Barnea
+ Cisco Systems, Inc.
*/
/*
-Copyright (c) 2016-2016 Cisco Systems, Inc.
+ Copyright (c) 2016-2016 Cisco Systems, Inc.
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
+ http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
*/
#ifndef __TREX_STATELESS_RX_CORE_H__
#define __TREX_STATELESS_RX_CORE_H__
#include <stdint.h>
#include "latency.h"
+#include "utl_cpuu.h"
class TrexStatelessCpToRxMsgBase;
@@ -34,7 +35,7 @@ class CRxSlCfg {
public:
uint32_t m_max_ports;
- double m_cps;// CPS
+ double m_cps;
CPortLatencyHWBase * m_ports[TREX_MAX_PORTS];
};
@@ -42,21 +43,27 @@ class CRxCoreStateless {
enum state_e {
STATE_IDLE,
STATE_WORKING,
+ STATE_QUIT
};
public:
void start();
void create(const CRxSlCfg &cfg);
void reset_rx_stats(uint8_t port_id);
- int get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts
- , uint32_t *bytes, uint32_t *prev_bytes, int min, int max);
+ int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset);
void work() {m_state = STATE_WORKING;}
void idle() {m_state = STATE_IDLE;}
+ void quit() {m_state = STATE_QUIT;}
+ double get_cpu_util();
+
private:
void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg);
bool periodic_check_for_cp_messages();
void idle_state_loop();
+ void handle_rx_pkt(CLatencyManagerPerPort * lp, rte_mbuf_t * m);
+ void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r);
int try_rx();
+ void try_rx_queues();
bool is_flow_stat_id(uint16_t id);
uint16_t get_hw_id(uint16_t id);
@@ -64,8 +71,10 @@ class CRxCoreStateless {
uint32_t m_max_ports;
bool m_has_streams;
CLatencyManagerPerPort m_ports[TREX_MAX_PORTS];
- state_e m_state; /* state of all ports */
- CNodeRing *m_ring_from_cp;
- CNodeRing *m_ring_to_cp;
+ state_e m_state; /* state of all ports */
+ CNodeRing *m_ring_from_cp;
+ CNodeRing *m_ring_to_cp;
+ CCpuUtlDp m_cpu_dp_u;
+ CCpuUtlCp m_cpu_cp_u;
};
#endif