diff options
-rwxr-xr-x | src/bp_sim.cpp | 7 | ||||
-rwxr-xr-x | src/bp_sim.h | 2 | ||||
-rw-r--r-- | src/flow_stat.cpp | 9 | ||||
-rw-r--r-- | src/internal_api/trex_platform_api.h | 2 | ||||
-rw-r--r-- | src/latency.cpp | 8 | ||||
-rw-r--r-- | src/latency.h | 5 | ||||
-rw-r--r-- | src/main_dpdk.cpp | 205 | ||||
-rw-r--r-- | src/stateless/cp/trex_stateless.cpp | 1 | ||||
-rw-r--r-- | src/stateless/cp/trex_streams_compiler.cpp | 3 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.cpp | 10 | ||||
-rw-r--r-- | src/stateless/messaging/trex_stateless_messaging.h | 8 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.cpp | 150 | ||||
-rw-r--r-- | src/stateless/rx/trex_stateless_rx_core.h | 45 |
13 files changed, 310 insertions, 145 deletions
diff --git a/src/bp_sim.cpp b/src/bp_sim.cpp index 6ea40be2..cc9af837 100755 --- a/src/bp_sim.cpp +++ b/src/bp_sim.cpp @@ -6039,8 +6039,13 @@ uint16_t CSimplePacketParser::getPktSize(){ return ( ip_len +m_vlan_offset+14); } +uint16_t CSimplePacketParser::getIpId() { + if (m_ipv4) { + return ( m_ipv4->getId() ); + } - + return (0); +} uint8_t CSimplePacketParser::getTTl(){ if (m_ipv4) { diff --git a/src/bp_sim.h b/src/bp_sim.h index 8127261c..4b1a88e3 100755 --- a/src/bp_sim.h +++ b/src/bp_sim.h @@ -1246,7 +1246,7 @@ static inline int get_is_rx_check_mode(){ return (CGlobalInfo::m_options.preview.get_is_rx_check_enable() ?1:0); } -static inline bool get_is_rx_filter_enable(){//??? +static inline bool get_is_rx_filter_enable(){ uint32_t latency_rate=CGlobalInfo::m_options.m_latency_rate; return ( ( get_is_rx_check_mode() || CGlobalInfo::is_learn_mode() || latency_rate != 0 || get_is_stateless()) ?true:false ); diff --git a/src/flow_stat.cpp b/src/flow_stat.cpp index d44a91da..778c92b9 100644 --- a/src/flow_stat.cpp +++ b/src/flow_stat.cpp @@ -387,7 +387,7 @@ CFlowStatRuleMgr::CFlowStatRuleMgr() { m_api = NULL; m_max_hw_id = -1; m_num_started_streams = 0; - m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + m_ring_to_rx = NULL; } std::ostream& operator<<(std::ostream& os, const CFlowStatRuleMgr& cf) { @@ -439,6 +439,8 @@ int CFlowStatRuleMgr::add_stream(const TrexStream * stream) { std::cout << __METHOD_NAME__ << " user id:" << stream->m_rx_check.m_pg_id << std::endl; #endif + // Init everything here, and not in the constructor, since we rely on other objects + // By the time a stream is added everything else is initialized. if (! 
m_api ) { TrexStateless *tstateless = get_stateless_obj(); m_api = tstateless->get_platform_api(); @@ -455,6 +457,7 @@ int CFlowStatRuleMgr::add_stream(const TrexStream * stream) { for (uint8_t port = 0; port < m_num_ports; port++) { assert(m_api->reset_hw_flow_stats(port) == 0); } + m_ring_to_rx = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); } if (no_stat_supported) @@ -641,9 +644,9 @@ void CFlowStatRuleMgr::send_start_stop_msg_to_rx(bool is_start) { TrexStatelessCpToRxMsgBase *msg; if (is_start) { - msg = new TrexRxStartMsg(); + msg = new TrexStatelessRxStartMsg(); } else { - msg = new TrexRxStopMsg(); + msg = new TrexStatelessRxStopMsg(); } m_ring_to_rx->Enqueue((CGenNode *)msg); } diff --git a/src/internal_api/trex_platform_api.h b/src/internal_api/trex_platform_api.h index fc5da491..dbca5a8a 100644 --- a/src/internal_api/trex_platform_api.h +++ b/src/internal_api/trex_platform_api.h @@ -42,7 +42,7 @@ public: struct { double m_cpu_util; - + double m_rx_cpu_util; double m_tx_bps; double m_rx_bps; diff --git a/src/latency.cpp b/src/latency.cpp index 930876b3..fff7935d 100644 --- a/src/latency.cpp +++ b/src/latency.cpp @@ -178,8 +178,7 @@ void CCPortLatency::reset(){ m_length_error=0; m_no_ipv4_option=0; for (int i = 0; i < MAX_FLOW_STATS; i++) { - m_rx_pg_pkts[i] = 0; - m_rx_pg_bytes[i] = 0; + m_rx_pg_stat[i].clear(); } m_hist.Reset(); } @@ -632,8 +631,8 @@ void CLatencyManager::handle_rx_pkt(CLatencyManagerPerPort * lp, rte_pktmbuf_free(m); } -void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id, - CGenNodeLatencyPktInfo * msg){ +// In VM, we receive the RX packets in DP core, and send message to RX core with the packet +void CLatencyManager::handle_latency_pkt_msg(uint8_t thread_id, CGenNodeLatencyPktInfo * msg) { assert(msg->m_latency_offset==0xdead); @@ -670,6 +669,7 @@ void CLatencyManager::run_rx_queue_msgs(uint8_t thread_id, } } +// VM mode function. 
Handle messages from DP void CLatencyManager::try_rx_queues(){ CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp(); diff --git a/src/latency.h b/src/latency.h index e85e3a5a..3dd1cc36 100644 --- a/src/latency.h +++ b/src/latency.h @@ -86,6 +86,7 @@ public: bool Parse(); uint8_t getTTl(); + uint16_t getIpId(); uint16_t getPktSize(); // Check if packet contains latency data @@ -244,9 +245,7 @@ public: uint64_t m_rx_check; uint64_t m_no_ipv4_option; uint64_t m_length_error; - uint32_t m_rx_pg_pkts[MAX_FLOW_STATS]; - uint32_t m_rx_pg_bytes[MAX_FLOW_STATS]; - + rx_per_flow_t m_rx_pg_stat[MAX_FLOW_STATS]; CTimeHistogram m_hist; /* all window */ CJitter m_jitter; }; diff --git a/src/main_dpdk.cpp b/src/main_dpdk.cpp index 9e690951..94361ec4 100644 --- a/src/main_dpdk.cpp +++ b/src/main_dpdk.cpp @@ -188,11 +188,9 @@ public: virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats); virtual void clear_extended_stats(CPhyEthIF * _if); - int get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts, uint32_t *bytes, uint32_t *prev_bytes - , int min, int max); int dump_fdir_global_stats(CPhyEthIF * _if, FILE *fd) {return 0;} int get_stat_counters_num() {return MAX_FLOW_STATS;} - int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} + int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} virtual int wait_for_stable_link(); void wait_after_link_up(); }; @@ -244,6 +242,8 @@ public: virtual void clear_extended_stats(CPhyEthIF * _if); virtual int wait_for_stable_link(); + int get_stat_counters_num() {return MAX_FLOW_STATS;} + int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} }; @@ -270,15 +270,17 @@ public: virtual bool is_hardware_filter_is_supported(){ return (true); } - virtual int configure_rx_filter_rules(CPhyEthIF * _if); - + virtual int configure_rx_filter_rules_stateless(CPhyEthIF * _if); + virtual int configure_rx_filter_rules_statefull(CPhyEthIF * _if); virtual bool 
is_hardware_support_drop_queue(){ return(true); } virtual void get_extended_stats(CPhyEthIF * _if,CPhyEthIFStats *stats); virtual void clear_extended_stats(CPhyEthIF * _if); virtual int wait_for_stable_link(); + virtual int get_stat_counters_num() {return MAX_FLOW_STATS;} + virtual int get_rx_stat_capabilities() {return TrexPlatformApi::IF_STAT_IPV4_ID;} }; class CTRexExtendedDriverBase40G : public CTRexExtendedDriverBase10G { @@ -1106,15 +1108,19 @@ public: m_port_conf.fdir_conf.status=RTE_FDIR_NO_REPORT_STATUS; /* Offset of flexbytes field in RX packets (in 16-bit word units). */ /* Note: divide by 2 to convert byte offset to word offset */ - if ( CGlobalInfo::m_options.preview.get_ipv6_mode_enable() ){ - m_port_conf.fdir_conf.flexbytes_offset=(14+6)/2; - }else{ - m_port_conf.fdir_conf.flexbytes_offset=(14+8)/2; - } + if (get_is_stateless()) { + m_port_conf.fdir_conf.flexbytes_offset = (14+4)/2; + } else { + if ( CGlobalInfo::m_options.preview.get_ipv6_mode_enable() ) { + m_port_conf.fdir_conf.flexbytes_offset = (14+6)/2; + } else { + m_port_conf.fdir_conf.flexbytes_offset = (14+8)/2; + } - /* Increment offset 4 bytes for the case where we add VLAN */ - if ( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ){ - m_port_conf.fdir_conf.flexbytes_offset+=(4/2); + /* Increment offset 4 bytes for the case where we add VLAN */ + if ( CGlobalInfo::m_options.preview.get_vlan_mode_enable() ) { + m_port_conf.fdir_conf.flexbytes_offset += (4/2); + } } m_port_conf.fdir_conf.drop_queue=1; } @@ -1225,6 +1231,7 @@ void CPhyEthIFStats::Dump(FILE *fd){ DP_A(rx_nombuf); } +// Clear the RX queue of an interface, dropping all packets void CPhyEthIF::flush_rx_queue(void){ rte_mbuf_t * rx_pkts[32]; @@ -1797,6 +1804,9 @@ bool CCoreEthIF::Create(uint8_t core_id, return (true); } +// This function is only relevant if we are in VM. In this case, we only have one rx queue. Can't have +// rules to drop queue 0, and pass queue 1 to RX core, like in other cases. 
+// We receive all packets in the same core that transmitted, and hand them to the RX core. void CCoreEthIF::flush_rx_queue(void){ pkt_dir_t dir ; bool is_rx = get_is_rx_thread_enabled(); @@ -2311,6 +2321,7 @@ public: float m_active_flows; float m_open_flows; float m_cpu_util; + float m_rx_cpu_util; uint8_t m_threads; uint32_t m_num_of_ports; @@ -2605,10 +2616,11 @@ public: int reset_counters(); private: - /* try to stop all datapath cores */ - void try_stop_all_dp(); + /* try to stop all datapath cores and RX core */ + void try_stop_all_cores(); /* send message to all dp cores */ int send_message_all_dp(TrexStatelessCpToDpMsgBase *msg); + int send_message_to_rx(TrexStatelessCpToRxMsgBase *msg); void check_for_dp_message_from_core(int thread_id); public: @@ -2616,7 +2628,6 @@ public: int start_master_statefull(); int start_master_stateless(); int run_in_core(virtual_thread_id_t virt_core_id); - int stop_core(virtual_thread_id_t virt_core_id); int core_for_rx(){ if ( (! get_is_rx_thread_enabled()) ) { return -1; @@ -2687,7 +2698,8 @@ public: CParserOption m_po ; CFlowGenList m_fl; bool m_fl_was_init; - volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ; + volatile uint8_t m_signal[BP_MAX_CORES] __rte_cache_aligned ; // Signal to main core when DP thread finished + volatile bool m_rx_running; // Signal main core when RX thread finished CLatencyManager m_mg; // statefull RX core CRxCoreStateless m_rx_sl; // stateless RX core CTrexGlobalIoMode m_io_modes; @@ -2776,12 +2788,14 @@ bool CGlobalTRex::is_all_links_are_up(bool dump){ return (all_link_are); } +void CGlobalTRex::try_stop_all_cores(){ -void CGlobalTRex::try_stop_all_dp(){ - - TrexStatelessDpQuit * msg= new TrexStatelessDpQuit(); - send_message_all_dp(msg); - delete msg; + TrexStatelessDpQuit * dp_msg= new TrexStatelessDpQuit(); + TrexStatelessRxQuit * rx_msg= new TrexStatelessRxQuit(); + send_message_all_dp(dp_msg); + send_message_to_rx(rx_msg); + delete dp_msg; + // no need to delete rx_msg. 
Deleted by receiver bool all_core_finished = false; int i; for (i=0; i<20; i++) { @@ -2812,6 +2826,13 @@ int CGlobalTRex::send_message_all_dp(TrexStatelessCpToDpMsgBase *msg){ return (0); } +int CGlobalTRex::send_message_to_rx(TrexStatelessCpToRxMsgBase *msg) { + CNodeRing *ring = CMsgIns::Ins()->getCpRx()->getRingCpToDp(0); + ring->Enqueue((CGenNode *) msg); + + return (0); +} + int CGlobalTRex::ixgbe_rx_queue_flush(){ int i; @@ -2868,12 +2889,11 @@ void CGlobalTRex::ixgbe_configure_mg(void) { // init m_rx_sl object for stateless rx core void CGlobalTRex::rx_sl_configure(void) { CRxSlCfg rx_sl_cfg; + int i; rx_sl_cfg.m_max_ports = m_max_ports; if ( get_vm_one_queue_enable() ) { -#if 0 - /// what to do here ??? /* vm mode, indirect queues */ for (i=0; i < m_max_ports; i++) { CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp(); @@ -2882,9 +2902,8 @@ void CGlobalTRex::rx_sl_configure(void) { m_latency_vm_vports[i].Create((uint8_t)i, r, &m_mg); rx_sl_cfg.m_ports[i] = &m_latency_vm_vports[i]; } -#endif } else { - for (int i = 0; i < m_max_ports; i++) { + for (i = 0; i < m_max_ports; i++) { CPhyEthIF * _if = &m_ports[i]; m_latency_vports[i].Create(_if, m_latency_tx_queue_id, 1); rx_sl_cfg.m_ports[i] = &m_latency_vports[i]; @@ -3403,6 +3422,9 @@ void CGlobalTRex::get_stats(CGlobalStats & stats){ stats.m_num_of_ports = m_max_ports; stats.m_cpu_util = m_fl.GetCpuUtil(); + if (get_is_stateless()) { + stats.m_rx_cpu_util = m_rx_sl.get_cpu_util(); + } stats.m_threads = m_fl.m_threads_info.size(); for (i=0; i<m_max_ports; i++) { @@ -3766,7 +3788,7 @@ int CGlobalTRex::run_in_master() { if (!is_all_cores_finished()) { /* probably CLTR-C */ - try_stop_all_dp(); + try_stop_all_cores(); } m_mg.stop(); @@ -3782,19 +3804,16 @@ int CGlobalTRex::run_in_master() { int CGlobalTRex::run_in_rx_core(void){ if (get_is_stateless()) { + m_rx_running = true; m_rx_sl.start(); } else { if ( CGlobalInfo::m_options.is_rx_enabled() ){ + m_rx_running = true; m_mg.start(0); } } - return (0); -} - - 
-int CGlobalTRex::stop_core(virtual_thread_id_t virt_core_id){ - m_signal[virt_core_id]=1; + m_rx_running = false; return (0); } @@ -3879,14 +3898,17 @@ int CGlobalTRex::stop_master(){ return (0); } -bool CGlobalTRex::is_all_cores_finished(){ +bool CGlobalTRex::is_all_cores_finished() { int i; for (i=0; i<get_cores_tx(); i++) { if ( m_signal[i+1]==0){ - return (false); + return false; } } - return (true); + if (m_rx_running) + return false; + + return true; } @@ -3972,7 +3994,6 @@ int CGlobalTRex::start_master_statefull() { //////////////////////////////////////////// - static CGlobalTRex g_trex; int CPhyEthIF::reset_hw_flow_stats() { @@ -3994,30 +4015,39 @@ int CPhyEthIF::reset_hw_flow_stats() { int CPhyEthIF::get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, int min, int max, bool reset) { uint32_t diff_pkts[MAX_FLOW_STATS]; uint32_t diff_bytes[MAX_FLOW_STATS]; + bool hw_rx_stat_supported = get_ex_drv()->hw_rx_stat_supported(); - if (get_ex_drv()->get_rx_stats(this, diff_pkts, m_stats.m_fdir_prev_pkts - , diff_bytes, m_stats.m_fdir_prev_bytes, min, max) < 0) { - return -1; + if (hw_rx_stat_supported) { + if (get_ex_drv()->get_rx_stats(this, diff_pkts, m_stats.m_fdir_prev_pkts + , diff_bytes, m_stats.m_fdir_prev_bytes, min, max) < 0) { + return -1; + } + } else { + g_trex.m_rx_sl.get_rx_stats(get_port_id(), rx_stats, min, max, reset); } for (int i = min; i <= max; i++) { if ( reset ) { // return value so far, and reset - if (rx_stats != NULL) { - rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i] + diff_pkts[i]); - rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i] + diff_bytes[i]); + if (hw_rx_stat_supported) { + if (rx_stats != NULL) { + rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i] + diff_pkts[i]); + rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i] + diff_bytes[i]); + } + m_stats.m_rx_per_flow_pkts[i] = 0; + m_stats.m_rx_per_flow_bytes[i] = 0; } if (tx_stats != NULL) { tx_stats[i - min] = 
g_trex.clear_flow_tx_stats(m_port_id, i); } - m_stats.m_rx_per_flow_pkts[i] = 0; - m_stats.m_rx_per_flow_bytes[i] = 0; } else { - m_stats.m_rx_per_flow_pkts[i] += diff_pkts[i]; - m_stats.m_rx_per_flow_bytes[i] += diff_bytes[i]; - if (rx_stats != NULL) { - rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i]); - rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i]); + if (hw_rx_stat_supported) { + m_stats.m_rx_per_flow_pkts[i] += diff_pkts[i]; + m_stats.m_rx_per_flow_bytes[i] += diff_bytes[i]; + if (rx_stats != NULL) { + rx_stats[i - min].set_pkts(m_stats.m_rx_per_flow_pkts[i]); + rx_stats[i - min].set_bytes(m_stats.m_rx_per_flow_bytes[i]); + } } if (tx_stats != NULL) { tx_stats[i - min] = g_trex.get_flow_tx_stats(m_port_id, i); @@ -4028,6 +4058,8 @@ int CPhyEthIF::get_flow_stats(rx_per_flow_t *rx_stats, tx_per_flow_t *tx_stats, return 0; } +// If needed, send packets to rx core for processing. +// This is relevant only in VM case, where we receive packets to the working DP core (only 1 DP core in this case) bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir, rte_mbuf_t * m){ @@ -4036,17 +4068,25 @@ bool CCoreEthIF::process_rx_pkt(pkt_dir_t dir, return false; } bool send=false; - CLatencyPktMode *c_l_pkt_mode = g_trex.m_mg.c_l_pkt_mode; - bool is_lateancy_pkt = c_l_pkt_mode->IsLatencyPkt(parser.m_ipv4) & parser.IsLatencyPkt(parser.m_l4 + c_l_pkt_mode->l4_header_len()); - if (is_lateancy_pkt){ - send=true; - }else{ - if ( get_is_rx_filter_enable() ){ - uint8_t max_ttl = 0xff - get_rx_check_hops(); - uint8_t pkt_ttl = parser.getTTl(); - if ( (pkt_ttl==max_ttl) || (pkt_ttl==(max_ttl-1) ) ) { - send=true; + if ( get_is_stateless() ) { + // In stateless RX, we only care about flow stat packets + if ((parser.getIpId() & 0xff00) == IP_ID_RESERVE_BASE) { + send = true; + } + } else { + CLatencyPktMode *c_l_pkt_mode = g_trex.m_mg.c_l_pkt_mode; + bool is_lateancy_pkt = c_l_pkt_mode->IsLatencyPkt(parser.m_ipv4) & parser.IsLatencyPkt(parser.m_l4 + 
c_l_pkt_mode->l4_header_len()); + + if (is_lateancy_pkt) { + send = true; + } else { + if ( get_is_rx_filter_enable() ) { + uint8_t max_ttl = 0xff - get_rx_check_hops(); + uint8_t pkt_ttl = parser.getTTl(); + if ( (pkt_ttl==max_ttl) || (pkt_ttl==(max_ttl-1) ) ) { + send=true; + } } } } @@ -4086,7 +4126,6 @@ static int latency_one_lcore(__attribute__((unused)) void *dummy) CPlatformSocketInfo * lpsock=&CGlobalInfo::m_socket; physical_thread_id_t phy_id =rte_lcore_id(); - if ( lpsock->thread_phy_is_rx(phy_id) ) { g_trex.run_in_rx_core(); }else{ @@ -4444,6 +4483,7 @@ int main_test(int argc , char * argv[]){ g_trex.reset_counters(); } + g_trex.m_rx_running = false; if ( get_is_stateless() ) { g_trex.start_master_stateless(); @@ -4731,11 +4771,13 @@ void CTRexExtendedDriverBase1G::get_extended_stats(CPhyEthIF * _if,CPhyEthIFStat void CTRexExtendedDriverBase1G::clear_extended_stats(CPhyEthIF * _if){ } +#if 0 int CTRexExtendedDriverBase1G::get_rx_stats(CPhyEthIF * _if, uint32_t *pkts, uint32_t *prev_pkts ,uint32_t *bytes, uint32_t *prev_bytes, int min, int max) { uint32_t port_id = _if->get_port_id(); return g_trex.m_rx_sl.get_rx_stats(port_id, pkts, prev_pkts, bytes, prev_bytes, min, max); } +#endif void CTRexExtendedDriverBase10G::clear_extended_stats(CPhyEthIF * _if){ _if->pci_reg_read(IXGBE_RXNFGPC); @@ -4751,7 +4793,43 @@ void CTRexExtendedDriverBase10G::update_configuration(port_cfg_t * cfg){ cfg->m_tx_conf.tx_thresh.wthresh = TX_WTHRESH; } -int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if){ +int CTRexExtendedDriverBase10G::configure_rx_filter_rules(CPhyEthIF * _if) { + if ( get_is_stateless() ) { + return configure_rx_filter_rules_stateless(_if); + } else { + return configure_rx_filter_rules_statefull(_if); + } + + return 0; +} + +int CTRexExtendedDriverBase10G::configure_rx_filter_rules_stateless(CPhyEthIF * _if) { + uint8_t port_id = _if->get_rte_port_id(); + int ip_id_lsb; + + for (ip_id_lsb = 0; ip_id_lsb < MAX_FLOW_STATS; ip_id_lsb++ ) 
{ + struct rte_eth_fdir_filter fdir_filter; + int res = 0; + + memset(&fdir_filter,0,sizeof(fdir_filter)); + fdir_filter.input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_OTHER; + fdir_filter.soft_id = ip_id_lsb; // We can use the ip_id_lsb also as filter soft_id + fdir_filter.input.flow_ext.flexbytes[0] = 0xff; + fdir_filter.input.flow_ext.flexbytes[1] = ip_id_lsb; + fdir_filter.action.rx_queue = 1; + fdir_filter.action.behavior = RTE_ETH_FDIR_ACCEPT; + fdir_filter.action.report_status = RTE_ETH_FDIR_NO_REPORT_STATUS; + res = rte_eth_dev_filter_ctrl(port_id, RTE_ETH_FILTER_FDIR, RTE_ETH_FILTER_ADD, &fdir_filter); + + if (res != 0) { + rte_exit(EXIT_FAILURE, " ERROR rte_eth_dev_filter_ctrl : %d\n",res); + } + } + + return 0; +} + +int CTRexExtendedDriverBase10G::configure_rx_filter_rules_statefull(CPhyEthIF * _if) { uint8_t port_id=_if->get_rte_port_id(); uint16_t hops = get_rx_check_hops(); uint16_t v4_hops = (hops << 8)&0xff00; @@ -5208,6 +5286,9 @@ TrexDpdkPlatformApi::get_global_stats(TrexPlatformGlobalStats &stats) const { g_trex.get_stats(trex_stats); stats.m_stats.m_cpu_util = trex_stats.m_cpu_util; + if (get_is_stateless()) { + stats.m_stats.m_rx_cpu_util = trex_stats.m_rx_cpu_util; + } stats.m_stats.m_tx_bps = trex_stats.m_tx_bps; stats.m_stats.m_tx_pps = trex_stats.m_tx_pps; diff --git a/src/stateless/cp/trex_stateless.cpp b/src/stateless/cp/trex_stateless.cpp index 9e24802b..9df57a50 100644 --- a/src/stateless/cp/trex_stateless.cpp +++ b/src/stateless/cp/trex_stateless.cpp @@ -132,6 +132,7 @@ TrexStateless::encode_stats(Json::Value &global) { api->get_global_stats(stats); global["cpu_util"] = stats.m_stats.m_cpu_util; + global["rx_cpu_util"] = stats.m_stats.m_rx_cpu_util; global["tx_bps"] = stats.m_stats.m_tx_bps; global["rx_bps"] = stats.m_stats.m_rx_bps; diff --git a/src/stateless/cp/trex_streams_compiler.cpp b/src/stateless/cp/trex_streams_compiler.cpp index be5002da..563236c2 100644 --- a/src/stateless/cp/trex_streams_compiler.cpp +++ 
b/src/stateless/cp/trex_streams_compiler.cpp @@ -477,7 +477,8 @@ TrexStreamsCompiler::compile_stream(TrexStream *stream, TrexStream *fixed_rx_flow_stat_stream = stream->clone(true); - get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); //???? check for errors + // not checking for errors. We assume that if add_stream succeeded, start_stream will too. + get_stateless_obj()->m_rx_flow_stat.start_stream(fixed_rx_flow_stat_stream, fixed_rx_flow_stat_stream->m_rx_check.m_hw_id); /* can this stream be split to many cores ? */ if (!stream->is_splitable(dp_core_count)) { diff --git a/src/stateless/messaging/trex_stateless_messaging.cpp b/src/stateless/messaging/trex_stateless_messaging.cpp index 3468d622..7edf0f13 100644 --- a/src/stateless/messaging/trex_stateless_messaging.cpp +++ b/src/stateless/messaging/trex_stateless_messaging.cpp @@ -209,13 +209,17 @@ TrexDpPortEventMsg::handle() { } /************************* messages from CP to RX **********************/ -bool TrexRxStartMsg::handle (CRxCoreStateless *rx_core) { +bool TrexStatelessRxStartMsg::handle (CRxCoreStateless *rx_core) { rx_core->work(); return true; } -/************************* messages from CP to RX **********************/ -bool TrexRxStopMsg::handle (CRxCoreStateless *rx_core) { +bool TrexStatelessRxStopMsg::handle (CRxCoreStateless *rx_core) { rx_core->idle(); return true; } + +bool TrexStatelessRxQuit::handle (CRxCoreStateless *rx_core) { + rx_core->quit(); + return true; +} diff --git a/src/stateless/messaging/trex_stateless_messaging.h b/src/stateless/messaging/trex_stateless_messaging.h index b7e8fd3f..0eed01bd 100644 --- a/src/stateless/messaging/trex_stateless_messaging.h +++ b/src/stateless/messaging/trex_stateless_messaging.h @@ -356,11 +356,15 @@ public: }; -class TrexRxStartMsg : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxStartMsg : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless 
*rx_core); }; -class TrexRxStopMsg : public TrexStatelessCpToRxMsgBase { +class TrexStatelessRxStopMsg : public TrexStatelessCpToRxMsgBase { + bool handle (CRxCoreStateless *rx_core); +}; + +class TrexStatelessRxQuit : public TrexStatelessCpToRxMsgBase { bool handle (CRxCoreStateless *rx_core); }; diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index 86711189..ab7c08d1 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -18,6 +18,7 @@ void CRxCoreStateless::create(const CRxSlCfg &cfg) { CLatencyManagerPerPort * lp = &m_ports[i]; lp->m_io = cfg.m_ports[i]; } + m_cpu_cp_u.Create(&m_cpu_dp_u); } void CRxCoreStateless::handle_cp_msg(TrexStatelessCpToRxMsgBase *msg) { @@ -71,32 +72,94 @@ void CRxCoreStateless::idle_state_loop() { } void CRxCoreStateless::start() { - static int count = 0; - static int i = 0; - - while (true) { - if (m_state == STATE_WORKING) { - count += try_rx(); - i++; - if (i == 100) { - i = 0; - // if no packets in 100 cycles, sleep for a while to spare the cpu - if (count == 0) { - delay(1); - } - count = 0; - periodic_check_for_cp_messages(); - } - } else { - idle_state_loop(); - } -#if 0 - ??? do we need this? - if ( m_core->is_terminated_by_master() ) { - break; - } -#endif - } + static int count = 0; + static int i = 0; + bool do_try_rx_queue =CGlobalInfo::m_options.preview.get_vm_one_queue_enable() ? 
true : false; + + while (true) { + if (m_state == STATE_WORKING) { + i++; + if (i == 100) { + i = 0; + // if no packets in 100 cycles, sleep for a while to spare the cpu + if (count == 0) { + delay(1); + } + count = 0; + periodic_check_for_cp_messages(); // m_state might change in here + } + } else { + if (m_state == STATE_QUIT) + break; + idle_state_loop(); + } + if (do_try_rx_queue) { + try_rx_queues(); + } + count += try_rx(); + } +} + +void CRxCoreStateless::handle_rx_pkt(CLatencyManagerPerPort *lp, rte_mbuf_t *m) { + Cxl710Parser parser; + + if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { + uint16_t ip_id; + if (parser.get_ip_id(ip_id) == 0) { + if (is_flow_stat_id(ip_id)) { + uint16_t hw_id = get_hw_id(ip_id); + lp->m_port.m_rx_pg_stat[hw_id].add_pkts(1); + lp->m_port.m_rx_pg_stat[hw_id].add_bytes(m->pkt_len); + } + } + } +} + +// In VM setup, handle packets coming as messages from DP cores. +void CRxCoreStateless::handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r) { + while ( true ) { + CGenNode * node; + if ( r->Dequeue(node) != 0 ) { + break; + } + assert(node); + + CGenNodeMsgBase * msg = (CGenNodeMsgBase *)node; + CGenNodeLatencyPktInfo * l_msg; + uint8_t msg_type = msg->m_msg_type; + uint8_t rx_port_index; + CLatencyManagerPerPort * lp; + + switch (msg_type) { + case CGenNodeMsgBase::LATENCY_PKT: + l_msg = (CGenNodeLatencyPktInfo *)msg; + assert(l_msg->m_latency_offset == 0xdead); + rx_port_index = (thread_id << 1) + (l_msg->m_dir & 1); + assert( rx_port_index < m_max_ports ); + lp = &m_ports[rx_port_index]; + handle_rx_pkt(lp, (rte_mbuf_t *)l_msg->m_pkt); + break; + default: + printf("ERROR latency-thread message type is not valid %d \n", msg_type); + assert(0); + } + + CGlobalInfo::free_node(node); + } +} + +// VM mode function. 
Handle messages from DP +void CRxCoreStateless::try_rx_queues() { + + CMessagingManager * rx_dp = CMsgIns::Ins()->getRxDp(); + uint8_t threads=CMsgIns::Ins()->get_num_threads(); + int ti; + for (ti = 0; ti < (int)threads; ti++) { + CNodeRing * r = rx_dp->getRingDpToCp(ti); + if ( ! r->isEmpty() ) { + handle_rx_queue_msgs((uint8_t)ti, r); + } + } } int CRxCoreStateless::try_rx() { @@ -105,26 +168,19 @@ int CRxCoreStateless::try_rx() { for (i = 0; i < m_max_ports; i++) { CLatencyManagerPerPort * lp = &m_ports[i]; rte_mbuf_t * m; + m_cpu_dp_u.start_work(); /* try to read 64 packets clean up the queue */ uint16_t cnt_p = lp->m_io->rx_burst(rx_pkts, 64); total_pkts += cnt_p; if (cnt_p) { int j; for (j = 0; j < cnt_p; j++) { - Cxl710Parser parser; m = rx_pkts[j]; - if (parser.parse(rte_pktmbuf_mtod(m, uint8_t *), m->pkt_len) == 0) { - uint16_t ip_id; - if (parser.get_ip_id(ip_id) == 0) { - if (is_flow_stat_id(ip_id)) { - uint16_t hw_id = get_hw_id(ip_id); - m_ports[i].m_port.m_rx_pg_bytes[hw_id] += m->pkt_len; - m_ports[i].m_port.m_rx_pg_pkts[hw_id]++; - } - } - } + handle_rx_pkt(lp, m); rte_pktmbuf_free(m); } + /* commit only if there was work to do ! 
*/ + m_cpu_dp_u.commit(); }/* if work */ }// all ports return total_pkts; @@ -141,19 +197,21 @@ uint16_t CRxCoreStateless::get_hw_id(uint16_t id) { void CRxCoreStateless::reset_rx_stats(uint8_t port_id) { for (int hw_id = 0; hw_id < MAX_FLOW_STATS; hw_id++) { - m_ports[port_id].m_port.m_rx_pg_bytes[hw_id] = 0; - m_ports[port_id].m_port.m_rx_pg_pkts[hw_id] = 0; + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); } } -int CRxCoreStateless::get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts - , uint32_t *bytes, uint32_t *prev_bytes, int min, int max) { +int CRxCoreStateless::get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset) { for (int hw_id = min; hw_id <= max; hw_id++) { - pkts[hw_id] = m_ports[port_id].m_port.m_rx_pg_pkts[hw_id] - prev_pkts[hw_id]; - prev_pkts[hw_id] = m_ports[port_id].m_port.m_rx_pg_pkts[hw_id]; - bytes[hw_id] = m_ports[port_id].m_port.m_rx_pg_bytes[hw_id] - prev_bytes[hw_id]; - prev_bytes[hw_id] = m_ports[port_id].m_port.m_rx_pg_bytes[hw_id]; + rx_stats[hw_id - min] = m_ports[port_id].m_port.m_rx_pg_stat[hw_id]; + if (reset) { + m_ports[port_id].m_port.m_rx_pg_stat[hw_id].clear(); + } } - return 0; } + +double CRxCoreStateless::get_cpu_util() { + m_cpu_cp_u.Update(); + return m_cpu_cp_u.GetVal(); +} diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index eecc8033..5ab12f4e 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -1,27 +1,28 @@ /* - Ido Barnea - Cisco Systems, Inc. + Ido Barnea + Cisco Systems, Inc. */ /* -Copyright (c) 2016-2016 Cisco Systems, Inc. + Copyright (c) 2016-2016 Cisco Systems, Inc. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ #ifndef __TREX_STATELESS_RX_CORE_H__ #define __TREX_STATELESS_RX_CORE_H__ #include <stdint.h> #include "latency.h" +#include "utl_cpuu.h" class TrexStatelessCpToRxMsgBase; @@ -34,7 +35,7 @@ class CRxSlCfg { public: uint32_t m_max_ports; - double m_cps;// CPS + double m_cps; CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; }; @@ -42,21 +43,27 @@ class CRxCoreStateless { enum state_e { STATE_IDLE, STATE_WORKING, + STATE_QUIT }; public: void start(); void create(const CRxSlCfg &cfg); void reset_rx_stats(uint8_t port_id); - int get_rx_stats(uint8_t port_id, uint32_t *pkts, uint32_t *prev_pkts - , uint32_t *bytes, uint32_t *prev_bytes, int min, int max); + int get_rx_stats(uint8_t port_id, rx_per_flow_t *rx_stats, int min, int max, bool reset); void work() {m_state = STATE_WORKING;} void idle() {m_state = STATE_IDLE;} + void quit() {m_state = STATE_QUIT;} + double get_cpu_util(); + private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); bool periodic_check_for_cp_messages(); void idle_state_loop(); + void handle_rx_pkt(CLatencyManagerPerPort * lp, 
rte_mbuf_t * m); + void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); int try_rx(); + void try_rx_queues(); bool is_flow_stat_id(uint16_t id); uint16_t get_hw_id(uint16_t id); @@ -64,8 +71,10 @@ class CRxCoreStateless { uint32_t m_max_ports; bool m_has_streams; CLatencyManagerPerPort m_ports[TREX_MAX_PORTS]; - state_e m_state; /* state of all ports */ - CNodeRing *m_ring_from_cp; - CNodeRing *m_ring_to_cp; + state_e m_state; /* state of all ports */ + CNodeRing *m_ring_from_cp; + CNodeRing *m_ring_to_cp; + CCpuUtlDp m_cpu_dp_u; + CCpuUtlCp m_cpu_cp_u; }; #endif |